beryllw commented on code in PR #2161:
URL: https://github.com/apache/fluss/pull/2161#discussion_r2668355682


##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -454,6 +455,177 @@ void testInvalidPrefixLookup() throws Exception {
                                 + "because the lookup columns [b, a] must 
contain all bucket keys [a, b] in order.");
     }
 
+    @Test
+    void testSingleBucketPutAutoIncColumnAndLookup() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("col1", DataTypes.STRING())
+                        .withComment("col1 is first column")
+                        .column("col2", DataTypes.BIGINT())
+                        .withComment("col2 is second column, auto increment 
column")
+                        .column("col3", DataTypes.STRING())
+                        .withComment("col3 is third column")
+                        .enableAutoIncrement("col2")
+                        .primaryKey("col1")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, "col1").build();
+        // create the table
+        TablePath tablePath =
+                TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc");
+        createTable(tablePath, tableDescriptor, true);
+        Table autoIncTable = conn.getTable(tablePath);
+        UpsertWriter upsertWriter =
+                autoIncTable.newUpsert().partialUpdate("col1", "col3").createWriter();
+        Object[][] records = {
+            {"a", 0L, "batch1"},
+            {"b", 1L, "batch1"},
+            {"c", 2L, "batch1"},
+            {"d", 3L, "batch1"},
+            {"e", 4L, "batch1"}
+        };
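+        // col2 is excluded from the partial update, so each upsert writes null
+        // and the server assigns the auto-increment value (0..4 expected above).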
+        for (Object[] record : records) {
+            upsertWriter.upsert(row(record[0], null, record[2]));
+        }
+        upsertWriter.flush();
+
+        Lookuper lookuper = autoIncTable.newLookup().createLookuper();
+        ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
+
+        for (Object[] record : records) {
+            assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row(record))))
+                    .withSchema(schema.getRowType())
+                    .isEqualTo(row(record));
+        }
+
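+        // Upsert the same keys again with a new col3 value; the partial update
+        // must keep the previously assigned auto-increment values in col2.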
+        for (Object[] record : records) {
+            record[2] = "batch2";
+            upsertWriter.upsert(row(record[0], null, record[2]));
+        }
+        upsertWriter.flush();
+
+        for (Object[] record : records) {
+            assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row(record))))
+                    .withSchema(schema.getRowType())
+                    .isEqualTo(row(record));
+        }
+
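+        // Evolve the schema: append a nullable INT column "col4" at the end.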
+        admin.alterTable(
+                        tablePath,
+                        Collections.singletonList(
+                                TableChange.addColumn(
+                                        "col4",
+                                        DataTypes.INT(),
+                                        null,
+                                        TableChange.ColumnPosition.last())),
+                        false)
+                .get();
+        Table newSchemaTable = conn.getTable(tablePath);
+        Schema newSchema = newSchemaTable.getTableInfo().getSchema();
+        Lookuper newLookuper = newSchemaTable.newLookup().createLookuper();
+
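+        // The col4 values (10..14) are only written in case2 below; until then,
+        // the existing rows read col4 as null.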
+        Object[][] recordsWithNewSchema = {
+            {"a", 0L, "batch2", 10},
+            {"b", 1L, "batch2", 11},
+            {"c", 2L, "batch2", 12},
+            {"d", 3L, "batch2", 13},
+            {"e", 4L, "batch2", 14}
+        };
+        // schema change case1: read new data with new schema.
+        for (Object[] record : recordsWithNewSchema) {
+            assertThatRow(lookupRow(newLookuper, keyRow.replaceRow(row(record))))
+                    .withSchema(newSchema.getRowType())
+                    .isEqualTo(row(record[0], record[1], record[2], null));
+        }
+
+        // schema change case2: update new data with new schema.
+        UpsertWriter newUpsertWriter =
+                newSchemaTable.newUpsert().partialUpdate("col1", "col3", "col4").createWriter();
+        for (Object[] record : recordsWithNewSchema) {
+            record[2] = "batch3";
+            newUpsertWriter.upsert(row(record));
+        }
+        newUpsertWriter.flush();
+
+        // schema change case3: read data with old schema.
+        for (Object[] record : records) {
+            record[2] = "batch3";
+            assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row(record))))
+                    .withSchema(schema.getRowType())
+                    .isEqualTo(row(record));
+        }
+
+        // schema change case4: read data with new schema.
+        for (Object[] record : recordsWithNewSchema) {
+            assertThatRow(lookupRow(newLookuper, keyRow.replaceRow(row(record))))
+                    .withSchema(newSchema.getRowType())
+                    .isEqualTo(row(record));
+        }
+    }
+
+    @Test
+    void testPutAutoIncColumnAndLookup() throws Exception {

Review Comment:
   We should also test partitioned and multi-bucket scenarios.
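
   A multi-bucket variant could look roughly like the sketch below. It reuses the helpers already in this class (`createTable`, `conn`, `row`, `lookupRow`) plus `java.util.HashSet`/`Set` and AssertJ's `assertThat`, and it assumes `lookupRow` returns an `InternalRow` as in the assertions above. The table name, the bucket count of 3, and the uniqueness-based assertion are illustrative assumptions, since auto-increment values may be allocated per bucket and are not necessarily contiguous across the table:

   ```java
   @Test
   void testMultiBucketPutAutoIncColumnAndLookup() throws Exception {
       Schema schema =
               Schema.newBuilder()
                       .column("col1", DataTypes.STRING())
                       .column("col2", DataTypes.BIGINT())
                       .column("col3", DataTypes.STRING())
                       .enableAutoIncrement("col2")
                       .primaryKey("col1")
                       .build();
       // Same layout as the single-bucket test, but spread over 3 buckets so
       // auto-increment allocation is exercised on more than one bucket.
       TableDescriptor tableDescriptor =
               TableDescriptor.builder().schema(schema).distributedBy(3, "col1").build();
       TablePath tablePath =
               TablePath.of(
                       DATA1_TABLE_PATH_PK.getDatabaseName(),
                       "test_pk_table_auto_inc_multi_bucket");
       createTable(tablePath, tableDescriptor, true);

       Table table = conn.getTable(tablePath);
       UpsertWriter writer = table.newUpsert().partialUpdate("col1", "col3").createWriter();
       String[] keys = {"a", "b", "c", "d", "e"};
       for (String key : keys) {
           // col2 is omitted from the partial update so the server assigns it.
           writer.upsert(row(key, null, "batch1"));
       }
       writer.flush();

       // Across buckets the assigned values are unique but not necessarily
       // contiguous, so assert uniqueness rather than concrete values.
       Lookuper lookuper = table.newLookup().createLookuper();
       ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
       Set<Long> assigned = new HashSet<>();
       for (String key : keys) {
           InternalRow result = lookupRow(lookuper, keyRow.replaceRow(row(key, null, "batch1")));
           assertThat(assigned.add(result.getLong(1))).isTrue();
       }
   }
   ```

   A partitioned variant would follow the same shape, presumably declaring a partition column on the `TableDescriptor` builder (e.g. via `partitionedBy`) and including it in the primary key.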
