tsreaper commented on code in PR #4445:
URL: https://github.com/apache/paimon/pull/4445#discussion_r1830328316


##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java:
##########
@@ -335,6 +338,88 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
         }
     }
 
+    @Timeout(120)
+    @ParameterizedTest()
+    @ValueSource(booleans = {false, true})
+    public void testRecreateTableWithException(boolean isReloadData) throws Exception {
+        TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
+        bEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        bEnv.executeSql("USE CATALOG testCatalog");
+        bEnv.executeSql(
+                "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+                        + "PARTITIONED BY (pt) "
+                        + "WITH ("
+                        + "    'bucket' = '2'\n"
+                        + "    ,'continuous.discovery-interval' = '1s'\n"
+                        + ")");
+
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(4)
+                        .checkpointIntervalMs(1000)
+                        .build();
+        sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        sEnv.executeSql("USE CATALOG testCatalog");
+        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM t").collect();
+
+        // first write
+        List<String> values = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            values.add(String.format("(0, %d, %d)", i, i));
+            values.add(String.format("(1, %d, %d)", i, i));
+        }
+        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+        List<Row> expected = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            expected.add(Row.ofKind(RowKind.INSERT, 0, i, i));
+            expected.add(Row.ofKind(RowKind.INSERT, 1, i, i));
+        }
+        assertStreamingResult(it, expected);
+
+        // second write
+        values.clear();
+        for (int i = 0; i < 10; i++) {
+            values.add(String.format("(0, %d, %d)", i, i + 1));
+            values.add(String.format("(1, %d, %d)", i, i + 1));
+        }
+        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+
+        // start a read job
+        for (int i = 0; i < 10; i++) {
+            expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i));
+            expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i));
+            expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1));
+            expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 1));
+        }
+        assertStreamingResult(it, expected.subList(20, 60));
+
+        // delete table and recreate a same table
+        bEnv.executeSql("DROP TABLE t");
+        bEnv.executeSql(
+                "CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+                        + "PARTITIONED BY (pt) "
+                        + "WITH ("
+                        + "    'bucket' = '2'\n"
+                        + ")");
+
+        // if reload data, it will generate a new snapshot for recreated table
+        if (isReloadData) {
+            bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
+        }
+        assertThatCode(
+                        () -> {
+                            while (true) {
+                                if (it.hasNext()) {
+                                    it.next();
+                                }
+                            }
+                        })

Review Comment:
   ```suggestion
           assertThatCode(() -> it.next())
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to