tsreaper commented on code in PR #4445:
URL: https://github.com/apache/paimon/pull/4445#discussion_r1828666439


##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java:
##########
@@ -335,6 +338,77 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
         }
     }
 
+    @Timeout(120)
+    @ParameterizedTest()
+    @ValueSource(booleans = {false, true})
+    public void testRecreateTableWithException(boolean isReloadData) throws Exception {
+        TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
+        bEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        bEnv.executeSql("USE CATALOG testCatalog");
+        bEnv.executeSql(
+                "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+                        + "PARTITIONED BY (pt) "
+                        + "WITH ("
+                        + "    'bucket' = '2'\n"
+                        + ")");

Review Comment:
   Set `continuous.discovery-interval` to a smaller value (for example `1s` or `500ms`) so this test can run faster. No need to wait for 10 seconds.
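   For reference, a sketch of how the option could be set in the test's DDL (assuming the table's `WITH` clause is the right place for this option; the exact interval value is illustrative):
   
   ```sql
   CREATE TABLE t (
       pt INT,
       k INT,
       v INT,
       PRIMARY KEY (pt, k) NOT ENFORCED
   ) PARTITIONED BY (pt) WITH (
       'bucket' = '2',
       -- smaller discovery interval so the streaming source picks up
       -- new snapshots quickly instead of waiting on the default
       'continuous.discovery-interval' = '1s'
   );
   ```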
   



##########
paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java:
##########
@@ -45,8 +45,19 @@ public Snapshot getNextSnapshot(long nextSnapshotId) {
         }
 
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
         // No snapshot now
         if (earliestSnapshotId == null || earliestSnapshotId <= nextSnapshotId) {
+            if ((earliestSnapshotId == null && nextSnapshotId > 1)
+                    || (latestSnapshotId != null && nextSnapshotId > latestSnapshotId + 1)) {
+                throw new OutOfRangeException(
+                        String.format(
+                                "The next expected snapshot is too big! The most likely cause is that the table has been recreated. "
+                                        + "The next snapshot id is %d, while the latest snapshot id is %s",
+                                nextSnapshotId,
+                                latestSnapshotId == null ? "null" : latestSnapshotId));
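
Read on its own, the new guard fires when the requested snapshot id points past anything the table could currently contain. A standalone restatement of the predicate (illustrative class and method names, not the PR's actual API):

```java
public class SnapshotRangeCheck {
    /**
     * Illustrative restatement of the guard above: the next expected snapshot
     * is "too big" when the table has no snapshots at all but the reader
     * expects something beyond the first, or when it points more than one
     * past the latest known snapshot (i.e. the table was likely recreated).
     */
    static boolean isOutOfRange(Long earliestSnapshotId, Long latestSnapshotId, long nextSnapshotId) {
        return (earliestSnapshotId == null && nextSnapshotId > 1)
                || (latestSnapshotId != null && nextSnapshotId > latestSnapshotId + 1);
    }

    public static void main(String[] args) {
        // empty table, but a reader already expects snapshot 5: out of range
        System.out.println(isOutOfRange(null, null, 5));
        // reader expects the snapshot right after the latest one: fine
        System.out.println(isOutOfRange(1L, 3L, 4));
    }
}
```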

Review Comment:
   ```suggestion
                                   latestSnapshotId));
   ```
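   The suggestion works because `java.util.Formatter` already renders a null `%s` argument as the literal string `"null"`, so the explicit null check adds nothing. A minimal demonstration (class and method names are illustrative):
   
   ```java
   public class FormatNullDemo {
       // %s on a null reference yields the literal "null" -- no manual check needed
       static String describe(Long latestSnapshotId) {
           return String.format("the latest snapshot id is %s", latestSnapshotId);
       }
   
       public static void main(String[] args) {
           System.out.println(describe(null)); // the latest snapshot id is null
           System.out.println(describe(5L));   // the latest snapshot id is 5
       }
   }
   ```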



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java:
##########
@@ -335,6 +338,77 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
         }
     }
 
+    @Timeout(120)
+    @ParameterizedTest()
+    @ValueSource(booleans = {false, true})
+    public void testRecreateTableWithException(boolean isReloadData) throws Exception {
+        TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
+        bEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        bEnv.executeSql("USE CATALOG testCatalog");
+        bEnv.executeSql(
+                "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+                        + "PARTITIONED BY (pt) "
+                        + "WITH ("
+                        + "    'bucket' = '2'\n"
+                        + ")");
+
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(4)
+                        .checkpointIntervalMs(1000)
+                        .build();
+        sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        sEnv.executeSql("USE CATALOG testCatalog");
+
+        // first write
+        List<String> values = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            values.add(String.format("(0, %d, %d)", i, i));
+            values.add(String.format("(1, %d, %d)", i, i));
+        }
+        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+
+        // second write
+        values.clear();
+        for (int i = 0; i < 10; i++) {
+            values.add(String.format("(0, %d, %d)", i, i + 1));
+            values.add(String.format("(1, %d, %d)", i, i + 1));
+        }
+        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+
+        // start a read job
+        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM t").collect();
+
+        // wait for the read job to read the current table
+        Thread.sleep(10000);

Review Comment:
   Assert that all snapshots are read by checking the result of `SELECT`. Don't sleep for this long!
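   One way to follow this advice is to poll the `collect()` iterator's drained rows until the expected count has arrived, bounded by the test's timeout. A Flink-independent sketch of such a wait helper (the helper name and signature are illustrative, not part of the PR):
   
   ```java
   import java.util.function.BooleanSupplier;
   
   public class WaitUtil {
       /**
        * Polls the condition every pollMs until it holds or timeoutMs elapses.
        * Returns true if the condition became true within the timeout.
        */
       static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs)
               throws InterruptedException {
           long deadline = System.currentTimeMillis() + timeoutMs;
           while (!condition.getAsBoolean()) {
               if (System.currentTimeMillis() >= deadline) {
                   return false;
               }
               Thread.sleep(pollMs);
           }
           return true;
       }
   
       public static void main(String[] args) throws InterruptedException {
           // In the test, the condition would be "rows drained from the
           // collect() iterator so far == expected row count".
           System.out.println(waitUntil(() -> true, 1_000, 10));
       }
   }
   ```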



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
