This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6df6861af [FLINK-31479] Close blocking iterators in ContinuousFileStoreITCase (#611)
6df6861af is described below

commit 6df6861af728c5007b5d3fe8d0caf8dc56620376
Author: GuojunLi <[email protected]>
AuthorDate: Wed Mar 22 18:30:19 2023 +0800

    [FLINK-31479] Close blocking iterators in ContinuousFileStoreITCase (#611)
---
 .../paimon/flink/ContinuousFileStoreITCase.java       | 19 ++++++++++++-------
 .../paimon/flink/ContinuousFileStoreITCase.java       | 19 ++++++++++++-------
 .../paimon/flink/ContinuousFileStoreITCase.java       | 13 ++++++++-----
 3 files changed, 32 insertions(+), 19 deletions(-)

diff --git a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index a769ca633..21738eed1 100644
--- a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.STREAMING_READ_ATOMIC;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -115,7 +114,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         testProjection("T2");
     }
 
-    private void testSimple(String table) throws TimeoutException {
+    private void testSimple(String table) throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
 
@@ -125,9 +124,10 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", 
"8", "9"));
+        iterator.close();
     }
 
-    private void testProjection(String table) throws TimeoutException {
+    private void testProjection(String table) throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT b, c FROM %s", 
table));
 
@@ -137,10 +137,11 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("8", 
"9"));
+        iterator.close();
     }
 
     @Test
-    public void testContinuousLatestChangelogFileTrue() throws 
TimeoutException {
+    public void testContinuousLatestChangelogFileTrue() throws Exception {
         changelogFile = true;
         batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
 
@@ -151,10 +152,11 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", 
"11", "12"));
+        iterator.close();
     }
 
     @Test
-    public void testContinuousLatestChangelogFileFalse() throws 
TimeoutException {
+    public void testContinuousLatestChangelogFileFalse() throws Exception {
         changelogFile = false;
         batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
 
@@ -165,6 +167,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", 
"11", "12"));
+        iterator.close();
     }
 
     @Test
@@ -424,7 +427,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testIgnoreOverwriteChangelogTrue() throws TimeoutException {
+    public void testIgnoreOverwriteChangelogTrue() throws Exception {
         changelogFile = true;
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
@@ -438,10 +441,11 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9", 
"10", "11"));
+        iterator.close();
     }
 
     @Test
-    public void testIgnoreOverwriteChangelogFalse() throws TimeoutException {
+    public void testIgnoreOverwriteChangelogFalse() throws Exception {
         changelogFile = false;
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
@@ -455,6 +459,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9", 
"10", "11"));
+        iterator.close();
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index a769ca633..21738eed1 100644
--- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.STREAMING_READ_ATOMIC;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -115,7 +114,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         testProjection("T2");
     }
 
-    private void testSimple(String table) throws TimeoutException {
+    private void testSimple(String table) throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
 
@@ -125,9 +124,10 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", 
"8", "9"));
+        iterator.close();
     }
 
-    private void testProjection(String table) throws TimeoutException {
+    private void testProjection(String table) throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT b, c FROM %s", 
table));
 
@@ -137,10 +137,11 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("8", 
"9"));
+        iterator.close();
     }
 
     @Test
-    public void testContinuousLatestChangelogFileTrue() throws 
TimeoutException {
+    public void testContinuousLatestChangelogFileTrue() throws Exception {
         changelogFile = true;
         batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
 
@@ -151,10 +152,11 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", 
"11", "12"));
+        iterator.close();
     }
 
     @Test
-    public void testContinuousLatestChangelogFileFalse() throws 
TimeoutException {
+    public void testContinuousLatestChangelogFileFalse() throws Exception {
         changelogFile = false;
         batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
 
@@ -165,6 +167,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", 
"11", "12"));
+        iterator.close();
     }
 
     @Test
@@ -424,7 +427,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testIgnoreOverwriteChangelogTrue() throws TimeoutException {
+    public void testIgnoreOverwriteChangelogTrue() throws Exception {
         changelogFile = true;
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
@@ -438,10 +441,11 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9", 
"10", "11"));
+        iterator.close();
     }
 
     @Test
-    public void testIgnoreOverwriteChangelogFalse() throws TimeoutException {
+    public void testIgnoreOverwriteChangelogFalse() throws Exception {
         changelogFile = false;
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
@@ -455,6 +459,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9", 
"10", "11"));
+        iterator.close();
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index e09e5b037..851a3603a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -36,7 +36,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.STREAMING_READ_ATOMIC;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -92,7 +91,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         testProjection("T2");
     }
 
-    private void testSimple(String table) throws TimeoutException {
+    private void testSimple(String table) throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
 
@@ -102,9 +101,10 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", 
"8", "9"));
+        iterator.close();
     }
 
-    private void testProjection(String table) throws TimeoutException {
+    private void testProjection(String table) throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT b, c FROM %s", 
table));
 
@@ -114,10 +114,11 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("8", 
"9"));
+        iterator.close();
     }
 
     @TestTemplate
-    public void testContinuousLatest() throws TimeoutException {
+    public void testContinuousLatest() throws Exception {
         batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
 
         BlockingIterator<Row, Row> iterator =
@@ -127,6 +128,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", 
"11", "12"));
+        iterator.close();
     }
 
     @TestTemplate
@@ -260,7 +262,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @TestTemplate
-    public void testIgnoreOverwrite() throws TimeoutException {
+    public void testIgnoreOverwrite() throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
 
@@ -273,6 +275,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9", 
"10", "11"));
+        iterator.close();
     }
 
     @TestTemplate

Reply via email to