Copilot commented on code in PR #2464:
URL: https://github.com/apache/fluss/pull/2464#discussion_r2723861094


##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussLakeTableITCase.java:
##########
@@ -262,11 +268,12 @@ private Map<TableBucket, List<InternalRow>> writeRowsAndVerifyBucket(
         TableInfo tableInfo = admin.getTableInfo(tablePath).get();
         long tableId = tableInfo.getTableId();
         DataLakeFormat dataLakeFormat =
                 tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
-        int rowNums = 30;
+        int rowNums = 1;

Review Comment:
   Changing the number of test rows from 30 to 1 significantly reduces test 
coverage. This change appears unrelated to the lake fix purpose and may reduce 
the test's effectiveness at catching bugs. If this change is necessary, it 
should be documented or explained. If it's for debugging purposes, it should be 
reverted to the original value of 30.
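   If the change was only for local debugging, a suggestion restoring the original value (assuming no other part of the test depends on the row count):
   ```suggestion
        int rowNums = 30;
   ```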



##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -1283,6 +1283,21 @@ public class ConfigOptions {
                             "The format of the kv records in kv store. The default value is `compacted`. "
                                     + "The supported formats are `compacted` and `indexed`.");
 
+    /**
+     * The version of the kv format. This is used for backward compatibility when encoding strategy
+     * changes. Version 0 (absent): Old tables use datalake encoding for both primary key and bucket
+     * key. Version 1: New tables use Fluss encoding for primary key (to support prefix lookup) and
+     * datalake encoding for bucket key (to align with datalake bucket).
+     */
+    public static final ConfigOption<Integer> TABLE_KV_FORMAT_VERSION =
+            key("table.kv.format.version")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
                            "The version of the kv format for backward compatibility. "
                                    + "When absent (old tables), primary key encoding may differ from new tables. "
                                    + "Version 1 indicates new encoding strategy for datalake tables.");

Review Comment:
   The documentation describes version 1 as the new encoding strategy, but the
code actually uses version 2 (as seen in KeyEncoder.java line 81 and
CoordinatorService.java line 183). Both the Javadoc ("Version 0 (absent)" and
"Version 1") and the option description should be updated to state that
version 2 is the new encoding strategy, not version 1.
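   A possible suggestion for the description string (assuming version 2 is the intended new format version; the Javadoc above would need the same update):
   ```suggestion
                    .withDescription(
                            "The version of the kv format for backward compatibility. "
                                    + "When absent (old tables), primary key encoding may differ from new tables. "
                                    + "Version 2 indicates the new encoding strategy for datalake tables.");
   ```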



##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -305,10 +309,10 @@ void testPutAndPrefixLookup() throws Exception {
         createTable(tablePath, descriptor, false);
         Table table = conn.getTable(tablePath);
         TableInfo tableInfo = table.getTableInfo();
-        verifyPutAndLookup(table, new Object[] {1, "a", 1L, "value1"});
-        verifyPutAndLookup(table, new Object[] {1, "a", 2L, "value2"});
-        verifyPutAndLookup(table, new Object[] {1, "a", 3L, "value3"});
-        verifyPutAndLookup(table, new Object[] {2, "a", 4L, "value4"});
+        verifyPutAndLookup(table, new Object[] {1, "a", 1L, "valuevalue1"});
+        verifyPutAndLookup(table, new Object[] {1, "a", 2L, "valuevalue2"});
+        verifyPutAndLookup(table, new Object[] {1, "a", 3L, "valuevalue3"});
+        verifyPutAndLookup(table, new Object[] {2, "a", 4L, "valuevalue4"});

Review Comment:
   The test value strings are being changed from "value1" to "valuevalue1" (and 
similar for other values). This appears to be unrelated to the lake fix purpose 
of this PR. If these changes are intentional for some reason related to the 
fix, they should be documented or explained, as changing test data without 
explanation makes it harder to review. If these are accidental changes, they 
should be reverted.
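   If the changes are accidental, a suggestion reverting to the original test data:
   ```suggestion
        verifyPutAndLookup(table, new Object[] {1, "a", 1L, "value1"});
        verifyPutAndLookup(table, new Object[] {1, "a", 2L, "value2"});
        verifyPutAndLookup(table, new Object[] {1, "a", 3L, "value3"});
        verifyPutAndLookup(table, new Object[] {2, "a", 4L, "value4"});
   ```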



##########
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java:
##########
@@ -592,9 +606,14 @@ public void lookups(
         responseCallback.accept(lookupResultForBucketMap);
     }
 
-    /** Lookup multi prefixKeys by prefix scan on kv store. */
+    /**
+     * Lookup multi prefixKeys by prefix scan on kv store.
+     *
+     * @param apiVersion the client API version for backward compatibility validation
+     */
     public void prefixLookups(
             Map<TableBucket, List<byte[]>> entriesPerBucket,
+            int apiVersion,

Review Comment:
   The apiVersion parameter type is inconsistent. This method uses `int 
apiVersion` while other methods like `lookups()` (line 577) and 
`putRecordsToKv()` (line 533) use `short apiVersion`. The API version should be 
consistently typed as `short` across all methods since it's retrieved from 
Session.getApiVersion() which returns a short.
   ```suggestion
               short apiVersion,
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -497,6 +501,20 @@ private TableDescriptor applySystemDefaults(
                 newDescriptor = newDescriptor.withProperties(newProperties);
             }
         }
+
+        if (newDescriptor.hasPrimaryKey()) {
+            Map<String, String> newProperties = new HashMap<>(newDescriptor.getProperties());
+            String formatVersion = newProperties.get(ConfigOptions.TABLE_KV_FORMAT_VERSION.key());
+            if (formatVersion != null) {
+                throw new IllegalArgumentException("todo");

Review Comment:
   The error message is a placeholder "todo" which should not be in production 
code. This exception is thrown when a user manually sets the kv format version, 
which should either be forbidden or have a proper error message explaining why 
manual setting is not allowed.
   ```suggestion
                   throw new IllegalArgumentException(
                           "Manual configuration of '"
                                   + ConfigOptions.TABLE_KV_FORMAT_VERSION.key()
                                    + "' is not supported. The coordinator chooses the appropriate "
                                    + "KV format version automatically. Remove this property from the "
                                   + "table properties (current value: '"
                                   + formatVersion
                                   + "').");
   ```



##########
fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java:
##########
@@ -1882,6 +1906,92 @@ private void verifyPrefixLookup(
         }
     }
 
+    @Test
+    void testOldClientVersionRejectNewFormatTable() throws Exception {
+        // Test that old client version (version 0) is rejected for new format tables
+        // with non-default bucket key. This is because new tables use CompactedKeyEncoder
+        // which is incompatible with old clients that expect lake's encoder.
+        TablePath tablePath = TablePath.of("test_db", "new_format_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.STRING())
+                        .primaryKey("a", "b")
+                        .build();
+        RowType rowType = schema.getRowType();
+        RowType keyType =
+                DataTypes.ROW(
+                        new DataField("a", DataTypes.INT()),
+                        new DataField("b", DataTypes.STRING()));
+
+        // Create table with:
+        // 1. kv format version 2 (new format)
+        // 2. non-default bucket key (a is subset of pk (a, b))
+        // 3. data lake format (paimon)
+        Map<String, String> properties = new HashMap<>();
+        properties.put(
+                ConfigOptions.TABLE_KV_FORMAT_VERSION.key(), String.valueOf(KV_FORMAT_VERSION_2));
+        properties.put(ConfigOptions.TABLE_DATALAKE_FORMAT.key(), DataLakeFormat.PAIMON.toString());
+        long tableId =
+                registerTableInZkClient(
+                        tablePath,
+                        schema,
+                        2998232L,
+                        Collections.singletonList("a"), // bucket key is subset of pk
+                        properties);
+        TableBucket tb = new TableBucket(tableId, 0);
+        makeKvTableAsLeader(tableId, tablePath, tb.getBucket());
+
+        List<Tuple2<Object[], Object[]>> data =
+                Collections.singletonList(
+                        Tuple2.of(new Object[] {1, "a"}, new Object[] {1, "a", "value1"}));
+
+        // Test putKvRecords with old client version (version 0) - should fail
+        short oldClientVersion = 0;
+        CompletableFuture<List<PutKvResultForBucket>> putFuture = new CompletableFuture<>();
+        replicaManager.putRecordsToKv(
+                20000,
+                1,
+                Collections.singletonMap(tb, genKvRecordBatch(keyType, rowType, data)),
+                null,
+                oldClientVersion,
+                putFuture::complete);
+        PutKvResultForBucket putResult = putFuture.get().get(0);
+        assertThat(putResult.failed()).isTrue();
+        assertThat(putResult.getErrorCode()).isEqualTo(Errors.UNSUPPORTED_VERSION.code());
+        assertThat(putResult.getErrorMessage()).contains("Client API version 0 is not supported");
+
+        // Test lookups with old client version (version 0) - should fail
+        CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(rowType, new int[] {0, 1});
+        byte[] keyBytes = keyEncoder.encodeKey(row(1, "a"));
+        CompletableFuture<Map<TableBucket, LookupResultForBucket>> lookupFuture =
+                new CompletableFuture<>();
+        replicaManager.lookups(
+                Collections.singletonMap(tb, Collections.singletonList(keyBytes)),
+                oldClientVersion,
+                lookupFuture::complete);
+        LookupResultForBucket lookupResult = lookupFuture.get().get(tb);
+        assertThat(lookupResult.failed()).isTrue();
+        assertThat(putResult.getErrorCode()).isEqualTo(Errors.UNSUPPORTED_VERSION.code());
+        assertThat(putResult.getErrorMessage()).contains("Client API version 0 is not supported");

Review Comment:
   The assertions are checking the wrong variable. Lines 1976-1977 check 
`putResult` instead of `lookupResult`. This means the lookup operation's error 
validation is not being tested correctly. The assertions should check 
`lookupResult.getErrorCode()` and `lookupResult.getErrorMessage()` instead of 
`putResult`.
   ```suggestion
            assertThat(lookupResult.getErrorCode()).isEqualTo(Errors.UNSUPPORTED_VERSION.code());
            assertThat(lookupResult.getErrorMessage())
                    .contains("Client API version 0 is not supported");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
