Copilot commented on code in PR #2464:
URL: https://github.com/apache/fluss/pull/2464#discussion_r2723861094
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussLakeTableITCase.java:
##########
@@ -262,11 +268,12 @@ private Map<TableBucket, List<InternalRow>> writeRowsAndVerifyBucket(
         TableInfo tableInfo = admin.getTableInfo(tablePath).get();
         long tableId = tableInfo.getTableId();
         DataLakeFormat dataLakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
-        int rowNums = 30;
+        int rowNums = 1;
Review Comment:
Changing the number of test rows from 30 to 1 significantly reduces test
coverage. This change appears unrelated to the lake fix purpose and may reduce
the test's effectiveness at catching bugs. If this change is necessary, it
should be documented or explained. If it's for debugging purposes, it should be
reverted to the original value of 30.
##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -1283,6 +1283,21 @@ public class ConfigOptions {
                             "The format of the kv records in kv store. The default value is `compacted`. "
                                     + "The supported formats are `compacted` and `indexed`.");
+    /**
+     * The version of the kv format. This is used for backward compatibility when encoding strategy
+     * changes. Version 0 (absent): Old tables use datalake encoding for both primary key and bucket
+     * key. Version 1: New tables use Fluss encoding for primary key (to support prefix lookup) and
+     * datalake encoding for bucket key (to align with datalake bucket).
+     */
+    public static final ConfigOption<Integer> TABLE_KV_FORMAT_VERSION =
+            key("table.kv.format.version")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The version of the kv format for backward compatibility. "
+                                    + "When absent (old tables), primary key encoding may differ from new tables. "
+                                    + "Version 1 indicates new encoding strategy for datalake tables.");
Review Comment:
The documentation describes version 1 as the new encoding strategy, but the
code actually uses version 2 (as seen in KeyEncoder.java line 81 and
CoordinatorService.java line 183). The documentation should be updated to
reflect that version 2 is the new encoding strategy, not version 1.
Additionally, the comment in ConfigOptions mentions "Version 0 (absent)" and
"Version 1" but the actual implementation uses version 2.
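If version 2 is indeed the current format, the javadoc and description could be aligned along these lines (a sketch only; the exact wording, the version semantics, and the surrounding `ConfigOption` builder usage are the PR author's to confirm):

```java
/**
 * The version of the kv format, used for backward compatibility when the encoding
 * strategy changes. Version 0 (absent): old tables use datalake encoding for both
 * the primary key and the bucket key. Version 2: new tables use Fluss encoding for
 * the primary key (to support prefix lookup) and datalake encoding for the bucket
 * key (to align with the datalake bucket).
 */
public static final ConfigOption<Integer> TABLE_KV_FORMAT_VERSION =
        key("table.kv.format.version")
                .intType()
                .noDefaultValue()
                .withDescription(
                        "The version of the kv format for backward compatibility. "
                                + "When absent (old tables), primary key encoding may differ "
                                + "from new tables. Version 2 indicates the new encoding "
                                + "strategy for datalake tables.");
```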
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -305,10 +309,10 @@ void testPutAndPrefixLookup() throws Exception {
createTable(tablePath, descriptor, false);
Table table = conn.getTable(tablePath);
TableInfo tableInfo = table.getTableInfo();
- verifyPutAndLookup(table, new Object[] {1, "a", 1L, "value1"});
- verifyPutAndLookup(table, new Object[] {1, "a", 2L, "value2"});
- verifyPutAndLookup(table, new Object[] {1, "a", 3L, "value3"});
- verifyPutAndLookup(table, new Object[] {2, "a", 4L, "value4"});
+ verifyPutAndLookup(table, new Object[] {1, "a", 1L, "valuevalue1"});
+ verifyPutAndLookup(table, new Object[] {1, "a", 2L, "valuevalue2"});
+ verifyPutAndLookup(table, new Object[] {1, "a", 3L, "valuevalue3"});
+ verifyPutAndLookup(table, new Object[] {2, "a", 4L, "valuevalue4"});
Review Comment:
The test value strings are being changed from "value1" to "valuevalue1" (and
similar for other values). This appears to be unrelated to the lake fix purpose
of this PR. If these changes are intentional for some reason related to the
fix, they should be documented or explained, as changing test data without
explanation makes it harder to review. If these are accidental changes, they
should be reverted.
##########
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java:
##########
@@ -592,9 +606,14 @@ public void lookups(
responseCallback.accept(lookupResultForBucketMap);
}
- /** Lookup multi prefixKeys by prefix scan on kv store. */
+ /**
+ * Lookup multi prefixKeys by prefix scan on kv store.
+ *
+     * @param apiVersion the client API version for backward compatibility validation
+ */
public void prefixLookups(
Map<TableBucket, List<byte[]>> entriesPerBucket,
+ int apiVersion,
Review Comment:
The apiVersion parameter type is inconsistent. This method uses `int apiVersion` while other methods like `lookups()` (line 577) and `putRecordsToKv()` (line 533) use `short apiVersion`. The API version should be consistently typed as `short` across all methods, since it is retrieved from Session.getApiVersion(), which returns a short.
```suggestion
short apiVersion,
```
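One reason such a mismatch slips through review: Java silently widens `short` to `int` at call sites, so the inconsistent signature compiles without any warning. A minimal, self-contained illustration (invented names, not Fluss code):

```java
// Demonstrates why a short-vs-int apiVersion mismatch compiles cleanly:
// Java applies a widening primitive conversion from short to int implicitly.
public class ApiVersionWidening {

    // Consistent with the rest of the (hypothetical) API: takes short.
    static String lookups(short apiVersion) {
        return "short:" + apiVersion;
    }

    // The odd one out: takes int, yet callers passing a short still compile.
    static String prefixLookups(int apiVersion) {
        return "int:" + apiVersion;
    }

    public static void main(String[] args) {
        short clientVersion = 0; // e.g. what a getApiVersion()-style accessor returns
        System.out.println(lookups(clientVersion));
        System.out.println(prefixLookups(clientVersion)); // widens silently, no warning
    }
}
```

Because the compiler never flags the widening, only a convention ("API versions are `short` everywhere") or a review catches the drift.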
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -497,6 +501,20 @@ private TableDescriptor applySystemDefaults(
newDescriptor = newDescriptor.withProperties(newProperties);
}
}
+
+        if (newDescriptor.hasPrimaryKey()) {
+            Map<String, String> newProperties = new HashMap<>(newDescriptor.getProperties());
+            String formatVersion = newProperties.get(ConfigOptions.TABLE_KV_FORMAT_VERSION.key());
+            if (formatVersion != null) {
+                throw new IllegalArgumentException("todo");
Review Comment:
The error message is a placeholder "todo" which should not be in production
code. This exception is thrown when a user manually sets the kv format version,
which should either be forbidden or have a proper error message explaining why
manual setting is not allowed.
```suggestion
                throw new IllegalArgumentException(
                        "Manual configuration of '"
                                + ConfigOptions.TABLE_KV_FORMAT_VERSION.key()
                                + "' is not supported. The coordinator chooses the appropriate "
                                + "KV format version automatically. Remove this property from the "
                                + "table properties (current value: '"
                                + formatVersion
                                + "').");
```
##########
fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java:
##########
@@ -1882,6 +1906,92 @@ private void verifyPrefixLookup(
         }
     }
+    @Test
+    void testOldClientVersionRejectNewFormatTable() throws Exception {
+        // Test that old client version (version 0) is rejected for new format tables
+        // with non-default bucket key. This is because new tables use CompactedKeyEncoder
+        // which is incompatible with old clients that expect lake's encoder.
+        TablePath tablePath = TablePath.of("test_db", "new_format_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.STRING())
+                        .primaryKey("a", "b")
+                        .build();
+        RowType rowType = schema.getRowType();
+        RowType keyType =
+                DataTypes.ROW(
+                        new DataField("a", DataTypes.INT()),
+                        new DataField("b", DataTypes.STRING()));
+
+        // Create table with:
+        // 1. kv format version 2 (new format)
+        // 2. non-default bucket key (a is subset of pk (a, b))
+        // 3. data lake format (paimon)
+        Map<String, String> properties = new HashMap<>();
+        properties.put(
+                ConfigOptions.TABLE_KV_FORMAT_VERSION.key(), String.valueOf(KV_FORMAT_VERSION_2));
+        properties.put(ConfigOptions.TABLE_DATALAKE_FORMAT.key(), DataLakeFormat.PAIMON.toString());
+        long tableId =
+                registerTableInZkClient(
+                        tablePath,
+                        schema,
+                        2998232L,
+                        Collections.singletonList("a"), // bucket key is subset of pk
+                        properties);
+        TableBucket tb = new TableBucket(tableId, 0);
+        makeKvTableAsLeader(tableId, tablePath, tb.getBucket());
+
+        List<Tuple2<Object[], Object[]>> data =
+                Collections.singletonList(
+                        Tuple2.of(new Object[] {1, "a"}, new Object[] {1, "a", "value1"}));
+
+        // Test putKvRecords with old client version (version 0) - should fail
+        short oldClientVersion = 0;
+        CompletableFuture<List<PutKvResultForBucket>> putFuture = new CompletableFuture<>();
+        replicaManager.putRecordsToKv(
+                20000,
+                1,
+                Collections.singletonMap(tb, genKvRecordBatch(keyType, rowType, data)),
+                null,
+                oldClientVersion,
+                putFuture::complete);
+        PutKvResultForBucket putResult = putFuture.get().get(0);
+        assertThat(putResult.failed()).isTrue();
+        assertThat(putResult.getErrorCode()).isEqualTo(Errors.UNSUPPORTED_VERSION.code());
+        assertThat(putResult.getErrorMessage()).contains("Client API version 0 is not supported");
+
+        // Test lookups with old client version (version 0) - should fail
+        CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(rowType, new int[] {0, 1});
+        byte[] keyBytes = keyEncoder.encodeKey(row(1, "a"));
+        CompletableFuture<Map<TableBucket, LookupResultForBucket>> lookupFuture =
+                new CompletableFuture<>();
+        replicaManager.lookups(
+                Collections.singletonMap(tb, Collections.singletonList(keyBytes)),
+                oldClientVersion,
+                lookupFuture::complete);
+        LookupResultForBucket lookupResult = lookupFuture.get().get(tb);
+        assertThat(lookupResult.failed()).isTrue();
+        assertThat(putResult.getErrorCode()).isEqualTo(Errors.UNSUPPORTED_VERSION.code());
+        assertThat(putResult.getErrorMessage()).contains("Client API version 0 is not supported");
Review Comment:
The assertions are checking the wrong variable. Lines 1976-1977 check
`putResult` instead of `lookupResult`. This means the lookup operation's error
validation is not being tested correctly. The assertions should check
`lookupResult.getErrorCode()` and `lookupResult.getErrorMessage()` instead of
`putResult`.
```suggestion
assertThat(lookupResult.getErrorCode()).isEqualTo(Errors.UNSUPPORTED_VERSION.code());
assertThat(lookupResult.getErrorMessage())
.contains("Client API version 0 is not supported");
```
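For context, the compatibility gate this test exercises can be sketched roughly as follows. All names and thresholds here are assumptions for illustration, not the actual ReplicaManager code:

```java
// Hypothetical sketch of the version gate the test exercises: requests from
// clients whose API version predates the table's KV format version are rejected.
public class ApiVersionGuard {
    static final int KV_FORMAT_VERSION_2 = 2;      // assumed: the "new format" version
    static final short MIN_API_VERSION_FOR_V2 = 1; // assumed: first client version that
                                                   // understands the new key encoding

    /** Returns an error message for an incompatible client, or null when compatible. */
    static String check(short clientApiVersion, int tableKvFormatVersion) {
        if (tableKvFormatVersion >= KV_FORMAT_VERSION_2
                && clientApiVersion < MIN_API_VERSION_FOR_V2) {
            return "Client API version " + clientApiVersion
                    + " is not supported for kv format version " + tableKvFormatVersion;
        }
        return null; // old tables, or new-enough clients, pass through
    }

    public static void main(String[] args) {
        System.out.println(check((short) 0, 2)); // old client + new table -> rejected
        System.out.println(check((short) 1, 2)); // new client + new table -> null (OK)
        System.out.println(check((short) 0, 0)); // old client + old table -> null (OK)
    }
}
```

With a guard shaped like this, asserting on the wrong result object (as the review comment notes) would mask a regression in exactly one of the two gated code paths, which is why the lookup assertions need to target `lookupResult`.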
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]