This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new da01e2ed6 [hotfix] use ltz_millis  as paimon system column for 
timestamp datatype (#2320)
da01e2ed6 is described below

commit da01e2ed61447aa25d7bc002e54dbbb309d90607
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Jan 8 09:21:42 2026 +0800

    [hotfix] use ltz_millis  as paimon system column for timestamp datatype 
(#2320)
---
 .../fluss/lake/paimon/PaimonLakeCatalog.java       |  2 +-
 .../lake/paimon/utils/PaimonTableValidation.java   | 46 ++++++++++++++
 .../lake/paimon/LakeEnabledTableCreateITCase.java  | 72 +++++++++++++++++-----
 .../lake/paimon/tiering/PaimonTieringTest.java     |  9 ++-
 4 files changed, 109 insertions(+), 20 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index f8cb46ce4..500546e64 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -68,7 +68,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
         // for consistent behavior
         SYSTEM_COLUMNS.put(BUCKET_COLUMN_NAME, DataTypes.INT());
         SYSTEM_COLUMNS.put(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
-        SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+        SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, 
DataTypes.TIMESTAMP_LTZ_MILLIS());
     }
 
     private final Catalog paimonCatalog;
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
index 0a050f362..94580df17 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
@@ -25,12 +25,18 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 
 /** Utils to verify whether the existing Paimon table is compatible with the 
table to be created. */
 public class PaimonTableValidation {
@@ -61,6 +67,13 @@ public class PaimonTableValidation {
         existingOptions.entrySet().removeIf(entry -> 
!newOptions.containsKey(entry.getKey()));
 
         if (!existingSchema.equals(newSchema)) {
+            // Allow different precisions for __timestamp column for backward 
compatibility,
+            // old cluster will use precision 6, but new cluster will use 
precision 3,
+            // we allow such precision difference
+            if (equalIgnoreSystemColumnTimestampPrecision(existingSchema, 
newSchema)) {
+                return;
+            }
+
             throw new TableAlreadyExistException(
                     String.format(
                             "The table %s already exists in Paimon catalog, 
but the table schema is not compatible. "
@@ -70,6 +83,39 @@ public class PaimonTableValidation {
         }
     }
 
+    /**
+     * Check if the {@code existingSchema} is compatible with {@code 
newSchema} by ignoring the
+     * precision difference of the system column {@code __timestamp}.
+     *
+     * <p>This is crucial for backward compatibility during cluster upgrades 
or configuration
+     * changes (e.g., transitioning from precision 6 to 3). Without this 
relaxed check, users would
+     * be unable to re-enable lake synchronization for existing tables if the 
cluster-wide default
+     * timestamp precision has evolved.
+     *
+     * @param existingSchema the schema currently persisted in the Paimon 
catalog
+     * @param newSchema the new schema descriptor generated by the current 
Fluss cluster
+     * @return true if the schemas are identical, disregarding the precision 
of the system timestamp
+     */
+    private static boolean equalIgnoreSystemColumnTimestampPrecision(
+            Schema existingSchema, Schema newSchema) {
+        List<DataField> existingFields = new 
ArrayList<>(existingSchema.fields());
+        DataField systemTimestampField = 
existingFields.get(existingFields.size() - 1);
+        if (systemTimestampField.name().equals(TIMESTAMP_COLUMN_NAME)
+                && systemTimestampField
+                        .type()
+                        
.equalsIgnoreFieldId(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())) {
+            existingFields.set(
+                    existingFields.size() - 1,
+                    new DataField(
+                            systemTimestampField.id(),
+                            systemTimestampField.name(),
+                            DataTypes.TIMESTAMP_LTZ_MILLIS(),
+                            systemTimestampField.description()));
+        }
+        existingSchema = 
existingSchema.copy(RowType.of(existingFields.toArray(new DataField[0])));
+        return existingSchema.equals(newSchema);
+    }
+
     private static void removeChangeableOptions(Map<String, String> options) {
         options.entrySet()
                 .removeIf(
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 9491918b4..6fb700322 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -168,7 +168,7 @@ class LakeEnabledTableCreateITCase {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "log_c1",
@@ -206,7 +206,7 @@ class LakeEnabledTableCreateITCase {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "log_c1",
@@ -245,7 +245,7 @@ class LakeEnabledTableCreateITCase {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "pk_c1",
@@ -288,7 +288,7 @@ class LakeEnabledTableCreateITCase {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "c1",
@@ -355,7 +355,7 @@ class LakeEnabledTableCreateITCase {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "log_c1",
@@ -456,8 +456,8 @@ class LakeEnabledTableCreateITCase {
                 .isInstanceOf(LakeTableAlreadyExistException.class)
                 .hasMessage(
                         "The table `fluss`.`log_table_with_exist_lake_table` 
already exists in Paimon catalog, but the table schema is not compatible. "
-                                + "Existing schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
file.format=parquet, fluss.k1=v1}, comment=null}, "
-                                + "new schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+                                + "Existing schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
file.format=parquet, fluss.k1=v1}, comment=null}, "
+                                + "new schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
                                 + "Please first drop the table in Paimon 
catalog or use a new table name.");
 
         // create log table with different fields will throw exception
@@ -475,8 +475,8 @@ class LakeEnabledTableCreateITCase {
                 .isInstanceOf(LakeTableAlreadyExistException.class)
                 .hasMessage(
                         "The table `fluss`.`log_table_with_exist_lake_table` 
already exists in Paimon catalog, but the table schema is not compatible. "
-                                + "Existing schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
file.format=parquet, fluss.k1=v1}, comment=null}, "
-                                + "new schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` 
TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], 
options={bucket=-1, fluss.table.replication.factor=1, 
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, 
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+                                + "Existing schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
file.format=parquet, fluss.k1=v1}, comment=null}, "
+                                + "new schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` 
TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], 
options={bucket=-1, fluss.table.replication.factor=1, 
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, 
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
                                 + "Please first drop the table in Paimon 
catalog or use a new table name.");
 
         // add an insignificant option to Paimon table will be ok
@@ -615,7 +615,7 @@ class LakeEnabledTableCreateITCase {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "log_c1",
@@ -711,7 +711,7 @@ class LakeEnabledTableCreateITCase {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "c1",
@@ -742,7 +742,7 @@ class LakeEnabledTableCreateITCase {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "c1",
@@ -832,7 +832,7 @@ class LakeEnabledTableCreateITCase {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "c1",
@@ -885,7 +885,7 @@ class LakeEnabledTableCreateITCase {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "c1",
@@ -927,6 +927,50 @@ class LakeEnabledTableCreateITCase {
         assertThat(alteredRowType.getField("c3").description()).isEqualTo("c3 
comment");
     }
 
+    @Test
+    void testEnableLakeTableWithLegacySystemTimestampColumn() throws Exception 
{
+        TablePath tablePath = TablePath.of(DATABASE, 
"timestamp_precision_compat");
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(Schema.newBuilder().column("c1", 
DataTypes.INT()).build())
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+
+        admin.createTable(tablePath, tableDescriptor, false).get();
+
+        Identifier paimonIdentifier = Identifier.create(DATABASE, 
tablePath.getTableName());
+
+        // alter to TIMESTAMP_WITH_LOCAL_TIME_ZONE to mock the legacy behavior
+        paimonCatalog.alterTable(
+                paimonIdentifier,
+                SchemaChange.updateColumnType(
+                        TIMESTAMP_COLUMN_NAME,
+                        
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
+                false);
+
+        // disable data lake
+        admin.alterTable(
+                        tablePath,
+                        Collections.singletonList(
+                                TableChange.set(
+                                        
ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false")),
+                        false)
+                .get();
+        
assertThat(admin.getTableInfo(tablePath).get().getTableConfig().isDataLakeEnabled())
+                .isFalse();
+
+        // enable data lake again, should still enable it
+        admin.alterTable(
+                        tablePath,
+                        Collections.singletonList(
+                                TableChange.set(
+                                        
ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")),
+                        false)
+                .get();
+        
assertThat(admin.getTableInfo(tablePath).get().getTableConfig().isDataLakeEnabled())
+                .isTrue();
+    }
+
     private void verifyPaimonTable(
             Table paimonTable,
             TableDescriptor flussTable,
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index cadd2a46a..56b3f533d 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -438,7 +438,7 @@ class PaimonTieringTest {
             // check system columns: __bucket, __offset, __timestamp
             assertThat(actualRow.getInt(4)).isEqualTo(expectBucket);
             
assertThat(actualRow.getLong(5)).isEqualTo(expectRecord.logOffset());
-            assertThat(actualRow.getTimestamp(6, 6).getMillisecond())
+            assertThat(actualRow.getTimestamp(6, 3).getMillisecond())
                     .isEqualTo(expectRecord.timestamp());
         }
         assertThat(actualRecords.hasNext()).isFalse();
@@ -468,7 +468,7 @@ class PaimonTieringTest {
             // check system columns: __bucket, __offset, __timestamp
             assertThat(actualRow.getInt(5)).isEqualTo(expectBucket);
             
assertThat(actualRow.getLong(6)).isEqualTo(expectRecord.logOffset());
-            assertThat(actualRow.getTimestamp(7, 6).getMillisecond())
+            assertThat(actualRow.getTimestamp(7, 3).getMillisecond())
                     .isEqualTo(expectRecord.timestamp());
         }
         assertThat(actualRecords.hasNext()).isFalse();
@@ -496,7 +496,7 @@ class PaimonTieringTest {
             // check system columns: __bucket, __offset, __timestamp
             assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
             
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
-            assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
+            assertThat(actualRow.getTimestamp(5, 3).getMillisecond())
                     .isEqualTo(expectRecord.timestamp());
         }
         assertThat(actualRecords.hasNext()).isFalse();
@@ -814,8 +814,7 @@ class PaimonTieringTest {
             throws Exception {
         paimonSchemaBuilder.column(BUCKET_COLUMN_NAME, DataTypes.INT());
         paimonSchemaBuilder.column(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
-        paimonSchemaBuilder.column(
-                TIMESTAMP_COLUMN_NAME, 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+        paimonSchemaBuilder.column(TIMESTAMP_COLUMN_NAME, 
DataTypes.TIMESTAMP_LTZ_MILLIS());
         paimonSchemaBuilder.option(
                 CoreOptions.COMMIT_CALLBACKS.key(),
                 PaimonLakeCommitter.PaimonCommitCallback.class.getName());

Reply via email to