This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6df6861af [FLINK-31479] Close blocking iterators in
ContinuousFileStoreITCase (#611)
6df6861af is described below
commit 6df6861af728c5007b5d3fe8d0caf8dc56620376
Author: GuojunLi <[email protected]>
AuthorDate: Wed Mar 22 18:30:19 2023 +0800
[FLINK-31479] Close blocking iterators in ContinuousFileStoreITCase (#611)
---
.../paimon/flink/ContinuousFileStoreITCase.java | 19 ++++++++++++-------
.../paimon/flink/ContinuousFileStoreITCase.java | 19 ++++++++++++-------
.../paimon/flink/ContinuousFileStoreITCase.java | 13 ++++++++-----
3 files changed, 32 insertions(+), 19 deletions(-)
diff --git
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index a769ca633..21738eed1 100644
---
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
-import java.util.concurrent.TimeoutException;
import static
org.apache.paimon.flink.FlinkConnectorOptions.STREAMING_READ_ATOMIC;
import static org.assertj.core.api.Assertions.assertThat;
@@ -115,7 +114,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
testProjection("T2");
}
- private void testSimple(String table) throws TimeoutException {
+ private void testSimple(String table) throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
@@ -125,9 +124,10 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7",
"8", "9"));
+ iterator.close();
}
- private void testProjection(String table) throws TimeoutException {
+ private void testProjection(String table) throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT b, c FROM %s",
table));
@@ -137,10 +137,11 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("8",
"9"));
+ iterator.close();
}
@Test
- public void testContinuousLatestChangelogFileTrue() throws
TimeoutException {
+ public void testContinuousLatestChangelogFileTrue() throws Exception {
changelogFile = true;
batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
@@ -151,10 +152,11 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10",
"11", "12"));
+ iterator.close();
}
@Test
- public void testContinuousLatestChangelogFileFalse() throws
TimeoutException {
+ public void testContinuousLatestChangelogFileFalse() throws Exception {
changelogFile = false;
batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
@@ -165,6 +167,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10",
"11", "12"));
+ iterator.close();
}
@Test
@@ -424,7 +427,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testIgnoreOverwriteChangelogTrue() throws TimeoutException {
+ public void testIgnoreOverwriteChangelogTrue() throws Exception {
changelogFile = true;
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
@@ -438,10 +441,11 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9",
"10", "11"));
+ iterator.close();
}
@Test
- public void testIgnoreOverwriteChangelogFalse() throws TimeoutException {
+ public void testIgnoreOverwriteChangelogFalse() throws Exception {
changelogFile = false;
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
@@ -455,6 +459,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9",
"10", "11"));
+ iterator.close();
}
@Test
diff --git
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index a769ca633..21738eed1 100644
---
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
-import java.util.concurrent.TimeoutException;
import static
org.apache.paimon.flink.FlinkConnectorOptions.STREAMING_READ_ATOMIC;
import static org.assertj.core.api.Assertions.assertThat;
@@ -115,7 +114,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
testProjection("T2");
}
- private void testSimple(String table) throws TimeoutException {
+ private void testSimple(String table) throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
@@ -125,9 +124,10 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7",
"8", "9"));
+ iterator.close();
}
- private void testProjection(String table) throws TimeoutException {
+ private void testProjection(String table) throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT b, c FROM %s",
table));
@@ -137,10 +137,11 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("8",
"9"));
+ iterator.close();
}
@Test
- public void testContinuousLatestChangelogFileTrue() throws
TimeoutException {
+ public void testContinuousLatestChangelogFileTrue() throws Exception {
changelogFile = true;
batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
@@ -151,10 +152,11 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10",
"11", "12"));
+ iterator.close();
}
@Test
- public void testContinuousLatestChangelogFileFalse() throws
TimeoutException {
+ public void testContinuousLatestChangelogFileFalse() throws Exception {
changelogFile = false;
batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
@@ -165,6 +167,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10",
"11", "12"));
+ iterator.close();
}
@Test
@@ -424,7 +427,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testIgnoreOverwriteChangelogTrue() throws TimeoutException {
+ public void testIgnoreOverwriteChangelogTrue() throws Exception {
changelogFile = true;
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
@@ -438,10 +441,11 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9",
"10", "11"));
+ iterator.close();
}
@Test
- public void testIgnoreOverwriteChangelogFalse() throws TimeoutException {
+ public void testIgnoreOverwriteChangelogFalse() throws Exception {
changelogFile = false;
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
@@ -455,6 +459,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9",
"10", "11"));
+ iterator.close();
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index e09e5b037..851a3603a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -36,7 +36,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
-import java.util.concurrent.TimeoutException;
import static
org.apache.paimon.flink.FlinkConnectorOptions.STREAMING_READ_ATOMIC;
import static org.assertj.core.api.Assertions.assertThat;
@@ -92,7 +91,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
testProjection("T2");
}
- private void testSimple(String table) throws TimeoutException {
+ private void testSimple(String table) throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
@@ -102,9 +101,10 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7",
"8", "9"));
+ iterator.close();
}
- private void testProjection(String table) throws TimeoutException {
+ private void testProjection(String table) throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT b, c FROM %s",
table));
@@ -114,10 +114,11 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("8",
"9"));
+ iterator.close();
}
@TestTemplate
- public void testContinuousLatest() throws TimeoutException {
+ public void testContinuousLatest() throws Exception {
batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
BlockingIterator<Row, Row> iterator =
@@ -127,6 +128,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10",
"11", "12"));
+ iterator.close();
}
@TestTemplate
@@ -260,7 +262,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@TestTemplate
- public void testIgnoreOverwrite() throws TimeoutException {
+ public void testIgnoreOverwrite() throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
@@ -273,6 +275,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9",
"10", "11"));
+ iterator.close();
}
@TestTemplate