This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new da01e2ed6 [hotfix] use ltz_millis as paimon system column for
timestamp datatype (#2320)
da01e2ed6 is described below
commit da01e2ed61447aa25d7bc002e54dbbb309d90607
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Jan 8 09:21:42 2026 +0800
[hotfix] use ltz_millis as paimon system column for timestamp datatype
(#2320)
---
.../fluss/lake/paimon/PaimonLakeCatalog.java | 2 +-
.../lake/paimon/utils/PaimonTableValidation.java | 46 ++++++++++++++
.../lake/paimon/LakeEnabledTableCreateITCase.java | 72 +++++++++++++++++-----
.../lake/paimon/tiering/PaimonTieringTest.java | 9 ++-
4 files changed, 109 insertions(+), 20 deletions(-)
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index f8cb46ce4..500546e64 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -68,7 +68,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
// for consistent behavior
SYSTEM_COLUMNS.put(BUCKET_COLUMN_NAME, DataTypes.INT());
SYSTEM_COLUMNS.put(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
- SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME,
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+ SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME,
DataTypes.TIMESTAMP_LTZ_MILLIS());
}
private final Catalog paimonCatalog;
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
index 0a050f362..94580df17 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
@@ -25,12 +25,18 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
/** Utils to verify whether the existing Paimon table is compatible with the
table to be created. */
public class PaimonTableValidation {
@@ -61,6 +67,13 @@ public class PaimonTableValidation {
existingOptions.entrySet().removeIf(entry ->
!newOptions.containsKey(entry.getKey()));
if (!existingSchema.equals(newSchema)) {
+            // Allow a different precision for the __timestamp column for backward
compatibility:
+            // old clusters used precision 6 while new clusters use
precision 3,
+            // so such a precision difference is tolerated
+ if (equalIgnoreSystemColumnTimestampPrecision(existingSchema,
newSchema)) {
+ return;
+ }
+
throw new TableAlreadyExistException(
String.format(
"The table %s already exists in Paimon catalog,
but the table schema is not compatible. "
@@ -70,6 +83,39 @@ public class PaimonTableValidation {
}
}
+ /**
+ * Check if the {@code existingSchema} is compatible with {@code
newSchema} by ignoring the
+ * precision difference of the system column {@code __timestamp}.
+ *
+ * <p>This is crucial for backward compatibility during cluster upgrades
or configuration
+ * changes (e.g., transitioning from precision 6 to 3). Without this
relaxed check, users would
+ * be unable to re-enable lake synchronization for existing tables if the
cluster-wide default
+ * timestamp precision has evolved.
+ *
+ * @param existingSchema the schema currently persisted in the Paimon
catalog
+ * @param newSchema the new schema descriptor generated by the current
Fluss cluster
+ * @return true if the schemas are identical, disregarding the precision
of the system timestamp
+ */
+ private static boolean equalIgnoreSystemColumnTimestampPrecision(
+ Schema existingSchema, Schema newSchema) {
+ List<DataField> existingFields = new
ArrayList<>(existingSchema.fields());
+ DataField systemTimestampField =
existingFields.get(existingFields.size() - 1);
+ if (systemTimestampField.name().equals(TIMESTAMP_COLUMN_NAME)
+ && systemTimestampField
+ .type()
+
.equalsIgnoreFieldId(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())) {
+ existingFields.set(
+ existingFields.size() - 1,
+ new DataField(
+ systemTimestampField.id(),
+ systemTimestampField.name(),
+ DataTypes.TIMESTAMP_LTZ_MILLIS(),
+ systemTimestampField.description()));
+ }
+ existingSchema =
existingSchema.copy(RowType.of(existingFields.toArray(new DataField[0])));
+ return existingSchema.equals(newSchema);
+ }
+
private static void removeChangeableOptions(Map<String, String> options) {
options.entrySet()
.removeIf(
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 9491918b4..6fb700322 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -168,7 +168,7 @@ class LakeEnabledTableCreateITCase {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
@@ -206,7 +206,7 @@ class LakeEnabledTableCreateITCase {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
@@ -245,7 +245,7 @@ class LakeEnabledTableCreateITCase {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"pk_c1",
@@ -288,7 +288,7 @@ class LakeEnabledTableCreateITCase {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
@@ -355,7 +355,7 @@ class LakeEnabledTableCreateITCase {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
@@ -456,8 +456,8 @@ class LakeEnabledTableCreateITCase {
.isInstanceOf(LakeTableAlreadyExistException.class)
.hasMessage(
"The table `fluss`.`log_table_with_exist_lake_table`
already exists in Paimon catalog, but the table schema is not compatible. "
- + "Existing schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
file.format=parquet, fluss.k1=v1}, comment=null}, "
- + "new schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ + "Existing schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
file.format=parquet, fluss.k1=v1}, comment=null}, "
+ + "new schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ "Please first drop the table in Paimon
catalog or use a new table name.");
// create log table with different fields will throw exception
@@ -475,8 +475,8 @@ class LakeEnabledTableCreateITCase {
.isInstanceOf(LakeTableAlreadyExistException.class)
.hasMessage(
"The table `fluss`.`log_table_with_exist_lake_table`
already exists in Paimon catalog, but the table schema is not compatible. "
- + "Existing schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
file.format=parquet, fluss.k1=v1}, comment=null}, "
- + "new schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp`
TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[],
options={bucket=-1, fluss.table.replication.factor=1,
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon,
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ + "Existing schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
file.format=parquet, fluss.k1=v1}, comment=null}, "
+ + "new schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp`
TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[],
options={bucket=-1, fluss.table.replication.factor=1,
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon,
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ "Please first drop the table in Paimon
catalog or use a new table name.");
// add an insignificant option to Paimon table will be ok
@@ -615,7 +615,7 @@ class LakeEnabledTableCreateITCase {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
@@ -711,7 +711,7 @@ class LakeEnabledTableCreateITCase {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
@@ -742,7 +742,7 @@ class LakeEnabledTableCreateITCase {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
@@ -832,7 +832,7 @@ class LakeEnabledTableCreateITCase {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
@@ -885,7 +885,7 @@ class LakeEnabledTableCreateITCase {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
@@ -927,6 +927,50 @@ class LakeEnabledTableCreateITCase {
assertThat(alteredRowType.getField("c3").description()).isEqualTo("c3
comment");
}
+ @Test
+ void testEnableLakeTableWithLegacySystemTimestampColumn() throws Exception
{
+ TablePath tablePath = TablePath.of(DATABASE,
"timestamp_precision_compat");
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(Schema.newBuilder().column("c1",
DataTypes.INT()).build())
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+ .build();
+
+ admin.createTable(tablePath, tableDescriptor, false).get();
+
+ Identifier paimonIdentifier = Identifier.create(DATABASE,
tablePath.getTableName());
+
+ // alter to TIMESTAMP_WITH_LOCAL_TIME_ZONE to mock the legacy behavior
+ paimonCatalog.alterTable(
+ paimonIdentifier,
+ SchemaChange.updateColumnType(
+ TIMESTAMP_COLUMN_NAME,
+
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
+ false);
+
+ // disable data lake
+ admin.alterTable(
+ tablePath,
+ Collections.singletonList(
+ TableChange.set(
+
ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false")),
+ false)
+ .get();
+
assertThat(admin.getTableInfo(tablePath).get().getTableConfig().isDataLakeEnabled())
+ .isFalse();
+
+ // enable data lake again, should still enable it
+ admin.alterTable(
+ tablePath,
+ Collections.singletonList(
+ TableChange.set(
+
ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")),
+ false)
+ .get();
+
assertThat(admin.getTableInfo(tablePath).get().getTableConfig().isDataLakeEnabled())
+ .isTrue();
+ }
+
private void verifyPaimonTable(
Table paimonTable,
TableDescriptor flussTable,
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index cadd2a46a..56b3f533d 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -438,7 +438,7 @@ class PaimonTieringTest {
// check system columns: __bucket, __offset, __timestamp
assertThat(actualRow.getInt(4)).isEqualTo(expectBucket);
assertThat(actualRow.getLong(5)).isEqualTo(expectRecord.logOffset());
- assertThat(actualRow.getTimestamp(6, 6).getMillisecond())
+ assertThat(actualRow.getTimestamp(6, 3).getMillisecond())
.isEqualTo(expectRecord.timestamp());
}
assertThat(actualRecords.hasNext()).isFalse();
@@ -468,7 +468,7 @@ class PaimonTieringTest {
// check system columns: __bucket, __offset, __timestamp
assertThat(actualRow.getInt(5)).isEqualTo(expectBucket);
assertThat(actualRow.getLong(6)).isEqualTo(expectRecord.logOffset());
- assertThat(actualRow.getTimestamp(7, 6).getMillisecond())
+ assertThat(actualRow.getTimestamp(7, 3).getMillisecond())
.isEqualTo(expectRecord.timestamp());
}
assertThat(actualRecords.hasNext()).isFalse();
@@ -496,7 +496,7 @@ class PaimonTieringTest {
// check system columns: __bucket, __offset, __timestamp
assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
- assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
+ assertThat(actualRow.getTimestamp(5, 3).getMillisecond())
.isEqualTo(expectRecord.timestamp());
}
assertThat(actualRecords.hasNext()).isFalse();
@@ -814,8 +814,7 @@ class PaimonTieringTest {
throws Exception {
paimonSchemaBuilder.column(BUCKET_COLUMN_NAME, DataTypes.INT());
paimonSchemaBuilder.column(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
- paimonSchemaBuilder.column(
- TIMESTAMP_COLUMN_NAME,
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+ paimonSchemaBuilder.column(TIMESTAMP_COLUMN_NAME,
DataTypes.TIMESTAMP_LTZ_MILLIS());
paimonSchemaBuilder.option(
CoreOptions.COMMIT_CALLBACKS.key(),
PaimonLakeCommitter.PaimonCommitCallback.class.getName());