This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c72c2bdc48 Flink: Nanosecond gaps in SortKeySerializer and
ColumnStatsWatermarkExtractor for 1.20 (#16323)
c72c2bdc48 is described below
commit c72c2bdc48e308c8f118f55f9b5c9e053f886894
Author: Talat UYARER <[email protected]>
AuthorDate: Wed May 13 12:20:48 2026 -0700
Flink: Nanosecond gaps in SortKeySerializer and
ColumnStatsWatermarkExtractor for 1.20 (#16323)
---
.../flink/sink/shuffle/SortKeySerializer.java | 2 +
.../reader/ColumnStatsWatermarkExtractor.java | 19 +++++++--
.../flink/sink/TestRowDataPartitionKey.java | 49 +++++++++++++++++++++-
.../shuffle/TestSortKeySerializerPrimitives.java | 3 ++
.../reader/TestColumnStatsWatermarkExtractor.java | 19 ++++++++-
5 files changed, 86 insertions(+), 6 deletions(-)
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java
index 6f5bb67227..ec099fbfe8 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java
@@ -159,6 +159,7 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
case LONG:
case TIME:
case TIMESTAMP:
+ case TIMESTAMP_NANO:
target.writeLong(record.get(i, Long.class));
break;
case FLOAT:
@@ -237,6 +238,7 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
case LONG:
case TIME:
case TIMESTAMP:
+ case TIMESTAMP_NANO:
reuse.set(i, source.readLong());
break;
case FLOAT:
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
index 4bb6f0a98c..34de689c6c 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
@@ -53,13 +53,24 @@ public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, S
Types.NestedField field = schema.findField(eventTimeFieldName);
TypeID typeID = field.type().typeId();
Preconditions.checkArgument(
- typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
- "Found %s, expected a LONG or TIMESTAMP column for watermark generation.",
+ typeID.equals(TypeID.LONG)
+ || typeID.equals(TypeID.TIMESTAMP)
+ || typeID.equals(TypeID.TIMESTAMP_NANO),
+ "Found %s, expected a LONG, TIMESTAMP, or TIMESTAMP_NANO column for watermark generation.",
typeID);
this.eventTimeFieldId = field.fieldId();
this.eventTimeFieldName = eventTimeFieldName;
- // Use the timeUnit only for Long columns.
- this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
+ // Use the timeUnit only for Long columns; timestamp columns store fixed-precision longs.
+ switch (typeID) {
+ case LONG:
+ this.timeUnit = timeUnit;
+ break;
+ case TIMESTAMP_NANO:
+ this.timeUnit = TimeUnit.NANOSECONDS;
+ break;
+ default:
+ this.timeUnit = TimeUnit.MICROSECONDS;
+ }
}
@VisibleForTesting
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
index 919fef579a..2e7e6db176 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
@@ -57,7 +57,11 @@ public class TestRowDataPartitionKey {
Types.NestedField.required(12, "decimalType2", Types.DecimalType.of(10, 5)),
Types.NestedField.required(13, "decimalType3", Types.DecimalType.of(38, 19)),
Types.NestedField.required(14, "floatType", Types.FloatType.get()),
- Types.NestedField.required(15, "doubleType", Types.DoubleType.get()));
+ Types.NestedField.required(15, "doubleType", Types.DoubleType.get()),
+ Types.NestedField.required(
+ 16, "timestampNanoWithoutZone", Types.TimestampNanoType.withoutZone()),
+ Types.NestedField.required(
+ 17, "timestampNanoWithZone", Types.TimestampNanoType.withZone()));
private static final List<String> SUPPORTED_PRIMITIVES =
SCHEMA.asStruct().fields().stream().map(Types.NestedField::name).collect(Collectors.toList());
@@ -248,4 +252,47 @@ public class TestRowDataPartitionKey {
}
}
}
+
+ @Test
+ public void testTimestampNanoPartitionTransforms() {
+ RowType rowType = FlinkSchemaUtil.convert(SCHEMA);
+ RowDataWrapper rowWrapper = new RowDataWrapper(rowType, SCHEMA.asStruct());
+ InternalRecordWrapper recordWrapper = new InternalRecordWrapper(SCHEMA.asStruct());
+
+ List<Record> records = RandomGenericData.generate(SCHEMA, 10, 1995);
+ List<RowData> rows = Lists.newArrayList(RandomRowData.convert(SCHEMA, records));
+
+ String[] columns = {"timestampNanoWithoutZone", "timestampNanoWithZone"};
+ for (String column : columns) {
+ List<PartitionSpec> specs =
+ Lists.newArrayList(
+ PartitionSpec.builderFor(SCHEMA).identity(column).build(),
+ PartitionSpec.builderFor(SCHEMA).year(column).build(),
+ PartitionSpec.builderFor(SCHEMA).month(column).build(),
+ PartitionSpec.builderFor(SCHEMA).day(column).build(),
+ PartitionSpec.builderFor(SCHEMA).hour(column).build(),
+ PartitionSpec.builderFor(SCHEMA).bucket(column, 16).build());
+
+ for (PartitionSpec spec : specs) {
+ Class<?>[] javaClasses = spec.javaClasses();
+ PartitionKey pk = new PartitionKey(spec, SCHEMA);
+ PartitionKey expectedPK = new PartitionKey(spec, SCHEMA);
+
+ for (int j = 0; j < rows.size(); j++) {
+ pk.partition(rowWrapper.wrap(rows.get(j)));
+ expectedPK.partition(recordWrapper.wrap(records.get(j)));
+
+ assertThat(pk.size()).isEqualTo(1);
+ assertThat(pk.get(0, javaClasses[0]))
+ .as(
+ "Partition with column "
+ + column
+ + " and spec "
+ + spec
+ + " should match Iceberg-side computation")
+ .isEqualTo(expectedPK.get(0, javaClasses[0]));
+ }
+ }
+ }
+ }
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
index ac2e2784e6..44791094de 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
@@ -54,6 +54,9 @@ public class TestSortKeySerializerPrimitives extends TestSortKeySerializerBase {
.sortBy(Expressions.bucket("uuid_field", 16), SortDirection.ASC, NullOrder.NULLS_FIRST)
.sortBy(Expressions.hour("ts_with_zone_field"), SortDirection.ASC, NullOrder.NULLS_FIRST)
.sortBy(Expressions.day("ts_without_zone_field"), SortDirection.ASC, NullOrder.NULLS_FIRST)
+ .asc("ts_ns_with_zone_field")
+ .sortBy(
+ Expressions.hour("ts_ns_without_zone_field"), SortDirection.ASC, NullOrder.NULLS_FIRST)
// can not test HeapByteBuffer due to equality test inside SerializerTestBase
// .sortBy(Expressions.truncate("binary_field", 2), SortDirection.ASC,
// NullOrder.NULLS_FIRST)
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
index 761e562227..191dfe088c 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
@@ -61,6 +61,13 @@ public class TestColumnStatsWatermarkExtractor {
required(3, "long_column", Types.LongType.get()),
required(4, "string_column", Types.StringType.get()));
+ // Separate schema for nanosecond columns: TIMESTAMP_NANO requires table format v3, which the
+ // HadoopTableExtension above does not provision. Tested via constructor preconditions only.
+ private static final Schema NANO_SCHEMA =
+ new Schema(
+ required(1, "timestamp_ns_column", Types.TimestampNanoType.withoutZone()),
+ required(2, "timestamptz_ns_column", Types.TimestampNanoType.withZone()));
+
private static final List<List<Record>> TEST_RECORDS =
ImmutableList.of(
RandomGenericData.generate(SCHEMA, 3, 2L),
RandomGenericData.generate(SCHEMA, 3, 19L));
@@ -147,7 +154,17 @@ public class TestColumnStatsWatermarkExtractor {
assertThatThrownBy(() -> new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
- "Found STRING, expected a LONG or TIMESTAMP column for watermark generation.");
+ "Found STRING, expected a LONG, TIMESTAMP, or TIMESTAMP_NANO column for watermark generation.");
+ }
+
+ @TestTemplate
+ public void testTimestampNanoAccepted() {
+ // Run the precondition check exactly once across the parameterized matrix.
+ assumeThat(columnName).isEqualTo("timestamp_column");
+
+ // Both flavours of TIMESTAMP_NANO must be accepted by the extractor's precondition check.
+ new ColumnStatsWatermarkExtractor(NANO_SCHEMA, "timestamp_ns_column", null);
+ new ColumnStatsWatermarkExtractor(NANO_SCHEMA, "timestamptz_ns_column", null);
}
@TestTemplate