JingsongLi commented on code in PR #493:
URL: https://github.com/apache/flink-table-store/pull/493#discussion_r1094325477


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java:
##########
@@ -225,56 +225,54 @@ public static void checkFileStorePath(String table, 
List<String> partitionSpec)
     public static void testBatchRead(String query, List<Row> expected) throws 
Exception {
         CloseableIterator<Row> resultItr = bEnv.executeSql(query).collect();
         try (BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(resultItr)) {
-            if (expected.isEmpty()) {
-                assertThat(resultItr.hasNext()).isFalse();
-            } else {
+            if (!expected.isEmpty()) {
                 assertThat(
                                 iterator.collect(
                                         expected.size(), TIME_OUT.getSize(), 
TIME_OUT.getUnit()))
                         .containsExactlyInAnyOrderElementsOf(expected);
             }
+            assertThat(resultItr.hasNext()).isFalse();
         }
     }
 
+    // Note: if expected is empty, the iterator will be closed
     public static BlockingIterator<Row, Row> testStreamingRead(String query, 
List<Row> expected)
             throws Exception {
         BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
-
-        if (expected.isEmpty()) {
-            assertNoMoreRecords(iterator);
-        } else {
-            assertThat(iterator.collect(expected.size()))
-                    .containsExactlyInAnyOrderElementsOf(expected);
-        }
-
+        validateStreamingReadResult(iterator, expected);
         return iterator;
     }
 
+    // Note: if expected is empty, the iterator will be closed
     public static BlockingIterator<Row, Row> testStreamingReadWithReadFirst(
             String source, String sink, String query, List<Row> expected) 
throws Exception {
         BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
-
         insertIntoFromTable(source, sink);
+        validateStreamingReadResult(iterator, expected);
+        return iterator;
+    }
 
+    // Note: if expected is empty, the iterator will be closed
+    public static void validateStreamingReadResult(
+            BlockingIterator<Row, Row> streamingItr, List<Row> expected) 
throws Exception {
         if (expected.isEmpty()) {
-            assertNoMoreRecords(iterator);
+            assertNoMoreRecords(streamingItr);
         } else {
-            assertThat(iterator.collect(expected.size()))
+            assertThat(streamingItr.collect(expected.size()))
                     .containsExactlyInAnyOrderElementsOf(expected);
         }
-
-        return iterator;
     }
 
+    // Note: the iterator will be closed
     public static void assertNoMoreRecords(BlockingIterator<Row, Row> 
iterator) throws Exception {
         List<Row> expectedRecords = Collections.emptyList();
         try {
             // set expectation size to 1 to let time pass by until timeout
             // just wait 5s to avoid too long time
             expectedRecords = iterator.collect(1, 5L, TimeUnit.SECONDS);
-            iterator.close();
         } catch (TimeoutException ignored) {
             // don't throw exception
+            iterator.close();

Review Comment:
   We should not close iterator here, `assertNoMoreRecords` should be equal to 
`iterator.collect(expected.size())`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to