Copilot commented on code in PR #2417:
URL: https://github.com/apache/fluss/pull/2417#discussion_r2708217313
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -469,7 +470,12 @@ private TableDescriptor applySystemDefaults(
// override the datalake format if the table hasn't set it and the cluster configured
if (dataLakeFormat != null
&& !properties.containsKey(ConfigOptions.TABLE_DATALAKE_FORMAT.key())) {
- newDescriptor = newDescriptor.withDataLakeFormat(dataLakeFormat);
+ Map<String, String> newProperties = new HashMap<>(newDescriptor.getProperties());
+ newProperties.put(ConfigOptions.TABLE_DATALAKE_FORMAT.key(), dataLakeFormat.toString());
+ newProperties.put(
+ ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key(),
+ String.valueOf(CURRENT_LAKE_STORAGE_VERSION));
+ newDescriptor = newDescriptor.withProperties(newProperties);
Review Comment:
The storage version is only written inside the branch that runs when the
table has not set TABLE_DATALAKE_FORMAT itself. When a user sets
TABLE_DATALAKE_FORMAT explicitly in the table properties, that branch is
skipped and TABLE_DATALAKE_STORAGE_VERSION is never set, so the table is
treated as a legacy v1 table (with system columns) instead of a v2 table.
Consider setting the storage version for every datalake-enabled table, not
only for tables that inherit the cluster's default format.
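One possible shape for this, as a minimal standalone sketch rather than a
drop-in patch: it works on plain property maps instead of TableDescriptor, and
the class name, the two key strings, and the version value 2 are assumptions
standing in for ConfigOptions.TABLE_DATALAKE_FORMAT.key(),
ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key(), and
CURRENT_LAKE_STORAGE_VERSION. The format default and the storage version are
handled in two separate steps, so an explicitly configured format also gets the
version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the relevant part of CoordinatorService#applySystemDefaults.
final class StorageVersionDefaults {
    // Assumed key strings and version value; the real ones live in ConfigOptions.
    static final String FORMAT_KEY = "table.datalake.format";
    static final String STORAGE_VERSION_KEY = "table.datalake.storage-version";
    static final int CURRENT_LAKE_STORAGE_VERSION = 2;

    static Map<String, String> applySystemDefaults(
            Map<String, String> tableProperties, String clusterFormat) {
        Map<String, String> result = new HashMap<>(tableProperties);

        // Step 1: inherit the cluster format only if the table did not set one.
        if (clusterFormat != null && !result.containsKey(FORMAT_KEY)) {
            result.put(FORMAT_KEY, clusterFormat);
        }

        // Step 2: any table that ends up with a datalake format gets a storage
        // version, whether the format was inherited or set explicitly.
        if (result.containsKey(FORMAT_KEY)) {
            result.putIfAbsent(
                    STORAGE_VERSION_KEY, String.valueOf(CURRENT_LAKE_STORAGE_VERSION));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> explicit = new HashMap<>();
        explicit.put(FORMAT_KEY, "paimon");
        // The explicitly configured table now carries a storage version as well.
        System.out.println(applySystemDefaults(explicit, "paimon"));
    }
}
```

Using putIfAbsent keeps a version that is already present in the properties;
whether users should be allowed to supply that value themselves is a separate
question for this PR.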
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java:
##########
@@ -39,15 +40,25 @@ public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
private final int offsetFieldIndex;
private final int timestampFieldIndex;
- public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
+ public FlussRecordAsPaimonRow(
Review Comment:
Parameter name has a typo: 'tableTowType' should be 'tableRowType'.
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/PaimonTestUtils.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.testutils;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.TableRegistration;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataTypes;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** THe utils for paimon testing. */
Review Comment:
There is a typo in the Javadoc comment. "THe" should be "The".
```suggestion
/** The utils for paimon testing. */
```
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java:
##########
@@ -206,9 +222,15 @@ public static Schema toPaimonSchema(TableDescriptor tableDescriptor) {
column.getComment().orElse(null));
}
- // add system metadata columns to schema
- for (Map.Entry<String, DataType> systemColumn : SYSTEM_COLUMNS.entrySet()) {
- schemaBuilder.column(systemColumn.getKey(), systemColumn.getValue());
+ // add system metadata columns to schema only for legacy tables (storage-version not set)
+ Optional<Integer> storageVersion =
+ Configuration.fromMap(tableDescriptor.getProperties())
+ .getOptional(TABLE_DATALAKE_STORAGE_VERSION);
+ if (!storageVersion.isPresent()) {
+ // Legacy table: add system columns,
Review Comment:
The comment has an unnecessary trailing comma. It should be "Legacy table:
add system columns" without the comma at the end.
```suggestion
// Legacy table: add system columns
```