This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new 76972ef77b Flink: Dynamic Iceberg Sink: Add HashKeyGenerator / RowDataEvolver / TableUpdateOperator (#13277)
76972ef77b is described below

commit 76972ef77b96f864712bdde2749a170f9494b001
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Wed Jun 11 17:52:56 2025 +0200

    Flink: Dynamic Iceberg Sink: Add HashKeyGenerator / RowDataEvolver / TableUpdateOperator (#13277)
---
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java | 4 +-
 .../flink/sink/EqualityFieldKeySelector.java | 12 +-
 .../org/apache/iceberg/flink/sink/FlinkSink.java | 8 +-
 .../org/apache/iceberg/flink/sink/IcebergSink.java | 9 +-
 .../iceberg/flink/sink/PartitionKeySelector.java | 6 +-
 .../iceberg/flink/sink/PartitionedDeltaWriter.java | 4 +-
 .../flink/sink/RowDataTaskWriterFactory.java | 17 +-
 .../org/apache/iceberg/flink/sink/SinkUtil.java | 7 +-
 .../flink/sink/UnpartitionedDeltaWriter.java | 4 +-
 .../iceberg/flink/sink/dynamic/DynamicRecord.java | 8 +-
 .../flink/sink/dynamic/DynamicRecordInternal.java | 10 +-
 .../dynamic/DynamicRecordInternalSerializer.java | 16 +-
 .../flink/sink/dynamic/DynamicSinkUtil.java | 65 ++++
 .../sink/dynamic/DynamicTableUpdateOperator.java | 78 +++++
 .../iceberg/flink/sink/dynamic/DynamicWriter.java | 10 +-
 .../flink/sink/dynamic/HashKeyGenerator.java | 379 +++++++++++++++++++++
 .../iceberg/flink/sink/dynamic/RowDataEvolver.java | 190 +++++++++++
 .../iceberg/flink/sink/dynamic/WriteTarget.java | 16 +-
 .../iceberg/flink/sink/TestDeltaTaskWriter.java | 18 +-
 .../DynamicRecordInternalSerializerTestBase.java | 2 +-
 .../dynamic/TestDynamicCommittableSerializer.java | 6 +-
 .../flink/sink/dynamic/TestDynamicCommitter.java | 34 +-
 .../dynamic/TestDynamicTableUpdateOperator.java | 112 ++++++
 .../dynamic/TestDynamicWriteResultAggregator.java | 7 +-
 .../dynamic/TestDynamicWriteResultSerializer.java | 6 +-
 .../flink/sink/dynamic/TestDynamicWriter.java | 4 +-
 .../flink/sink/dynamic/TestHashKeyGenerator.java | 354 +++++++++++++++++++
 .../flink/sink/dynamic/TestRowDataEvolver.java | 256 ++++++++++++++
 .../flink/source/TestProjectMetaColumn.java | 11 +-
 29 files changed, 1547 insertions(+), 106 deletions(-)

diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index e8a46c5bec..d845046cd2 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -19,7 +19,7 @@
 package org.apache.iceberg.flink.sink;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Set;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
@@ -56,7 +56,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
       long targetFileSize,
       Schema schema,
       RowType flinkSchema,
-      List<Integer> equalityFieldIds,
+      Set<Integer> equalityFieldIds,
       boolean upsert) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.schema = schema;
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
index 18b269d6c3..92e47792c1 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java @@ -18,13 +18,13 @@ */ package org.apache.iceberg.flink.sink; -import java.util.List; +import java.util.Set; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.Schema; import org.apache.iceberg.flink.RowDataWrapper; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.StructLikeWrapper; import org.apache.iceberg.util.StructProjection; @@ -33,7 +33,8 @@ import org.apache.iceberg.util.StructProjection; * Create a {@link KeySelector} to shuffle by equality fields, to ensure same equality fields record * will be emitted to same writer in order. */ -class EqualityFieldKeySelector implements KeySelector<RowData, Integer> { +@Internal +public class EqualityFieldKeySelector implements KeySelector<RowData, Integer> { private final Schema schema; private final RowType flinkSchema; @@ -43,10 +44,11 @@ class EqualityFieldKeySelector implements KeySelector<RowData, Integer> { private transient StructProjection structProjection; private transient StructLikeWrapper structLikeWrapper; - EqualityFieldKeySelector(Schema schema, RowType flinkSchema, List<Integer> equalityFieldIds) { + public EqualityFieldKeySelector( + Schema schema, RowType flinkSchema, Set<Integer> equalityFieldIds) { this.schema = schema; this.flinkSchema = flinkSchema; - this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); + this.deleteSchema = TypeUtil.select(schema, equalityFieldIds); } /** diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 3f2b265f7b..c42e4a015b 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -419,7 +419,7 @@ public class FlinkSink { flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig); // Find out the equality field id list based on the user-provided equality field column names. - List<Integer> equalityFieldIds = + Set<Integer> equalityFieldIds = SinkUtil.checkAndGetEqualityFieldIds(table, equalityFieldColumns); RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema); @@ -524,7 +524,7 @@ public class FlinkSink { private SingleOutputStreamOperator<FlinkWriteResult> appendWriter( DataStream<RowData> input, RowType flinkRowType, - List<Integer> equalityFieldIds, + Set<Integer> equalityFieldIds, int writerParallelism) { // Validate the equality fields and partition fields if we enable the upsert mode. 
if (flinkWriteConf.upsertMode()) { @@ -575,7 +575,7 @@ public class FlinkSink { private DataStream<RowData> distributeDataStream( DataStream<RowData> input, - List<Integer> equalityFieldIds, + Set<Integer> equalityFieldIds, RowType flinkRowType, int writerParallelism) { DistributionMode writeMode = flinkWriteConf.distributionMode(); @@ -711,7 +711,7 @@ public class FlinkSink { SerializableSupplier<Table> tableSupplier, FlinkWriteConf flinkWriteConf, RowType flinkRowType, - List<Integer> equalityFieldIds) { + Set<Integer> equalityFieldIds) { Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier shouldn't be null"); Table initTable = tableSupplier.get(); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index 52db4d4340..a2aec53a74 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -31,6 +31,7 @@ import java.io.UncheckedIOException; import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.function.Function; import org.apache.flink.annotation.Experimental; @@ -152,7 +153,7 @@ public class IcebergSink private final RowType flinkRowType; private final SerializableSupplier<Table> tableSupplier; private final transient FlinkWriteConf flinkWriteConf; - private final List<Integer> equalityFieldIds; + private final Set<Integer> equalityFieldIds; private final boolean upsertMode; private final FileFormat dataFileFormat; private final long targetDataFileSize; @@ -163,7 +164,7 @@ public class IcebergSink private final transient FlinkMaintenanceConfig flinkMaintenanceConfig; private final Table table; - private final List<String> equalityFieldColumns = null; + private final Set<String> equalityFieldColumns = null; private IcebergSink( TableLoader tableLoader, @@ -174,7 +175,7 @@ public class IcebergSink RowType flinkRowType, SerializableSupplier<Table> tableSupplier, FlinkWriteConf flinkWriteConf, - List<Integer> equalityFieldIds, + Set<Integer> equalityFieldIds, String branch, boolean overwriteMode, FlinkMaintenanceConfig flinkMaintenanceConfig) { @@ -617,7 +618,7 @@ public class IcebergSink boolean overwriteMode = flinkWriteConf.overwriteMode(); // Validate the equality fields and partition fields if we enable the upsert mode. - List<Integer> equalityFieldIds = + Set<Integer> equalityFieldIds = SinkUtil.checkAndGetEqualityFieldIds(table, equalityFieldColumns); if (flinkWriteConf.upsertMode()) { diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java index df951684b4..17c8233e1f 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.sink; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; @@ -31,7 +32,8 @@ import org.apache.iceberg.flink.RowDataWrapper; * wrote by only one task. That will reduce lots of small files in partitioned fanout write policy * for {@link FlinkSink}. 
*/ -class PartitionKeySelector implements KeySelector<RowData, String> { +@Internal +public class PartitionKeySelector implements KeySelector<RowData, String> { private final Schema schema; private final PartitionKey partitionKey; @@ -39,7 +41,7 @@ class PartitionKeySelector implements KeySelector<RowData, String> { private transient RowDataWrapper rowDataWrapper; - PartitionKeySelector(PartitionSpec spec, Schema schema, RowType flinkSchema) { + public PartitionKeySelector(PartitionSpec spec, Schema schema, RowType flinkSchema) { this.schema = schema; this.partitionKey = new PartitionKey(spec, schema); this.flinkSchema = flinkSchema; diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index 38062dd1a2..3eb4dba802 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -20,8 +20,8 @@ package org.apache.iceberg.flink.sink; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; @@ -49,7 +49,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter { long targetFileSize, Schema schema, RowType flinkSchema, - List<Integer> equalityFieldIds, + Set<Integer> equalityFieldIds, boolean upsert) { super( spec, diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index 8dc8d38869..7c11b20c44 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -18,8 +18,9 @@ */ package org.apache.iceberg.flink.sink; -import java.util.List; +import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; @@ -48,7 +49,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> { private final PartitionSpec spec; private final long targetFileSizeBytes; private final FileFormat format; - private final List<Integer> equalityFieldIds; + private final Set<Integer> equalityFieldIds; private final boolean upsert; private final FileAppenderFactory<RowData> appenderFactory; @@ -60,7 +61,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> { long targetFileSizeBytes, FileFormat format, Map<String, String> writeProperties, - List<Integer> equalityFieldIds, + Collection<Integer> equalityFieldIds, boolean upsert) { this( () -> table, @@ -78,7 +79,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> { long targetFileSizeBytes, FileFormat format, Map<String, String> writeProperties, - List<Integer> equalityFieldIds, + Collection<Integer> equalityFieldIds, boolean upsert) { this( tableSupplier, @@ -98,7 +99,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> { long targetFileSizeBytes, FileFormat format, Map<String, String> writeProperties, - List<Integer> equalityFieldIds, + Collection<Integer> 
equalityFieldIds, boolean upsert, Schema schema, PartitionSpec spec) { @@ -117,7 +118,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> { this.spec = spec; this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; - this.equalityFieldIds = equalityFieldIds; + this.equalityFieldIds = equalityFieldIds != null ? Sets.newHashSet(equalityFieldIds) : null; this.upsert = upsert; if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { @@ -137,7 +138,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> { flinkSchema, writeProperties, spec, - ArrayUtil.toIntArray(equalityFieldIds), + ArrayUtil.toPrimitive(equalityFieldIds.toArray(new Integer[0])), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null); } else { @@ -148,7 +149,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> { flinkSchema, writeProperties, spec, - ArrayUtil.toIntArray(equalityFieldIds), + ArrayUtil.toPrimitive(equalityFieldIds.toArray(new Integer[0])), schema, null); } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java index 0a2a7c1b88..3f60b45a1f 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java @@ -24,7 +24,6 @@ import java.util.Set; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,8 +41,8 @@ class SinkUtil { private static final Logger LOG = LoggerFactory.getLogger(SinkUtil.class); - static List<Integer> checkAndGetEqualityFieldIds(Table table, List<String> equalityFieldColumns) { - List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds()); + static Set<Integer> checkAndGetEqualityFieldIds(Table table, List<String> equalityFieldColumns) { + Set<Integer> equalityFieldIds = Sets.newHashSet(table.schema().identifierFieldIds()); if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) { Set<Integer> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size()); for (String column : equalityFieldColumns) { @@ -63,7 +62,7 @@ class SinkUtil { equalityFieldSet, table.schema().identifierFieldIds()); } - equalityFieldIds = Lists.newArrayList(equalityFieldSet); + equalityFieldIds = Sets.newHashSet(equalityFieldSet); } return equalityFieldIds; } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java index 7680fb933b..b6ad03514b 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java @@ -19,7 +19,7 @@ package org.apache.iceberg.flink.sink; import java.io.IOException; -import java.util.List; +import java.util.Set; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; @@ -41,7 +41,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { long targetFileSize, Schema schema, RowType 
flinkSchema, - List<Integer> equalityFieldIds, + Set<Integer> equalityFieldIds, boolean upsert) { super( spec, diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java index 994f7a0865..600a4d8b95 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java @@ -18,7 +18,7 @@ */ package org.apache.iceberg.flink.sink.dynamic; -import java.util.List; +import java.util.Set; import javax.annotation.Nullable; import org.apache.flink.table.data.RowData; import org.apache.iceberg.DistributionMode; @@ -37,7 +37,7 @@ public class DynamicRecord { private DistributionMode distributionMode; private int writeParallelism; private boolean upsertMode; - @Nullable private List<String> equalityFields; + @Nullable private Set<String> equalityFields; public DynamicRecord( TableIdentifier tableIdentifier, @@ -120,11 +120,11 @@ public class DynamicRecord { this.upsertMode = upsertMode; } - public List<String> equalityFields() { + public Set<String> equalityFields() { return equalityFields; } - public void setEqualityFields(List<String> equalityFields) { + public void setEqualityFields(Set<String> equalityFields) { this.equalityFields = equalityFields; } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java index 958a1e8539..fe1f4cdac9 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java @@ -18,8 +18,8 @@ */ package org.apache.iceberg.flink.sink.dynamic; -import java.util.List; import java.util.Objects; +import java.util.Set; import org.apache.flink.annotation.Internal; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; @@ -37,7 +37,7 @@ class DynamicRecordInternal { private int writerKey; private RowData rowData; private boolean upsertMode; - private List<Integer> equalityFieldIds; + private Set<Integer> equalityFieldIds; // Required for serialization instantiation DynamicRecordInternal() {} @@ -50,7 +50,7 @@ class DynamicRecordInternal { PartitionSpec spec, int writerKey, boolean upsertMode, - List<Integer> equalityFieldsIds) { + Set<Integer> equalityFieldsIds) { this.tableName = tableName; this.branch = branch; this.schema = schema; @@ -117,11 +117,11 @@ class DynamicRecordInternal { this.upsertMode = upsertMode; } - public List<Integer> equalityFields() { + public Set<Integer> equalityFields() { return equalityFieldIds; } - public void setEqualityFieldIds(List<Integer> equalityFieldIds) { + public void setEqualityFieldIds(Set<Integer> equalityFieldIds) { this.equalityFieldIds = equalityFieldIds; } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java index a6f38f7d6b..d0a335b182 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java @@ 
-20,8 +20,8 @@ package org.apache.iceberg.flink.sink.dynamic; import java.io.IOException; import java.util.Collections; -import java.util.List; import java.util.Objects; +import java.util.Set; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; @@ -31,11 +31,11 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.hadoop.util.Sets; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; @Internal class DynamicRecordInternalSerializer extends TypeSerializer<DynamicRecordInternal> { @@ -126,11 +126,11 @@ class DynamicRecordInternalSerializer extends TypeSerializer<DynamicRecordIntern RowData rowData = rowDataSerializer.deserialize(dataInputView); boolean upsertMode = dataInputView.readBoolean(); int numEqualityFields = dataInputView.readInt(); - final List<Integer> equalityFieldIds; + final Set<Integer> equalityFieldIds; if (numEqualityFields > 0) { - equalityFieldIds = Lists.newArrayList(); + equalityFieldIds = Sets.newHashSetWithExpectedSize(numEqualityFields); } else { - equalityFieldIds = Collections.emptyList(); + equalityFieldIds = Collections.emptySet(); } for (int i = 0; i < numEqualityFields; i++) { @@ -173,11 +173,11 @@ class DynamicRecordInternalSerializer extends TypeSerializer<DynamicRecordIntern RowData rowData = rowDataSerializer.deserialize(dataInputView); boolean upsertMode = dataInputView.readBoolean(); int numEqualityFields = dataInputView.readInt(); - final List<Integer> equalityFieldIds; + final Set<Integer> equalityFieldIds; if (numEqualityFields > 0) { - equalityFieldIds = Lists.newArrayList(); + equalityFieldIds = Sets.newHashSetWithExpectedSize(numEqualityFields); } else { - equalityFieldIds = Collections.emptyList(); + equalityFieldIds = Collections.emptySet(); } for (int i = 0; i < numEqualityFields; i++) { equalityFieldIds.add(dataInputView.readInt()); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java new file mode 100644 index 0000000000..6ea6dcab86 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.Collections; +import java.util.Set; +import org.apache.hadoop.util.Sets; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; + +class DynamicSinkUtil { + + private DynamicSinkUtil() {} + + static Set<Integer> getEqualityFieldIds(Set<String> equalityFields, Schema schema) { + if (equalityFields == null || equalityFields.isEmpty()) { + if (!schema.identifierFieldIds().isEmpty()) { + return schema.identifierFieldIds(); + } else { + return Collections.emptySet(); + } + } + + Set<Integer> equalityFieldIds = Sets.newHashSetWithExpectedSize(equalityFields.size()); + for (String equalityField : equalityFields) { + Types.NestedField field = schema.findField(equalityField); + Preconditions.checkNotNull( + field, "Equality field %s does not exist in schema", equalityField); + equalityFieldIds.add(field.fieldId()); + } + + return equalityFieldIds; + } + + static int safeAbs(int input) { + if (input >= 0) { + return input; + } + + if (input == Integer.MIN_VALUE) { + // -Integer.MIN_VALUE would be Integer.MIN_VALUE due to integer overflow. Map to + // Integer.MAX_VALUE instead! + return Integer.MAX_VALUE; + } + + return -input; + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java new file mode 100644 index 0000000000..c37532714d --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; + +/** + * An optional operator to perform table updates for tables (e.g. schema update) in a non-concurrent + * way. Records must be keyed / routed to this operator by table name to ensure non-concurrent + * updates. The operator itself forwards the record after updating schema / spec of the table. The + * update is also reflected in the record. 
+ */ +@Internal +class DynamicTableUpdateOperator + extends RichMapFunction<DynamicRecordInternal, DynamicRecordInternal> { + private final CatalogLoader catalogLoader; + private final int cacheMaximumSize; + private final long cacheRefreshMs; + private transient TableUpdater updater; + + DynamicTableUpdateOperator( + CatalogLoader catalogLoader, int cacheMaximumSize, long cacheRefreshMs) { + this.catalogLoader = catalogLoader; + this.cacheMaximumSize = cacheMaximumSize; + this.cacheRefreshMs = cacheRefreshMs; + } + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + Catalog catalog = catalogLoader.loadCatalog(); + this.updater = + new TableUpdater( + new TableMetadataCache(catalog, cacheMaximumSize, cacheRefreshMs), catalog); + } + + @Override + public DynamicRecordInternal map(DynamicRecordInternal data) throws Exception { + Tuple3<Schema, CompareSchemasVisitor.Result, PartitionSpec> newData = + updater.update( + TableIdentifier.parse(data.tableName()), data.branch(), data.schema(), data.spec()); + + data.setSchema(newData.f0); + data.setSpec(newData.f2); + + if (newData.f1 == CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED) { + RowData newRowData = RowDataEvolver.convert(data.rowData(), data.schema(), newData.f0); + data.setRowData(newRowData); + } + + return data; + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java index 3851dbf956..c4c4e61de1 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java @@ -114,7 +114,7 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam Maps.newHashMap(commonWriteProperties); tableWriteProperties.putAll(table.properties()); - List<Integer> equalityFieldIds = + Set<Integer> equalityFieldIds = getEqualityFields(table, element.equalityFields()); if (element.upsertMode()) { Preconditions.checkState( @@ -138,7 +138,7 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam targetDataFileSize, dataFileFormat, tableWriteProperties, - equalityFieldIds, + Lists.newArrayList(equalityFieldIds), element.upsertMode(), element.schema(), element.spec()); @@ -199,15 +199,15 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam return result; } - private static List<Integer> getEqualityFields(Table table, List<Integer> equalityFieldIds) { + private static Set<Integer> getEqualityFields(Table table, Set<Integer> equalityFieldIds) { if (equalityFieldIds != null && !equalityFieldIds.isEmpty()) { return equalityFieldIds; } Set<Integer> identifierFieldIds = table.schema().identifierFieldIds(); if (identifierFieldIds != null && !identifierFieldIds.isEmpty()) { - return Lists.newArrayList(identifierFieldIds); + return identifierFieldIds; } - return Collections.emptyList(); + return Collections.emptySet(); } @VisibleForTesting diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java new file mode 100644 index 0000000000..6cb1f46089 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.util.Collections; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.sink.EqualityFieldKeySelector; +import org.apache.iceberg.flink.sink.PartitionKeySelector; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The HashKeyGenerator is responsible for creating the appropriate hash key for Flink's keyBy + * operation. The hash key is generated depending on the user-provided DynamicRecord and the table + * metadata. Under the hood, we maintain a set of Flink {@link KeySelector}s which implement the + * appropriate Iceberg {@link DistributionMode}. For every table, we randomly select a consistent + * subset of writer subtasks which receive data via their associated keys, depending on the chosen + * DistributionMode. + * + * <p>Caching ensures that a new key selector is also created when the table metadata (e.g. schema, + * spec) or the user-provided metadata changes (e.g. distribution mode, write parallelism). + * + * <p>Note: The hashing must be deterministic given the same parameters of the KeySelector and the + * same provided values. 
+ */ +class HashKeyGenerator { + private static final Logger LOG = LoggerFactory.getLogger(HashKeyGenerator.class); + + private final int maxWriteParallelism; + private final Cache<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache; + + HashKeyGenerator(int maxCacheSize, int maxWriteParallelism) { + this.maxWriteParallelism = maxWriteParallelism; + this.keySelectorCache = Caffeine.newBuilder().maximumSize(maxCacheSize).build(); + } + + int generateKey(DynamicRecord dynamicRecord) throws Exception { + return generateKey(dynamicRecord, null, null, null); + } + + int generateKey( + DynamicRecord dynamicRecord, + @Nullable Schema tableSchema, + @Nullable PartitionSpec tableSpec, + @Nullable RowData overrideRowData) + throws Exception { + String tableIdent = dynamicRecord.tableIdentifier().toString(); + SelectorKey cacheKey = + new SelectorKey( + tableIdent, + dynamicRecord.branch(), + tableSchema != null ? tableSchema.schemaId() : null, + tableSpec != null ? tableSpec.specId() : null, + dynamicRecord.schema(), + dynamicRecord.spec(), + dynamicRecord.equalityFields()); + return keySelectorCache + .get( + cacheKey, + k -> + getKeySelector( + tableIdent, + MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()), + MoreObjects.firstNonNull(tableSpec, dynamicRecord.spec()), + MoreObjects.firstNonNull( + dynamicRecord.distributionMode(), DistributionMode.NONE), + MoreObjects.firstNonNull( + dynamicRecord.equalityFields(), Collections.emptySet()), + dynamicRecord.writeParallelism())) + .getKey(overrideRowData != null ? overrideRowData : dynamicRecord.rowData()); + } + + private KeySelector<RowData, Integer> getKeySelector( + String tableName, + Schema schema, + PartitionSpec spec, + DistributionMode mode, + Set<String> equalityFields, + int writeParallelism) { + LOG.debug( + "Creating new KeySelector for table '{}' with distribution mode '{}'", tableName, mode); + switch (mode) { + case NONE: + if (equalityFields.isEmpty()) { + return tableKeySelector(tableName, writeParallelism, maxWriteParallelism); + } else { + LOG.info( + "{}: Distribute rows by equality fields, because there are equality fields set", + tableName); + return equalityFieldKeySelector( + tableName, schema, equalityFields, writeParallelism, maxWriteParallelism); + } + + case HASH: + if (equalityFields.isEmpty()) { + if (spec.isUnpartitioned()) { + LOG.warn( + "{}: Fallback to use 'none' distribution mode, because there are no equality fields set " + + "and table is unpartitioned", + tableName); + return tableKeySelector(tableName, writeParallelism, maxWriteParallelism); + } else { + return partitionKeySelector( + tableName, schema, spec, writeParallelism, maxWriteParallelism); + } + } else { + if (spec.isUnpartitioned()) { + LOG.info( + "{}: Distribute rows by equality fields, because there are equality fields set " + + "and table is unpartitioned", + tableName); + return equalityFieldKeySelector( + tableName, schema, equalityFields, writeParallelism, maxWriteParallelism); + } else { + for (PartitionField partitionField : spec.fields()) { + Preconditions.checkState( + equalityFields.contains(partitionField.name()), + "%s: In 'hash' distribution mode with equality fields set, partition field '%s' " + + "should be included in equality fields: '%s'", + tableName, + partitionField, + schema.columns().stream() + .filter(c -> equalityFields.contains(c.name())) + .collect(Collectors.toList())); + } + return partitionKeySelector( + tableName, schema, spec, writeParallelism, maxWriteParallelism); + } + } + + case RANGE: + if 
(schema.identifierFieldIds().isEmpty()) { + LOG.warn( + "{}: Fallback to use 'none' distribution mode, because there are no equality fields set " + + "and {}='range' is not supported yet in flink", + tableName, + WRITE_DISTRIBUTION_MODE); + return tableKeySelector(tableName, writeParallelism, maxWriteParallelism); + } else { + LOG.info( + "{}: Distribute rows by equality fields, because there are equality fields set " + + "and {}='range' is not supported yet in flink", + tableName, + WRITE_DISTRIBUTION_MODE); + return equalityFieldKeySelector( + tableName, schema, equalityFields, writeParallelism, maxWriteParallelism); + } + + default: + throw new IllegalArgumentException( + tableName + ": Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + mode); + } + } + + private static KeySelector<RowData, Integer> equalityFieldKeySelector( + String tableName, + Schema schema, + Set<String> equalityFields, + int writeParallelism, + int maxWriteParallelism) { + return new TargetLimitedKeySelector( + new EqualityFieldKeySelector( + schema, + FlinkSchemaUtil.convert(schema), + DynamicSinkUtil.getEqualityFieldIds(equalityFields, schema)), + tableName, + writeParallelism, + maxWriteParallelism); + } + + private static KeySelector<RowData, Integer> partitionKeySelector( + String tableName, + Schema schema, + PartitionSpec spec, + int writeParallelism, + int maxWriteParallelism) { + KeySelector<RowData, String> inner = + new PartitionKeySelector(spec, schema, FlinkSchemaUtil.convert(schema)); + return new TargetLimitedKeySelector( + in -> inner.getKey(in).hashCode(), tableName, writeParallelism, maxWriteParallelism); + } + + private static KeySelector<RowData, Integer> tableKeySelector( + String tableName, int writeParallelism, int maxWriteParallelism) { + return new TargetLimitedKeySelector( + new RoundRobinKeySelector<>(writeParallelism), + tableName, + writeParallelism, + maxWriteParallelism); + } + + /** + * Generates a new key using the salt as a base, and reduces the target key range of the {@link + * #wrapped} {@link KeySelector} to {@link #writeParallelism}. + */ + private static class TargetLimitedKeySelector implements KeySelector<RowData, Integer> { + private final KeySelector<RowData, Integer> wrapped; + private final int writeParallelism; + private final int[] distinctKeys; + + @SuppressWarnings("checkstyle:ParameterAssignment") + TargetLimitedKeySelector( + KeySelector<RowData, Integer> wrapped, + String tableName, + int writeParallelism, + int maxWriteParallelism) { + if (writeParallelism > maxWriteParallelism) { + LOG.warn( + "{}: writeParallelism {} is greater than maxWriteParallelism {}. 
Capping writeParallelism at {}", + tableName, + writeParallelism, + maxWriteParallelism, + maxWriteParallelism); + writeParallelism = maxWriteParallelism; + } + this.wrapped = wrapped; + this.writeParallelism = writeParallelism; + this.distinctKeys = new int[writeParallelism]; + + // Ensures that the generated keys are always result in unique slotId + Set<Integer> targetSlots = Sets.newHashSetWithExpectedSize(writeParallelism); + int nextKey = tableName.hashCode(); + for (int i = 0; i < writeParallelism; ++i) { + int subtaskId = subtaskId(nextKey, writeParallelism, maxWriteParallelism); + while (targetSlots.contains(subtaskId)) { + ++nextKey; + subtaskId = subtaskId(nextKey, writeParallelism, maxWriteParallelism); + } + + targetSlots.add(subtaskId); + distinctKeys[i] = nextKey; + ++nextKey; + } + } + + @Override + public Integer getKey(RowData value) throws Exception { + return distinctKeys[ + DynamicSinkUtil.safeAbs(wrapped.getKey(value).hashCode()) % writeParallelism]; + } + + private static int subtaskId(int key, int writeParallelism, int maxWriteParallelism) { + return KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup( + maxWriteParallelism, + writeParallelism, + KeyGroupRangeAssignment.computeKeyGroupForKeyHash(key, maxWriteParallelism)); + } + } + + /** + * Generates evenly distributed keys between [0..{@link #maxTarget}) range using round-robin + * algorithm. + * + * @param <T> unused input for key generation + */ + private static class RoundRobinKeySelector<T> implements KeySelector<T, Integer> { + private final int maxTarget; + private int lastTarget = 0; + + RoundRobinKeySelector(int maxTarget) { + this.maxTarget = maxTarget; + } + + @Override + public Integer getKey(T value) { + lastTarget = (lastTarget + 1) % maxTarget; + return lastTarget; + } + } + + /** + * Cache key for the {@link KeySelector}. Only contains the {@link Schema} and the {@link + * PartitionSpec} if their ids are not provided. + */ + static class SelectorKey { + private final String tableName; + private final String branch; + private final Integer schemaId; + private final Integer specId; + private final Schema schema; + private final PartitionSpec spec; + private final Set<String> equalityFields; + + SelectorKey( + String tableName, + String branch, + @Nullable Integer tableSchemaId, + @Nullable Integer tableSpecId, + Schema schema, + PartitionSpec spec, + Set<String> equalityFields) { + this.tableName = tableName; + this.branch = branch; + this.schemaId = tableSchemaId; + this.specId = tableSpecId; + this.schema = tableSchemaId == null ? schema : null; + this.spec = tableSpecId == null ? 
spec : null; + this.equalityFields = equalityFields; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + SelectorKey that = (SelectorKey) other; + return Objects.equals(tableName, that.tableName) + && Objects.equals(branch, that.branch) + && Objects.equals(schemaId, that.schemaId) + && Objects.equals(specId, that.specId) + && Objects.equals(schema, that.schema) + && Objects.equals(spec, that.spec) + && Objects.equals(equalityFields, that.equalityFields); + } + + @Override + public int hashCode() { + return Objects.hash(tableName, branch, schemaId, specId, schema, spec, equalityFields); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("tableName", tableName) + .add("branch", branch) + .add("schemaId", schemaId) + .add("specId", specId) + .add("schema", schema) + .add("spec", spec) + .add("equalityFields", equalityFields) + .toString(); + } + } + + @VisibleForTesting + Cache<SelectorKey, KeySelector<RowData, Integer>> getKeySelectorCache() { + return keySelectorCache; + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java new file mode 100644 index 0000000000..fe670c54eb --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.List; +import java.util.Map; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * A RowDataEvolver is responsible to change the input RowData to make it compatible with the target + * schema. 
This is done when + * + * <ol> + * <li>The input schema has fewer fields than the target schema. + * <li>The table types are wider than the input type. + * <li>The field order differs for source and target schema. + * </ol> + * + * <p>The resolution is as follows: + * + * <ol> + * <li>In the first case, we would add a null values for the missing field (if the field is + * optional). + * <li>In the second case, we would convert the data for the input field to a wider type, e.g. int + * (input type) => long (table type). + * <li>In the third case, we would rearrange the input data to match the target table. + * </ol> + */ +class RowDataEvolver { + private RowDataEvolver() {} + + public static RowData convert(RowData sourceData, Schema sourceSchema, Schema targetSchema) { + return convertStruct( + sourceData, FlinkSchemaUtil.convert(sourceSchema), FlinkSchemaUtil.convert(targetSchema)); + } + + private static Object convert(Object object, LogicalType sourceType, LogicalType targetType) { + if (object == null) { + return null; + } + + switch (targetType.getTypeRoot()) { + case BOOLEAN: + case INTEGER: + case FLOAT: + case VARCHAR: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case BINARY: + case VARBINARY: + return object; + case DOUBLE: + if (object instanceof Float) { + return ((Float) object).doubleValue(); + } else { + return object; + } + case BIGINT: + if (object instanceof Integer) { + return ((Integer) object).longValue(); + } else { + return object; + } + case DECIMAL: + DecimalType toDecimalType = (DecimalType) targetType; + DecimalData decimalData = (DecimalData) object; + if (((DecimalType) sourceType).getPrecision() == toDecimalType.getPrecision()) { + return object; + } else { + return DecimalData.fromBigDecimal( + decimalData.toBigDecimal(), toDecimalType.getPrecision(), toDecimalType.getScale()); + } + case TIMESTAMP_WITHOUT_TIME_ZONE: + if (object instanceof Integer) { + LocalDateTime dateTime = + LocalDateTime.of(LocalDate.ofEpochDay((Integer) object), LocalTime.MIN); + return TimestampData.fromLocalDateTime(dateTime); + } else { + return object; + } + case ROW: + return convertStruct((RowData) object, (RowType) sourceType, (RowType) targetType); + case ARRAY: + return convertArray((ArrayData) object, (ArrayType) sourceType, (ArrayType) targetType); + case MAP: + return convertMap((MapData) object, (MapType) sourceType, (MapType) targetType); + default: + throw new UnsupportedOperationException("Not a supported type: " + targetType); + } + } + + private static RowData convertStruct(RowData sourceData, RowType sourceType, RowType targetType) { + GenericRowData targetData = new GenericRowData(targetType.getFields().size()); + List<RowType.RowField> targetFields = targetType.getFields(); + for (int i = 0; i < targetFields.size(); i++) { + RowType.RowField targetField = targetFields.get(i); + + int sourceFieldId = sourceType.getFieldIndex(targetField.getName()); + if (sourceFieldId == -1) { + if (targetField.getType().isNullable()) { + targetData.setField(i, null); + } else { + throw new IllegalArgumentException( + String.format( + "Field %s in target schema %s is non-nullable but does not exist in source schema.", + i + 1, targetType)); + } + } else { + RowData.FieldGetter getter = + RowData.createFieldGetter(sourceType.getTypeAt(sourceFieldId), sourceFieldId); + targetData.setField( + i, + convert( + getter.getFieldOrNull(sourceData), + sourceType.getFields().get(sourceFieldId).getType(), + targetField.getType())); + } + } + + return 
targetData; + } + + private static ArrayData convertArray( + ArrayData sourceData, ArrayType sourceType, ArrayType targetType) { + LogicalType fromElementType = sourceType.getElementType(); + LogicalType toElementType = targetType.getElementType(); + ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(fromElementType); + Object[] convertedArray = new Object[sourceData.size()]; + for (int i = 0; i < convertedArray.length; i++) { + convertedArray[i] = + convert(elementGetter.getElementOrNull(sourceData, i), fromElementType, toElementType); + } + + return new GenericArrayData(convertedArray); + } + + private static MapData convertMap(MapData sourceData, MapType sourceType, MapType targetType) { + LogicalType fromMapKeyType = sourceType.getKeyType(); + LogicalType fromMapValueType = sourceType.getValueType(); + LogicalType toMapKeyType = targetType.getKeyType(); + LogicalType toMapValueType = targetType.getValueType(); + ArrayData keyArray = sourceData.keyArray(); + ArrayData valueArray = sourceData.valueArray(); + ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(fromMapKeyType); + ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(fromMapValueType); + Map<Object, Object> convertedMap = Maps.newLinkedHashMap(); + for (int i = 0; i < keyArray.size(); ++i) { + convertedMap.put( + convert(keyGetter.getElementOrNull(keyArray, i), fromMapKeyType, toMapKeyType), + convert(valueGetter.getElementOrNull(valueArray, i), fromMapValueType, toMapValueType)); + } + + return new GenericMapData(convertedMap); + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java index 0a43404d13..afd5b637e9 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java @@ -20,12 +20,12 @@ package org.apache.iceberg.flink.sink.dynamic; import java.io.IOException; import java.io.Serializable; -import java.util.List; import java.util.Objects; +import java.util.Set; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.hadoop.util.Sets; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; class WriteTarget implements Serializable { @@ -34,7 +34,7 @@ class WriteTarget implements Serializable { private final Integer schemaId; private final Integer specId; private final boolean upsertMode; - private final List<Integer> equalityFields; + private final Set<Integer> equalityFields; WriteTarget( String tableName, @@ -42,7 +42,7 @@ class WriteTarget implements Serializable { Integer schemaId, Integer specId, boolean upsertMode, - List<Integer> equalityFields) { + Set<Integer> equalityFields) { this.tableName = tableName; this.branch = branch != null ? 
branch : "main"; this.schemaId = schemaId; @@ -71,7 +71,7 @@ class WriteTarget implements Serializable { return upsertMode; } - List<Integer> equalityFields() { + Set<Integer> equalityFields() { return equalityFields; } @@ -94,12 +94,12 @@ class WriteTarget implements Serializable { view.readInt(), view.readInt(), view.readBoolean(), - readList(view)); + readSet(view)); } - private static List<Integer> readList(DataInputView view) throws IOException { + private static Set<Integer> readSet(DataInputView view) throws IOException { int numFields = view.readInt(); - List<Integer> equalityFields = Lists.newArrayList(); + Set<Integer> equalityFields = Sets.newHashSetWithExpectedSize(numFields); for (int i = 0; i < numFields; i++) { equalityFields.add(view.readInt()); } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java index 7df167ec32..a21c51c378 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java @@ -34,6 +34,7 @@ import java.nio.file.Paths; import java.time.OffsetDateTime; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -63,6 +64,7 @@ import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; @@ -100,7 +102,7 @@ public class TestDeltaTaskWriter extends TestBase { } private void testCdcEvents(boolean partitioned) throws IOException { - List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId()); + Set<Integer> equalityFieldIds = Sets.newHashSet(idFieldId()); TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds); taskWriterFactory.initialize(1, 1); @@ -178,7 +180,7 @@ public class TestDeltaTaskWriter extends TestBase { private void testWritePureEqDeletes(boolean partitioned) throws IOException { createAndInitTable(partitioned); - List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId()); + Set<Integer> equalityFieldIds = Sets.newHashSet(idFieldId()); TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds); taskWriterFactory.initialize(1, 1); @@ -207,7 +209,7 @@ public class TestDeltaTaskWriter extends TestBase { private void testAbort(boolean partitioned) throws IOException { createAndInitTable(partitioned); - List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId()); + Set<Integer> equalityFieldIds = Sets.newHashSet(idFieldId()); TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds); taskWriterFactory.initialize(1, 1); @@ -247,7 +249,7 @@ public class TestDeltaTaskWriter extends TestBase { @TestTemplate public void testPartitionedTableWithDataAsKey() throws IOException { createAndInitTable(true); - List<Integer> equalityFieldIds = Lists.newArrayList(dataFieldId()); + Set<Integer> equalityFieldIds = Sets.newHashSet(dataFieldId()); TaskWriterFactory<RowData> 
taskWriterFactory = createTaskWriterFactory(equalityFieldIds); taskWriterFactory.initialize(1, 1); @@ -290,7 +292,7 @@ public class TestDeltaTaskWriter extends TestBase { @TestTemplate public void testPartitionedTableWithDataAndIdAsKey() throws IOException { createAndInitTable(true); - List<Integer> equalityFieldIds = Lists.newArrayList(dataFieldId(), idFieldId()); + Set<Integer> equalityFieldIds = Sets.newHashSet(dataFieldId(), idFieldId()); TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(equalityFieldIds); taskWriterFactory.initialize(1, 1); @@ -325,7 +327,7 @@ public class TestDeltaTaskWriter extends TestBase { this.table = create(tableSchema, PartitionSpec.unpartitioned()); initTable(table); - List<Integer> equalityIds = ImmutableList.of(table.schema().findField("ts").fieldId()); + Set<Integer> equalityIds = ImmutableSet.of(table.schema().findField("ts").fieldId()); TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(flinkType, equalityIds); taskWriterFactory.initialize(1, 1); @@ -383,7 +385,7 @@ public class TestDeltaTaskWriter extends TestBase { return SimpleDataUtil.actualRowSet(table, columns); } - private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalityFieldIds) { + private TaskWriterFactory<RowData> createTaskWriterFactory(Set<Integer> equalityFieldIds) { return new RowDataTaskWriterFactory( SerializableTable.copyOf(table), FlinkSchemaUtil.convert(table.schema()), @@ -395,7 +397,7 @@ public class TestDeltaTaskWriter extends TestBase { } private TaskWriterFactory<RowData> createTaskWriterFactory( - RowType flinkType, List<Integer> equalityFieldIds) { + RowType flinkType, Set<Integer> equalityFieldIds) { return new RowDataTaskWriterFactory( SerializableTable.copyOf(table), flinkType, diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java index 243b7f9095..30782e8d41 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java @@ -80,7 +80,7 @@ abstract class DynamicRecordInternalSerializerTestBase return new DynamicRecordInternal[] { new DynamicRecordInternal( - TABLE, BRANCH, SCHEMA, rowData, SPEC, 42, false, Collections.emptyList()) + TABLE, BRANCH, SCHEMA, rowData, SPEC, 42, false, Collections.emptySet()) }; } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java index f4109a6476..13a06d3627 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java @@ -24,7 +24,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.jupiter.api.Test; class TestDynamicCommittableSerializer { @@ -33,7 +33,7 @@ class 
TestDynamicCommittableSerializer { void testRoundtrip() throws IOException { DynamicCommittable committable = new DynamicCommittable( - new WriteTarget("table", "branch", 42, 23, false, Lists.newArrayList(1, 2)), + new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), new byte[] {3, 4}, JobID.generate().toHexString(), new OperatorID().toHexString(), @@ -48,7 +48,7 @@ class TestDynamicCommittableSerializer { void testUnsupportedVersion() throws IOException { DynamicCommittable committable = new DynamicCommittable( - new WriteTarget("table", "branch", 42, 23, false, Lists.newArrayList(1, 2)), + new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), new byte[] {3, 4}, JobID.generate().toHexString(), new OperatorID().toHexString(), diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java index d9129d6eac..99a5465362 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java @@ -41,8 +41,8 @@ import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -105,11 +105,11 @@ class TestDynamicCommitter { committerMetrics); WriteTarget writeTarget1 = - new WriteTarget(TABLE1, "branch", 42, 0, true, Lists.newArrayList(1, 2)); + new WriteTarget(TABLE1, "branch", 42, 0, true, Sets.newHashSet(1, 2)); WriteTarget writeTarget2 = - new WriteTarget(TABLE1, "branch2", 43, 0, true, Lists.newArrayList(1, 2)); + new WriteTarget(TABLE1, "branch2", 43, 0, true, Sets.newHashSet(1, 2)); WriteTarget writeTarget3 = - new WriteTarget(TABLE2, "branch2", 43, 0, true, Lists.newArrayList(1, 2)); + new WriteTarget(TABLE2, "branch2", 43, 0, true, Sets.newHashSet(1, 2)); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); @@ -120,21 +120,21 @@ class TestDynamicCommitter { byte[] deltaManifest1 = aggregator.writeToManifest( writeTarget1, - Lists.newArrayList( + Sets.newHashSet( new DynamicWriteResult( writeTarget1, WriteResult.builder().addDataFiles(DATA_FILE).build())), 0); byte[] deltaManifest2 = aggregator.writeToManifest( writeTarget2, - Lists.newArrayList( + Sets.newHashSet( new DynamicWriteResult( writeTarget2, WriteResult.builder().addDataFiles(DATA_FILE).build())), 0); byte[] deltaManifest3 = aggregator.writeToManifest( writeTarget3, - Lists.newArrayList( + Sets.newHashSet( new DynamicWriteResult( writeTarget3, WriteResult.builder().addDataFiles(DATA_FILE).build())), 0); @@ -155,7 +155,7 @@ class TestDynamicCommitter { new MockCommitRequest<>( new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId)); - dynamicCommitter.commit(Lists.newArrayList(commitRequest1, commitRequest2, commitRequest3)); + dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2, 
commitRequest3)); table1.refresh(); assertThat(table1.snapshots()).hasSize(2); @@ -238,7 +238,7 @@ class TestDynamicCommitter { committerMetrics); WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Lists.newArrayList(1, 2)); + new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); @@ -253,7 +253,7 @@ class TestDynamicCommitter { byte[] deltaManifest = aggregator.writeToManifest( writeTarget, - Lists.newArrayList( + Sets.newHashSet( new DynamicWriteResult( writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), checkpointId); @@ -262,7 +262,7 @@ class TestDynamicCommitter { new MockCommitRequest<>( new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); - dynamicCommitter.commit(Lists.newArrayList(commitRequest)); + dynamicCommitter.commit(Sets.newHashSet(commitRequest)); CommitRequest<DynamicCommittable> oldCommitRequest = new MockCommitRequest<>( @@ -270,7 +270,7 @@ class TestDynamicCommitter { writeTarget, deltaManifest, jobId, operatorId, checkpointId - 1)); // Old commits requests shouldn't affect the result - dynamicCommitter.commit(Lists.newArrayList(oldCommitRequest)); + dynamicCommitter.commit(Sets.newHashSet(oldCommitRequest)); table1.refresh(); assertThat(table1.snapshots()).hasSize(1); @@ -315,7 +315,7 @@ class TestDynamicCommitter { committerMetrics); WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Lists.newArrayList(1, 2)); + new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); @@ -330,7 +330,7 @@ class TestDynamicCommitter { byte[] deltaManifest = aggregator.writeToManifest( writeTarget, - Lists.newArrayList( + Sets.newHashSet( new DynamicWriteResult( writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), checkpointId); @@ -339,12 +339,12 @@ class TestDynamicCommitter { new MockCommitRequest<>( new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); - dynamicCommitter.commit(Lists.newArrayList(commitRequest)); + dynamicCommitter.commit(Sets.newHashSet(commitRequest)); byte[] overwriteManifest = aggregator.writeToManifest( writeTarget, - Lists.newArrayList( + Sets.newHashSet( new DynamicWriteResult( writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), checkpointId + 1); @@ -354,7 +354,7 @@ class TestDynamicCommitter { new DynamicCommittable( writeTarget, overwriteManifest, jobId, operatorId, checkpointId + 1)); - dynamicCommitter.commit(Lists.newArrayList(overwriteRequest)); + dynamicCommitter.commit(Sets.newHashSet(overwriteRequest)); table1.refresh(); assertThat(table1.snapshots()).hasSize(2); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java new file mode 100644 index 0000000000..aa483b3e53 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.apache.iceberg.flink.TestFixtures.TABLE; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collections; +import org.apache.flink.table.data.GenericRowData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class TestDynamicTableUpdateOperator { + + @RegisterExtension + private static final HadoopCatalogExtension CATALOG_EXTENSION = + new HadoopCatalogExtension(DATABASE, TABLE); + + private static final Schema SCHEMA1 = + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + + private static final Schema SCHEMA2 = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + + @Test + void testDynamicTableUpdateOperatorNewTable() throws Exception { + int cacheMaximumSize = 10; + int cacheRefreshMs = 1000; + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier table = TableIdentifier.of(TABLE); + + assertThat(catalog.tableExists(table)).isFalse(); + DynamicTableUpdateOperator operator = + new DynamicTableUpdateOperator( + CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize, cacheRefreshMs); + operator.open(null); + + DynamicRecordInternal input = + new DynamicRecordInternal( + TABLE, + "branch", + SCHEMA1, + GenericRowData.of(1, "test"), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + DynamicRecordInternal output = operator.map(input); + + assertThat(catalog.tableExists(table)).isTrue(); + assertThat(input).isEqualTo(output); + } + + @Test + void testDynamicTableUpdateOperatorSchemaChange() throws Exception { + int cacheMaximumSize = 10; + int cacheRefreshMs = 1000; + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier table = TableIdentifier.of(TABLE); + + DynamicTableUpdateOperator operator = + new DynamicTableUpdateOperator( + CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize, cacheRefreshMs); + operator.open(null); + + catalog.createTable(table, SCHEMA1); + DynamicRecordInternal input = + new DynamicRecordInternal( + TABLE, + "branch", + SCHEMA2, + GenericRowData.of(1, "test"), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + DynamicRecordInternal output = operator.map(input); + + assertThat(catalog.loadTable(table).schema().sameSchema(SCHEMA2)).isTrue(); + assertThat(input).isEqualTo(output); + + // Process the same input again + DynamicRecordInternal output2 = operator.map(input); + assertThat(output2).isEqualTo(output); + 
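// Illustrative sketch only, not part of this commit: the assertions in these tests pin down the
// operator's contract: map() creates the target table if it does not yet exist, applies any
// schema update needed for the incoming record's schema, and returns the record, behaving
// idempotently for repeated inputs. Driving it outside the test harness would look roughly like
// the following (catalogLoader and incomingRecord are assumed placeholders):
//
//   DynamicTableUpdateOperator operator =
//       new DynamicTableUpdateOperator(catalogLoader, /* cacheMaximumSize */ 10, /* cacheRefreshMs */ 1000);
//   operator.open(null);
//   DynamicRecordInternal adjusted = operator.map(incomingRecord);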
assertThat(catalog.loadTable(table).schema().schemaId()).isEqualTo(output.schema().schemaId()); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java index 137b87bb17..713c67da17 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java @@ -24,11 +24,11 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.hadoop.util.Sets; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.io.WriteResult; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -49,12 +49,11 @@ class TestDynamicWriteResultAggregator { testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) { testHarness.open(); - WriteTarget writeTarget1 = - new WriteTarget("table", "branch", 42, 0, true, Lists.newArrayList()); + WriteTarget writeTarget1 = new WriteTarget("table", "branch", 42, 0, true, Sets.newHashSet()); DynamicWriteResult dynamicWriteResult1 = new DynamicWriteResult(writeTarget1, WriteResult.builder().build()); WriteTarget writeTarget2 = - new WriteTarget("table2", "branch", 42, 0, true, Lists.newArrayList(1, 2)); + new WriteTarget("table2", "branch", 42, 0, true, Sets.newHashSet(1, 2)); DynamicWriteResult dynamicWriteResult2 = new DynamicWriteResult(writeTarget2, WriteResult.builder().build()); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java index bede5d42b9..a3a9691107 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java @@ -23,13 +23,13 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.hadoop.util.Sets; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.Metrics; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.Test; class TestDynamicWriteResultSerializer { @@ -53,7 +53,7 @@ class TestDynamicWriteResultSerializer { void testRoundtrip() throws IOException { DynamicWriteResult dynamicWriteResult = new DynamicWriteResult( - new WriteTarget("table", "branch", 42, 23, false, Lists.newArrayList(1, 2)), + new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), WriteResult.builder().addDataFiles(DATA_FILE).build()); DynamicWriteResultSerializer serializer = new 
DynamicWriteResultSerializer(); @@ -71,7 +71,7 @@ class TestDynamicWriteResultSerializer { void testUnsupportedVersion() throws IOException { DynamicWriteResult dynamicWriteResult = new DynamicWriteResult( - new WriteTarget("table", "branch", 42, 23, false, Lists.newArrayList(1, 2)), + new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), WriteResult.builder().addDataFiles(DATA_FILE).build()); DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer(); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java index 0a723c9d57..0c223f3237 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java @@ -33,7 +33,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; import org.apache.iceberg.io.WriteResult; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; @@ -118,7 +118,7 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase { DynamicRecordInternal record = getDynamicRecordInternal(table1); record.setUpsertMode(true); - record.setEqualityFieldIds(Lists.newArrayList(1)); + record.setEqualityFieldIds(Sets.newHashSet(1)); dyamicWriter.write(record, null); dyamicWriter.prepareCommit(); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java new file mode 100644 index 0000000000..b8d86feb99 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.benmanes.caffeine.cache.Cache; +import java.util.Collections; +import java.util.Set; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +class TestHashKeyGenerator { + + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + private static final String BRANCH = "main"; + private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("default", "table"); + + @Test + void testRoundRobinWithDistributionModeNone() throws Exception { + int writeParallelism = 10; + int maxWriteParallelism = 2; + HashKeyGenerator generator = new HashKeyGenerator(1, maxWriteParallelism); + PartitionSpec spec = PartitionSpec.unpartitioned(); + + GenericRowData row = GenericRowData.of(1, StringData.fromString("z")); + int writeKey1 = + getWriteKey( + generator, spec, DistributionMode.NONE, writeParallelism, Collections.emptySet(), row); + int writeKey2 = + getWriteKey( + generator, spec, DistributionMode.NONE, writeParallelism, Collections.emptySet(), row); + int writeKey3 = + getWriteKey( + generator, spec, DistributionMode.NONE, writeParallelism, Collections.emptySet(), row); + int writeKey4 = + getWriteKey( + generator, spec, DistributionMode.NONE, writeParallelism, Collections.emptySet(), row); + + assertThat(writeKey1).isNotEqualTo(writeKey2); + assertThat(writeKey3).isEqualTo(writeKey1); + assertThat(writeKey4).isEqualTo(writeKey2); + + assertThat(getSubTaskId(writeKey1, writeParallelism, maxWriteParallelism)).isEqualTo(0); + assertThat(getSubTaskId(writeKey2, writeParallelism, maxWriteParallelism)).isEqualTo(5); + assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(0); + assertThat(getSubTaskId(writeKey4, writeParallelism, maxWriteParallelism)).isEqualTo(5); + } + + @Test + void testBucketingWithDistributionModeHash() throws Exception { + int writeParallelism = 3; + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(1, maxWriteParallelism); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + + GenericRowData row1 = GenericRowData.of(1, StringData.fromString("a")); + GenericRowData row2 = GenericRowData.of(1, StringData.fromString("b")); + GenericRowData row3 = GenericRowData.of(2, StringData.fromString("c")); + GenericRowData row4 = GenericRowData.of(2, StringData.fromString("d")); + + int writeKey1 = + getWriteKey( + generator, spec, DistributionMode.HASH, writeParallelism, Collections.emptySet(), row1); + int writeKey2 = + getWriteKey( + generator, spec, DistributionMode.HASH, writeParallelism, Collections.emptySet(), row2); + int writeKey3 = + getWriteKey( + generator, spec, DistributionMode.HASH, writeParallelism, Collections.emptySet(), row3); + int writeKey4 = + getWriteKey( + generator, spec, 
DistributionMode.HASH, writeParallelism, Collections.emptySet(), row4); + + assertThat(writeKey1).isEqualTo(writeKey2); + assertThat(writeKey3).isNotEqualTo(writeKey1); + assertThat(writeKey4).isEqualTo(writeKey3); + + assertThat(getSubTaskId(writeKey1, writeParallelism, maxWriteParallelism)).isEqualTo(0); + assertThat(getSubTaskId(writeKey2, writeParallelism, maxWriteParallelism)).isEqualTo(0); + assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(1); + assertThat(getSubTaskId(writeKey4, writeParallelism, maxWriteParallelism)).isEqualTo(1); + } + + @Test + void testEqualityKeys() throws Exception { + int writeParallelism = 2; + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism); + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + + GenericRowData row1 = GenericRowData.of(1, StringData.fromString("foo")); + GenericRowData row2 = GenericRowData.of(1, StringData.fromString("bar")); + GenericRowData row3 = GenericRowData.of(2, StringData.fromString("baz")); + Set<String> equalityColumns = Collections.singleton("id"); + + int writeKey1 = + getWriteKey( + generator, + unpartitioned, + DistributionMode.NONE, + writeParallelism, + equalityColumns, + row1); + int writeKey2 = + getWriteKey( + generator, + unpartitioned, + DistributionMode.NONE, + writeParallelism, + equalityColumns, + row2); + int writeKey3 = + getWriteKey( + generator, + unpartitioned, + DistributionMode.NONE, + writeParallelism, + equalityColumns, + row3); + + assertThat(writeKey1).isEqualTo(writeKey2); + assertThat(writeKey2).isNotEqualTo(writeKey3); + + assertThat(getSubTaskId(writeKey1, writeParallelism, maxWriteParallelism)).isEqualTo(1); + assertThat(getSubTaskId(writeKey2, writeParallelism, maxWriteParallelism)).isEqualTo(1); + assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(0); + } + + @Test + void testCapAtMaxWriteParallelism() throws Exception { + int writeParallelism = 10; + int maxWriteParallelism = 5; + HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism); + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + + Set<Integer> writeKeys = Sets.newHashSet(); + for (int i = 0; i < 20; i++) { + GenericRowData row = GenericRowData.of(i, StringData.fromString("z")); + writeKeys.add( + getWriteKey( + generator, + unpartitioned, + DistributionMode.NONE, + writeParallelism, + Collections.emptySet(), + row)); + } + + assertThat(writeKeys).hasSize(maxWriteParallelism); + assertThat( + writeKeys.stream() + .map(key -> getSubTaskId(key, writeParallelism, writeParallelism)) + .distinct() + .count()) + .isEqualTo(maxWriteParallelism); + } + + @Test + void testHashModeWithoutEqualityFieldsFallsBackToNone() throws Exception { + int writeParallelism = 2; + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism); + Schema noIdSchema = new Schema(Types.NestedField.required(1, "x", Types.StringType.get())); + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + + DynamicRecord record = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + noIdSchema, + GenericRowData.of(StringData.fromString("v")), + unpartitioned, + DistributionMode.HASH, + writeParallelism); + + int writeKey1 = generator.generateKey(record); + int writeKey2 = generator.generateKey(record); + int writeKey3 = generator.generateKey(record); + assertThat(writeKey1).isNotEqualTo(writeKey2); + assertThat(writeKey3).isEqualTo(writeKey1); + + 
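// Illustrative sketch only, not part of this commit: a write key produced by generateKey() is an
// ordinary Flink hash key and is routed to a writer subtask via Flink's KeyGroupRangeAssignment,
// exactly as the getSubTaskId helper at the bottom of this file does. Assuming a generator and
// record constructed as in these tests:
//
//   HashKeyGenerator generator = new HashKeyGenerator(/* maxCacheSize */ 16, /* maxWriteParallelism */ 8);
//   int writeKey = generator.generateKey(record);
//   int subTaskId = KeyGroupRangeAssignment.assignKeyToParallelOperator(
//       writeKey, /* maxParallelism */ 8, /* parallelism */ writeParallelism);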
assertThat(getSubTaskId(writeKey1, writeParallelism, maxWriteParallelism)).isEqualTo(1); + assertThat(getSubTaskId(writeKey2, writeParallelism, maxWriteParallelism)).isEqualTo(0); + assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(1); + } + + @Test + void testSchemaSpecOverrides() throws Exception { + int maxCacheSize = 10; + int writeParallelism = 5; + int maxWriteParallelism = 10; + HashKeyGenerator generator = new HashKeyGenerator(maxCacheSize, maxWriteParallelism); + + DynamicRecord record = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + SCHEMA, + GenericRowData.of(1, StringData.fromString("foo")), + PartitionSpec.unpartitioned(), + DistributionMode.NONE, + writeParallelism); + + int writeKey1 = generator.generateKey(record); + int writeKey2 = generator.generateKey(record); + // Assert that we are bucketing via NONE (round-robin) + assertThat(writeKey1).isNotEqualTo(writeKey2); + + // Schema has different id + Schema overrideSchema = new Schema(42, SCHEMA.columns()); + // Spec has different id + PartitionSpec overrideSpec = PartitionSpec.builderFor(SCHEMA).withSpecId(42).build(); + RowData overrideData = GenericRowData.of(1L, StringData.fromString("foo")); + + // We get a new key selector for the schema which starts off on the same offset + assertThat(generator.generateKey(record, overrideSchema, null, null)).isEqualTo(writeKey1); + // We get a new key selector for the spec which starts off on the same offset + assertThat(generator.generateKey(record, null, overrideSpec, null)).isEqualTo(writeKey1); + // We get the same key selector which yields a different result for the overridden data + assertThat(generator.generateKey(record, null, null, overrideData)).isNotEqualTo(writeKey1); + } + + @Test + void testMultipleTables() throws Exception { + int maxCacheSize = 10; + int writeParallelism = 2; + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(maxCacheSize, maxWriteParallelism); + + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + + GenericRowData rowData = GenericRowData.of(1, StringData.fromString("foo")); + + DynamicRecord record1 = + new DynamicRecord( + TableIdentifier.of("a", "table"), + BRANCH, + SCHEMA, + rowData, + unpartitioned, + DistributionMode.HASH, + writeParallelism); + record1.setEqualityFields(Collections.singleton("id")); + DynamicRecord record2 = + new DynamicRecord( + TableIdentifier.of("my", "other", "table"), + BRANCH, + SCHEMA, + rowData, + unpartitioned, + DistributionMode.HASH, + writeParallelism); + record2.setEqualityFields(Collections.singleton("id")); + + // Consistent hashing for the same record due to HASH distribution mode + int writeKeyRecord1 = generator.generateKey(record1); + assertThat(writeKeyRecord1).isEqualTo(generator.generateKey(record1)); + int writeKeyRecord2 = generator.generateKey(record2); + assertThat(writeKeyRecord2).isEqualTo(generator.generateKey(record2)); + + // But the write keys are for different tables and should not be equal + assertThat(writeKeyRecord1).isNotEqualTo(writeKeyRecord2); + + assertThat(getSubTaskId(writeKeyRecord1, writeParallelism, maxWriteParallelism)).isEqualTo(1); + assertThat(getSubTaskId(writeKeyRecord2, writeParallelism, maxWriteParallelism)).isEqualTo(0); + } + + @Test + void testCaching() throws Exception { + int maxCacheSize = 1; + int writeParallelism = 2; + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(maxCacheSize, maxWriteParallelism); + Cache<HashKeyGenerator.SelectorKey, 
KeySelector<RowData, Integer>> keySelectorCache = + generator.getKeySelectorCache(); + + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + DynamicRecord record = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + SCHEMA, + GenericRowData.of(1, StringData.fromString("foo")), + unpartitioned, + DistributionMode.NONE, + writeParallelism); + + int writeKey1 = generator.generateKey(record); + assertThat(keySelectorCache.estimatedSize()).isEqualTo(1); + + int writeKey2 = generator.generateKey(record); + assertThat(writeKey2).isNotEqualTo(writeKey1); + // Manually clean up because the cleanup is not always triggered + keySelectorCache.cleanUp(); + assertThat(keySelectorCache.estimatedSize()).isEqualTo(1); + + int writeKey3 = generator.generateKey(record); + // Manually clean up because the cleanup is not always triggered + keySelectorCache.cleanUp(); + assertThat(keySelectorCache.estimatedSize()).isEqualTo(1); + // We create a new key selector which will start off at the same position + assertThat(writeKey1).isEqualTo(writeKey3); + } + + private static int getWriteKey( + HashKeyGenerator generator, + PartitionSpec spec, + DistributionMode mode, + int writeParallelism, + Set<String> equalityFields, + GenericRowData row) + throws Exception { + DynamicRecord record = + new DynamicRecord(TABLE_IDENTIFIER, BRANCH, SCHEMA, row, spec, mode, writeParallelism); + record.setEqualityFields(equalityFields); + return generator.generateKey(record); + } + + private static int getSubTaskId(int writeKey1, int writeParallelism, int maxWriteParallelism) { + return KeyGroupRangeAssignment.assignKeyToParallelOperator( + writeKey1, maxWriteParallelism, writeParallelism); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java new file mode 100644 index 0000000000..2553575f18 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.math.BigDecimal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.DataGenerator; +import org.apache.iceberg.flink.DataGenerators; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.Days; +import org.junit.jupiter.api.Test; + +class TestRowDataEvolver { + + static final Schema SCHEMA = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + + static final Schema SCHEMA2 = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "onemore", Types.DoubleType.get())); + + @Test + void testPrimitiveTypes() { + DataGenerator generator = new DataGenerators.Primitives(); + assertThat( + RowDataEvolver.convert( + generator.generateFlinkRowData(), + generator.icebergSchema(), + generator.icebergSchema())) + .isEqualTo(generator.generateFlinkRowData()); + } + + @Test + void testAddColumn() { + assertThat(RowDataEvolver.convert(SimpleDataUtil.createRowData(1, "a"), SCHEMA, SCHEMA2)) + .isEqualTo(GenericRowData.of(1, StringData.fromString("a"), null)); + } + + @Test + void testAddRequiredColumn() { + Schema currentSchema = new Schema(Types.NestedField.optional(1, "id", Types.IntegerType.get())); + Schema targetSchema = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + required(2, "data", Types.StringType.get())); + + assertThrows( + IllegalArgumentException.class, + () -> RowDataEvolver.convert(GenericRowData.of(42), currentSchema, targetSchema)); + } + + @Test + void testIntToLong() { + Schema schemaWithLong = + new Schema( + Types.NestedField.optional(2, "id", Types.LongType.get()), + Types.NestedField.optional(4, "data", Types.StringType.get())); + + assertThat( + RowDataEvolver.convert( + SimpleDataUtil.createRowData(1, "a"), SimpleDataUtil.SCHEMA, schemaWithLong)) + .isEqualTo(GenericRowData.of(1L, StringData.fromString("a"))); + } + + @Test + void testFloatToDouble() { + Schema schemaWithFloat = + new Schema(Types.NestedField.optional(1, "float2double", Types.FloatType.get())); + Schema schemaWithDouble = + new Schema(Types.NestedField.optional(2, "float2double", Types.DoubleType.get())); + + assertThat(RowDataEvolver.convert(GenericRowData.of(1.5f), schemaWithFloat, schemaWithDouble)) + .isEqualTo(GenericRowData.of(1.5d)); + } + + @Test + void testDateToTimestamp() { + Schema schemaWithFloat = + new Schema(Types.NestedField.optional(1, "date2timestamp", Types.DateType.get())); + Schema schemaWithDouble = + new Schema( + Types.NestedField.optional(2, "date2timestamp", 
Types.TimestampType.withoutZone())); + + DateTime time = new DateTime(2022, 1, 10, 0, 0, 0, 0, DateTimeZone.UTC); + int days = + Days.daysBetween(new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeZone.UTC), time).getDays(); + + assertThat(RowDataEvolver.convert(GenericRowData.of(days), schemaWithFloat, schemaWithDouble)) + .isEqualTo(GenericRowData.of(TimestampData.fromEpochMillis(time.getMillis()))); + } + + @Test + void testIncreasePrecision() { + Schema before = + new Schema(Types.NestedField.required(14, "decimal_field", Types.DecimalType.of(9, 2))); + Schema after = + new Schema(Types.NestedField.required(14, "decimal_field", Types.DecimalType.of(10, 2))); + + assertThat( + RowDataEvolver.convert( + GenericRowData.of(DecimalData.fromBigDecimal(new BigDecimal("-1.50"), 9, 2)), + before, + after)) + .isEqualTo(GenericRowData.of(DecimalData.fromBigDecimal(new BigDecimal("-1.50"), 10, 2))); + } + + @Test + void testStructAddOptionalFields() { + DataGenerator generator = new DataGenerators.StructOfPrimitive(); + RowData oldData = generator.generateFlinkRowData(); + Schema oldSchema = generator.icebergSchema(); + Types.NestedField structField = oldSchema.columns().get(1); + Schema newSchema = + new Schema( + oldSchema.columns().get(0), + Types.NestedField.required( + 10, + structField.name(), + Types.StructType.of( + required(101, "id", Types.IntegerType.get()), + optional(103, "optional", Types.StringType.get()), + required(102, "name", Types.StringType.get())))); + RowData newData = + GenericRowData.of( + StringData.fromString("row_id_value"), + GenericRowData.of(1, null, StringData.fromString("Jane"))); + + assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData); + } + + @Test + void testStructAddRequiredFieldsWithOptionalRoot() { + DataGenerator generator = new DataGenerators.StructOfPrimitive(); + RowData oldData = generator.generateFlinkRowData(); + Schema oldSchema = generator.icebergSchema(); + Types.NestedField structField = oldSchema.columns().get(1); + Schema newSchema = + new Schema( + oldSchema.columns().get(0), + Types.NestedField.optional( + 10, + "newFieldOptionalField", + Types.StructType.of( + Types.NestedField.optional( + structField.fieldId(), + structField.name(), + Types.StructType.of( + optional(101, "id", Types.IntegerType.get()), + // Required columns which leads to nulling the entire struct + required(103, "required", Types.StringType.get()), + required(102, "name", Types.StringType.get())))))); + + RowData expectedData = GenericRowData.of(StringData.fromString("row_id_value"), null); + + assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(expectedData); + } + + @Test + void testStructAddRequiredFields() { + DataGenerator generator = new DataGenerators.StructOfPrimitive(); + RowData oldData = generator.generateFlinkRowData(); + Schema oldSchema = generator.icebergSchema(); + Types.NestedField structField = oldSchema.columns().get(1); + Schema newSchema = + new Schema( + oldSchema.columns().get(0), + Types.NestedField.required( + 10, + structField.name(), + Types.StructType.of( + required(101, "id", Types.IntegerType.get()), + required(103, "required", Types.StringType.get()), + required(102, "name", Types.StringType.get())))); + + assertThrows( + IllegalArgumentException.class, + () -> RowDataEvolver.convert(oldData, oldSchema, newSchema)); + } + + @Test + void testMap() { + DataGenerator generator = new DataGenerators.MapOfPrimitives(); + RowData oldData = generator.generateFlinkRowData(); + Schema oldSchema = 
generator.icebergSchema(); + Types.NestedField mapField = oldSchema.columns().get(1); + Schema newSchema = + new Schema( + oldSchema.columns().get(0), + Types.NestedField.optional( + 10, + mapField.name(), + Types.MapType.ofRequired(101, 102, Types.StringType.get(), Types.LongType.get()))); + RowData newData = + GenericRowData.of( + StringData.fromString("row_id_value"), + new GenericMapData( + ImmutableMap.of( + StringData.fromString("Jane"), 1L, StringData.fromString("Joe"), 2L))); + + assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData); + } + + @Test + void testArray() { + DataGenerator generator = new DataGenerators.ArrayOfPrimitive(); + RowData oldData = generator.generateFlinkRowData(); + Schema oldSchema = generator.icebergSchema(); + Types.NestedField arrayField = oldSchema.columns().get(1); + Schema newSchema = + new Schema( + oldSchema.columns().get(0), + Types.NestedField.optional( + 10, arrayField.name(), Types.ListType.ofOptional(101, Types.LongType.get()))); + RowData newData = + GenericRowData.of( + StringData.fromString("row_id_value"), new GenericArrayData(new Long[] {1L, 2L, 3L})); + + assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java index ce9054ad49..ef8380c216 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.Set; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.iceberg.DataFile; @@ -46,6 +47,7 @@ import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -81,7 +83,7 @@ public class TestProjectMetaColumn { SimpleDataUtil.createInsert(1, "AAA"), SimpleDataUtil.createInsert(2, "BBB"), SimpleDataUtil.createInsert(3, "CCC")); - writeAndCommit(table, ImmutableList.of(), false, rows); + writeAndCommit(table, ImmutableSet.of(), false, rows); FlinkInputFormat input = FlinkSource.forRowData().tableLoader(TableLoader.fromHadoopTable(location)).buildFormat(); @@ -124,7 +126,7 @@ public class TestProjectMetaColumn { SimpleDataUtil.createInsert(2, "AAA"), SimpleDataUtil.createInsert(2, "BBB")); int eqFieldId = table.schema().findField("data").fieldId(); - writeAndCommit(table, ImmutableList.of(eqFieldId), true, rows); + writeAndCommit(table, ImmutableSet.of(eqFieldId), true, rows); FlinkInputFormat input = FlinkSource.forRowData().tableLoader(TableLoader.fromHadoopTable(location)).buildFormat(); @@ -147,8 +149,7 @@ public class TestProjectMetaColumn { } private void writeAndCommit( - Table table, List<Integer> eqFieldIds, boolean upsert, List<RowData> rows) - throws IOException { + Table table, Set<Integer> eqFieldIds, boolean upsert, List<RowData> rows) 
throws IOException { TaskWriter<RowData> writer = createTaskWriter(table, eqFieldIds, upsert); try (TaskWriter<RowData> io = writer) { for (RowData row : rows) { @@ -171,7 +172,7 @@ public class TestProjectMetaColumn { } private TaskWriter<RowData> createTaskWriter( - Table table, List<Integer> equalityFieldIds, boolean upsert) { + Table table, Set<Integer> equalityFieldIds, boolean upsert) { TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory( SerializableTable.copyOf(table),