tsreaper commented on code in PR #4445:
URL: https://github.com/apache/paimon/pull/4445#discussion_r1828666439
##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java:
##########
@@ -335,6 +338,77 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
         }
     }
+    @Timeout(120)
+    @ParameterizedTest()
+    @ValueSource(booleans = {false, true})
+    public void testRecreateTableWithException(boolean isReloadData) throws Exception {
+        TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
+        bEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        bEnv.executeSql("USE CATALOG testCatalog");
+        bEnv.executeSql(
+                "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+                        + "PARTITIONED BY (pt) "
+                        + "WITH ("
+                        + " 'bucket' = '2'\n"
+                        + ")");
Review Comment:
Set `continuous.discovery-interval` to a smaller value (for example `1s` or
`500ms`) so this test can run faster. No need to wait for 10 seconds.
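A minimal sketch of where the option could go, assuming it is set in the table's `WITH` clause alongside `'bucket'`. The helper name `createTableDdl` is hypothetical; in the test, the resulting string would be passed to `bEnv.executeSql(...)`.

```java
public class RecreateTableDdl {

    // Hypothetical helper: the CREATE TABLE statement from the test, plus
    // 'continuous.discovery-interval' so the streaming reader checks for new
    // snapshots at the given interval instead of the default one.
    static String createTableDdl(String discoveryInterval) {
        return "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
                + "PARTITIONED BY (pt) "
                + "WITH ("
                + " 'bucket' = '2',"
                + " 'continuous.discovery-interval' = '" + discoveryInterval + "'"
                + ")";
    }

    public static void main(String[] args) {
        // In the test, this string would be passed to bEnv.executeSql(...).
        System.out.println(createTableDdl("1s"));
    }
}
```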
##########
paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java:
##########
@@ -45,8 +45,19 @@ public Snapshot getNextSnapshot(long nextSnapshotId) {
         }
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
         // No snapshot now
         if (earliestSnapshotId == null || earliestSnapshotId <= nextSnapshotId) {
+            if ((earliestSnapshotId == null && nextSnapshotId > 1)
+                    || (latestSnapshotId != null && nextSnapshotId > latestSnapshotId + 1)) {
+                throw new OutOfRangeException(
+                        String.format(
+                                "The next expected snapshot is too big! Most possible cause might be the table had been recreated."
+                                        + "The next snapshot id is %d, while the latest snapshot id is %s",
+                                nextSnapshotId,
+                                latestSnapshotId == null ? "null" : latestSnapshotId));
Review Comment:
```suggestion
                                latestSnapshotId));
```
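The suggestion works because `String.format` renders a `null` argument for `%s` as the string `"null"` (see `java.util.Formatter`), so the ternary adds nothing. Below is a standalone sketch of the new check using the simplified argument. Assumptions: the method name `checkNextSnapshotId` is hypothetical, `IllegalStateException` stands in for Paimon's `OutOfRangeException` so the sketch has no dependencies, and the message is shortened.

```java
public class SnapshotRangeCheck {

    // Hypothetical standalone version of the check added to
    // NextSnapshotFetcher#getNextSnapshot. IllegalStateException stands in
    // for Paimon's OutOfRangeException.
    static void checkNextSnapshotId(
            Long earliestSnapshotId, Long latestSnapshotId, long nextSnapshotId) {
        // No snapshots exist but the reader already expects one after snapshot 1,
        // or the reader expects a snapshot more than one past the latest:
        // both suggest the table was dropped and recreated.
        if ((earliestSnapshotId == null && nextSnapshotId > 1)
                || (latestSnapshotId != null && nextSnapshotId > latestSnapshotId + 1)) {
            throw new IllegalStateException(
                    String.format(
                            "The next expected snapshot is too big; the table may have been recreated. "
                                    + "The next snapshot id is %d, while the latest snapshot id is %s",
                            nextSnapshotId,
                            // %s prints a null Long as "null", so no ternary is needed.
                            latestSnapshotId));
        }
    }

    public static void main(String[] args) {
        try {
            checkNextSnapshotId(null, null, 5L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```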
##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java:
##########
@@ -335,6 +338,77 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
         }
     }
+    @Timeout(120)
+    @ParameterizedTest()
+    @ValueSource(booleans = {false, true})
+    public void testRecreateTableWithException(boolean isReloadData) throws Exception {
+        TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
+        bEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        bEnv.executeSql("USE CATALOG testCatalog");
+        bEnv.executeSql(
+                "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+                        + "PARTITIONED BY (pt) "
+                        + "WITH ("
+                        + " 'bucket' = '2'\n"
+                        + ")");
+
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(4)
+                        .checkpointIntervalMs(1000)
+                        .build();
+        sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        sEnv.executeSql("USE CATALOG testCatalog");
+
+        // first write
+        List<String> values = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            values.add(String.format("(0, %d, %d)", i, i));
+            values.add(String.format("(1, %d, %d)", i, i));
+        }
+        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+
+        // second write
+        values.clear();
+        for (int i = 0; i < 10; i++) {
+            values.add(String.format("(0, %d, %d)", i, i + 1));
+            values.add(String.format("(1, %d, %d)", i, i + 1));
+        }
+        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+
+        // start a read job
+        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM t").collect();
+
+        // wait the read job to read the current table
+        Thread.sleep(10000);
Review Comment:
Assert that all snapshots are read by checking the result of `SELECT`. Don't
sleep for this long!
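One way to replace the `Thread.sleep(10000)`, as a sketch: read exactly as many rows as the two writes should produce and compare them with the expected table state. This assumes the streaming read first emits the merged latest state (20 rows with `v = k + 1`) and that `hasNext()` on the `collect()` iterator waits for the next row, so the test blocks only as long as needed while `@Timeout(120)` still bounds a hang. `take` and `expectedAfterSecondWrite` are hypothetical helpers, and rows are compared in the same `(pt, k, v)` string form the test uses for its `INSERT` values rather than as Flink `Row` objects.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class TakeRows {

    // Hypothetical helper: pull up to n rows from the iterator, stopping early
    // only if the iterator is exhausted.
    static <T> List<T> take(Iterator<T> it, int n) {
        List<T> rows = new ArrayList<>(n);
        for (int i = 0; i < n && it.hasNext(); i++) {
            rows.add(it.next());
        }
        return rows;
    }

    // Expected table state after the second write: v = k + 1 in both partitions.
    static Set<String> expectedAfterSecondWrite() {
        Set<String> expected = new HashSet<>();
        for (int i = 0; i < 10; i++) {
            expected.add(String.format("(0, %d, %d)", i, i + 1));
            expected.add(String.format("(1, %d, %d)", i, i + 1));
        }
        return expected;
    }

    public static void main(String[] args) {
        // Stand-in for the collect() iterator; in the test each Row would first
        // be converted to the same "(pt, k, v)" string form.
        List<String> source = new ArrayList<>(expectedAfterSecondWrite());
        List<String> rows = take(source.iterator(), 20);
        System.out.println(new HashSet<>(rows).equals(expectedAfterSecondWrite()));
    }
}
```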
--
This is an automated message from the Apache Git Service.