This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new 68651a3498 Flink: Dynamic Iceberg Sink: Add sink / core processing logic / benchmarking (#13304)
68651a3498 is described below

commit 68651a34987ebd66422fe79ceaff9f987f32d6d6
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Tue Jun 17 17:20:15 2025 +0200

    Flink: Dynamic Iceberg Sink: Add sink / core processing logic / benchmarking (#13304)
---
 ...namicRecordSerializerDeserializerBenchmark.java | 138 ++++
 .../org/apache/iceberg/flink/FlinkConfParser.java  |   7 +
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |   4 +
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  49 +-
 .../org/apache/iceberg/flink/sink/IcebergSink.java |  49 +-
 .../org/apache/iceberg/flink/sink/SinkUtil.java    |  59 +-
 .../flink/sink/dynamic/DynamicCommitter.java       |   3 +-
 .../flink/sink/dynamic/DynamicIcebergSink.java     | 406 ++++
 .../flink/sink/dynamic/DynamicRecordGenerator.java |  34 +
 .../flink/sink/dynamic/DynamicRecordProcessor.java | 171 +++++
 .../flink/sink/dynamic/HashKeyGenerator.java       |  16 +-
 .../org/apache/iceberg/flink/SimpleDataUtil.java   |  22 +-
 .../org/apache/iceberg/flink/TestFlinkFilters.java |   1 -
 .../java/org/apache/iceberg/flink/TestHelpers.java |   2 +-
 .../flink/sink/TestFlinkIcebergSinkBase.java       |   5 +-
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 831 +++++++++++++++++++++
 .../sink/dynamic/TestDynamicIcebergSinkPerf.java   | 245 ++++++
 17 files changed, 1933 insertions(+), 109 deletions(-)

diff --git a/flink/v2.0/flink/src/jmh/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordSerializerDeserializerBenchmark.java b/flink/v2.0/flink/src/jmh/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordSerializerDeserializerBenchmark.java
new file mode 100644
index 0000000000..d7c3a7b32b
--- /dev/null
+++ b/flink/v2.0/flink/src/jmh/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordSerializerDeserializerBenchmark.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.SingleShotTime) +public class DynamicRecordSerializerDeserializerBenchmark { + private static final int SAMPLE_SIZE = 100_000; + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name2", Types.StringType.get()), + Types.NestedField.required(3, "name3", Types.StringType.get()), + Types.NestedField.required(4, "name4", Types.StringType.get()), + Types.NestedField.required(5, "name5", Types.StringType.get()), + Types.NestedField.required(6, "name6", Types.StringType.get()), + Types.NestedField.required(7, "name7", Types.StringType.get()), + Types.NestedField.required(8, "name8", Types.StringType.get()), + Types.NestedField.required(9, "name9", Types.StringType.get())); + + private List<DynamicRecordInternal> rows = Lists.newArrayListWithExpectedSize(SAMPLE_SIZE); + private DynamicRecordInternalType type; + + public static void main(String[] args) throws RunnerException { + Options options = + new OptionsBuilder() + .include(DynamicRecordSerializerDeserializerBenchmark.class.getSimpleName()) + .build(); + new Runner(options).run(); + } + + @Setup + public void setupBenchmark() throws IOException { + List<Record> records = RandomGenericData.generate(SCHEMA, SAMPLE_SIZE, 1L); + this.rows = + records.stream() + .map( + r -> + new DynamicRecordInternal( + "t", + "main", + SCHEMA, + RowDataConverter.convert(SCHEMA, r), + PartitionSpec.unpartitioned(), + 1, + false, + Collections.emptySet())) + .collect(Collectors.toList()); + + File warehouse = Files.createTempFile("perf-bench", null).toFile(); + CatalogLoader catalogLoader = + CatalogLoader.hadoop( + "hadoop", + new Configuration(), + 
ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getPath())); + this.type = new DynamicRecordInternalType(catalogLoader, true, 100); + } + + @Benchmark + @Threads(1) + public void testSerialize(Blackhole blackhole) throws IOException { + TypeSerializer<DynamicRecordInternal> serializer = + type.createSerializer((SerializerConfig) null); + DataOutputSerializer outputView = new DataOutputSerializer(1024); + for (int i = 0; i < SAMPLE_SIZE; ++i) { + serializer.serialize(rows.get(i), outputView); + } + } + + @Benchmark + @Threads(1) + public void testSerializeAndDeserialize(Blackhole blackhole) throws IOException { + TypeSerializer<DynamicRecordInternal> serializer = + type.createSerializer((SerializerConfig) null); + + DataOutputSerializer outputView = new DataOutputSerializer(1024); + for (int i = 0; i < SAMPLE_SIZE; ++i) { + serializer.serialize(rows.get(i), outputView); + serializer.deserialize(new DataInputDeserializer(outputView.getSharedBuffer())); + } + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java index d6d2fd92f4..e0672811cf 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.util.TimeUtils; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @Internal @@ -43,6 +44,12 @@ public class FlinkConfParser { this.readableConfig = readableConfig; } + FlinkConfParser(Map<String, String> options, ReadableConfig readableConfig) { + this.tableProperties = ImmutableMap.of(); + this.options = options; + this.readableConfig = readableConfig; + } + public BooleanConfParser booleanConf() { return new BooleanConfParser(); } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index 12ad7989c3..222a1e8104 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -55,6 +55,10 @@ public class FlinkWriteConf { this.confParser = new FlinkConfParser(table, writeOptions, readableConfig); } + public FlinkWriteConf(Map<String, String> writeOptions, ReadableConfig readableConfig) { + this.confParser = new FlinkConfParser(writeOptions, readableConfig); + } + public boolean overwriteMode() { return confParser .booleanConf() diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index c42e4a015b..8da97df037 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -18,12 +18,6 @@ */ package org.apache.iceberg.flink.sink; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; -import static 
org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; import java.io.IOException; @@ -722,51 +716,10 @@ public class FlinkSink { flinkRowType, flinkWriteConf.targetDataFileSize(), format, - writeProperties(initTable, format, flinkWriteConf), + SinkUtil.writeProperties(format, flinkWriteConf, initTable), equalityFieldIds, flinkWriteConf.upsertMode()); return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory); } - - /** - * Based on the {@link FileFormat} overwrites the table level compression properties for the table - * write. - * - * @param table The table to get the table level settings - * @param format The FileFormat to use - * @param conf The write configuration - * @return The properties to use for writing - */ - private static Map<String, String> writeProperties( - Table table, FileFormat format, FlinkWriteConf conf) { - Map<String, String> writeProperties = Maps.newHashMap(table.properties()); - - switch (format) { - case PARQUET: - writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec()); - String parquetCompressionLevel = conf.parquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - - break; - case AVRO: - writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); - String avroCompressionLevel = conf.avroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel()); - } - - break; - case ORC: - writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec()); - writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy()); - break; - default: - throw new IllegalArgumentException(String.format("Unknown file format %s", format)); - } - - return writeProperties; - } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index 8f33a8e58d..9ab7a7730c 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -18,12 +18,6 @@ */ package org.apache.iceberg.flink.sink; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; import java.io.IOException; @@ -649,7 +643,7 @@ public class IcebergSink table, snapshotSummary, uidSuffix, - writeProperties(table, flinkWriteConf.dataFileFormat(), flinkWriteConf), + SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, table), toFlinkRowType(table.schema(), tableSchema), tableSupplier, flinkWriteConf, @@ -728,47 +722,6 @@ public class IcebergSink } } - /** - * Based on the {@link FileFormat} overwrites the table level compression properties for the table - * write. 
- * - * @param table The table to get the table level settings - * @param format The FileFormat to use - * @param conf The write configuration - * @return The properties to use for writing - */ - private static Map<String, String> writeProperties( - Table table, FileFormat format, FlinkWriteConf conf) { - Map<String, String> writeProperties = Maps.newHashMap(table.properties()); - - switch (format) { - case PARQUET: - writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec()); - String parquetCompressionLevel = conf.parquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - - break; - case AVRO: - writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); - String avroCompressionLevel = conf.avroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel()); - } - - break; - case ORC: - writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec()); - writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy()); - break; - default: - throw new IllegalArgumentException(String.format("Unknown file format %s", format)); - } - - return writeProperties; - } - private DataStream<RowData> distributeDataStream(DataStream<RowData> input) { DistributionMode mode = flinkWriteConf.distributionMode(); Schema schema = table.schema(); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java index 3f60b45a1f..b3a9ac6ba2 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java @@ -18,17 +18,30 @@ */ package org.apache.iceberg.flink.sink; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; + import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.flink.FlinkWriteConf; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class SinkUtil { +@Internal +public class SinkUtil { private static final long INITIAL_CHECKPOINT_ID = -1L; @@ -90,4 +103,48 @@ class SinkUtil { return lastCommittedCheckpointId; } + + /** + * Based on the {@link FileFormat} overwrites the table level compression properties for the table + * write. 
+ * + * @param format The FileFormat to use + * @param conf The write configuration + * @param table The table to get the table level settings + * @return The properties to use for writing + */ + public static Map<String, String> writeProperties( + FileFormat format, FlinkWriteConf conf, @Nullable Table table) { + Map<String, String> writeProperties = Maps.newHashMap(); + if (table != null) { + writeProperties.putAll(table.properties()); + } + + switch (format) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec()); + String parquetCompressionLevel = conf.parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + + break; + case AVRO: + writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); + String avroCompressionLevel = conf.avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel()); + } + + break; + case ORC: + writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy()); + break; + default: + throw new IllegalArgumentException(String.format("Unknown file format %s", format)); + } + + return writeProperties; + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 3e051dc5d5..8f9ce802d1 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -325,7 +325,8 @@ class DynamicCommitter implements Committer<DynamicCommittable> { } } - private void commitOperation( + @VisibleForTesting + void commitOperation( Table table, String branch, SnapshotUpdate<?> operation, diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java new file mode 100644 index 0000000000..8d62e93a30 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.SupportsCommitter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkWriteConf; +import org.apache.iceberg.flink.FlinkWriteOptions; +import org.apache.iceberg.flink.sink.IcebergSink; +import org.apache.iceberg.flink.sink.SinkUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Dynamic version of the IcebergSink which supports: + * + * <ul> + * <li>Writing to any number of tables (No more 1:1 sink/topic relationship). + * <li>Creating and updating tables based on the user-supplied routing. + * <li>Updating the schema and partition spec of tables based on the user-supplied specification. 
+ * </ul> + */ +@Experimental +public class DynamicIcebergSink + implements Sink<DynamicRecordInternal>, + SupportsPreWriteTopology<DynamicRecordInternal>, + SupportsCommitter<DynamicCommittable>, + SupportsPreCommitTopology<DynamicWriteResult, DynamicCommittable>, + SupportsPostCommitTopology<DynamicCommittable> { + + private final CatalogLoader catalogLoader; + private final Map<String, String> snapshotProperties; + private final String uidPrefix; + private final String sinkId; + private final Map<String, String> writeProperties; + private final transient FlinkWriteConf flinkWriteConf; + private final FileFormat dataFileFormat; + private final long targetDataFileSize; + private final boolean overwriteMode; + private final int workerPoolSize; + private final int cacheMaximumSize; + + DynamicIcebergSink( + CatalogLoader catalogLoader, + Map<String, String> snapshotProperties, + String uidPrefix, + Map<String, String> writeProperties, + FlinkWriteConf flinkWriteConf, + int cacheMaximumSize) { + this.catalogLoader = catalogLoader; + this.snapshotProperties = snapshotProperties; + this.uidPrefix = uidPrefix; + this.writeProperties = writeProperties; + this.flinkWriteConf = flinkWriteConf; + this.dataFileFormat = flinkWriteConf.dataFileFormat(); + this.targetDataFileSize = flinkWriteConf.targetDataFileSize(); + this.overwriteMode = flinkWriteConf.overwriteMode(); + this.workerPoolSize = flinkWriteConf.workerPoolSize(); + this.cacheMaximumSize = cacheMaximumSize; + // We generate a random UUID every time when a sink is created. + // This is used to separate files generated by different sinks writing the same table. + // Also used to generate the aggregator operator name + this.sinkId = UUID.randomUUID().toString(); + } + + @Override + public SinkWriter<DynamicRecordInternal> createWriter(WriterInitContext context) { + return new DynamicWriter( + catalogLoader.loadCatalog(), + dataFileFormat, + targetDataFileSize, + writeProperties, + cacheMaximumSize, + new DynamicWriterMetrics(context.metricGroup()), + context.getTaskInfo().getIndexOfThisSubtask(), + context.getTaskInfo().getAttemptNumber()); + } + + @Override + public Committer<DynamicCommittable> createCommitter(CommitterInitContext context) { + DynamicCommitterMetrics metrics = new DynamicCommitterMetrics(context.metricGroup()); + return new DynamicCommitter( + catalogLoader.loadCatalog(), + snapshotProperties, + overwriteMode, + workerPoolSize, + sinkId, + metrics); + } + + @Override + public SimpleVersionedSerializer<DynamicCommittable> getCommittableSerializer() { + return new DynamicCommittableSerializer(); + } + + @Override + public void addPostCommitTopology( + DataStream<CommittableMessage<DynamicCommittable>> committables) {} + + @Override + public DataStream<DynamicRecordInternal> addPreWriteTopology( + DataStream<DynamicRecordInternal> inputDataStream) { + return distributeDataStream(inputDataStream); + } + + @Override + public DataStream<CommittableMessage<DynamicCommittable>> addPreCommitTopology( + DataStream<CommittableMessage<DynamicWriteResult>> writeResults) { + TypeInformation<CommittableMessage<DynamicCommittable>> typeInformation = + CommittableMessageTypeInfo.of(this::getCommittableSerializer); + + return writeResults + .keyBy( + committable -> { + if (committable instanceof CommittableSummary) { + return "__summary"; + } else { + CommittableWithLineage<DynamicWriteResult> result = + (CommittableWithLineage<DynamicWriteResult>) committable; + return result.getCommittable().key().tableName(); + } + }) + .transform( + 
prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"), + typeInformation, + new DynamicWriteResultAggregator(catalogLoader)) + .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology")); + } + + @Override + public SimpleVersionedSerializer<DynamicWriteResult> getWriteResultSerializer() { + return new DynamicWriteResultSerializer(); + } + + public static class Builder<T> { + private DataStream<T> input; + private DynamicRecordGenerator<T> generator; + private CatalogLoader catalogLoader; + private String uidPrefix = null; + private final Map<String, String> writeOptions = Maps.newHashMap(); + private final Map<String, String> snapshotSummary = Maps.newHashMap(); + private ReadableConfig readableConfig = new Configuration(); + private boolean immediateUpdate = false; + private int cacheMaximumSize = 100; + private long cacheRefreshMs = 1_000; + + Builder() {} + + public Builder<T> forInput(DataStream<T> inputStream) { + this.input = inputStream; + return this; + } + + public Builder<T> generator(DynamicRecordGenerator<T> inputGenerator) { + this.generator = inputGenerator; + return this; + } + + /** + * The catalog loader is used for loading tables in {@link DynamicCommitter} lazily, we need + * this loader because {@link Table} is not serializable and could not just use the loaded table + * from Builder#table in the remote task manager. + * + * @param newCatalogLoader to load iceberg table inside tasks. + * @return {@link Builder} to connect the iceberg table. + */ + public Builder<T> catalogLoader(CatalogLoader newCatalogLoader) { + this.catalogLoader = newCatalogLoader; + return this; + } + + /** + * Set the write properties for IcebergSink. View the supported properties in {@link + * FlinkWriteOptions} + */ + public Builder<T> set(String property, String value) { + writeOptions.put(property, value); + return this; + } + + /** + * Set the write properties for IcebergSink. View the supported properties in {@link + * FlinkWriteOptions} + */ + public Builder<T> setAll(Map<String, String> properties) { + writeOptions.putAll(properties); + return this; + } + + public Builder<T> overwrite(boolean newOverwrite) { + writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite)); + return this; + } + + public Builder<T> flinkConf(ReadableConfig config) { + this.readableConfig = config; + return this; + } + + /** + * Configuring the write parallel number for iceberg stream writer. + * + * @param newWriteParallelism the number of parallel iceberg stream writer. + * @return {@link DynamicIcebergSink.Builder} to connect the iceberg table. + */ + public Builder<T> writeParallelism(int newWriteParallelism) { + writeOptions.put( + FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism)); + return this; + } + + /** + * Set the uid prefix for IcebergSink operators. Note that IcebergSink internally consists of + * multiple operators (like writer, committer, aggregator) Actual operator uid will be appended + * with a suffix like "uidPrefix-writer". + * + * <p>If provided, this prefix is also applied to operator names. + * + * <p>Flink auto generates operator uid if not set explicitly. It is a recommended <a + * href="https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/"> + * best-practice to set uid for all operators</a> before deploying to production. Flink has an + * option to {@code pipeline.auto-generate-uid=false} to disable auto-generation and force + * explicit setting of all operator uid. 
+ * + * <p>Be careful with setting this for an existing job, because now we are changing the operator + * uid from an auto-generated one to this new value. When deploying the change with a + * checkpoint, Flink won't be able to restore the previous IcebergSink operator state (more + * specifically the committer operator state). You need to use {@code --allowNonRestoredState} + * to ignore the previous sink state. During restore IcebergSink state is used to check if last + * commit was actually successful or not. {@code --allowNonRestoredState} can lead to data loss + * if the Iceberg commit failed in the last completed checkpoint. + * + * @param newPrefix prefix for Flink sink operator uid and name + * @return {@link Builder} to connect the iceberg table. + */ + public Builder<T> uidPrefix(String newPrefix) { + this.uidPrefix = newPrefix; + return this; + } + + public Builder<T> snapshotProperties(Map<String, String> properties) { + snapshotSummary.putAll(properties); + return this; + } + + public Builder<T> setSnapshotProperty(String property, String value) { + snapshotSummary.put(property, value); + return this; + } + + public Builder<T> toBranch(String branch) { + writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch); + return this; + } + + public Builder<T> immediateTableUpdate(boolean newImmediateUpdate) { + this.immediateUpdate = newImmediateUpdate; + return this; + } + + /** Maximum size of the caches used in Dynamic Sink for table data and serializers. */ + public Builder<T> cacheMaxSize(int maxSize) { + this.cacheMaximumSize = maxSize; + return this; + } + + /** Maximum interval for cache items renewals. */ + public Builder<T> cacheRefreshMs(long refreshMs) { + this.cacheRefreshMs = refreshMs; + return this; + } + + private String operatorName(String suffix) { + return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; + } + + private DynamicIcebergSink build() { + + Preconditions.checkArgument( + generator != null, "Please use withGenerator() to convert the input DataStream."); + Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null"); + + FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig); + Map<String, String> writeProperties = + SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, null); + uidPrefix = Optional.ofNullable(uidPrefix).orElse(""); + + return instantiateSink(writeProperties, flinkWriteConf); + } + + @VisibleForTesting + DynamicIcebergSink instantiateSink( + Map<String, String> writeProperties, FlinkWriteConf flinkWriteConf) { + return new DynamicIcebergSink( + catalogLoader, + snapshotSummary, + uidPrefix, + writeProperties, + flinkWriteConf, + cacheMaximumSize); + } + + /** + * Append the iceberg sink operators to write records to iceberg table. + * + * @return {@link DataStreamSink} for sink. 
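 *
 * <p>End-to-end wiring sketch (illustration only, not part of this commit): the input stream,
 * the generator implementation, the Hadoop configuration, and the warehouse path are assumed
 * placeholders; the builder methods used are the ones defined in this class.
 *
 * <pre>{@code
 * DataStream<MyEvent> events = ...;             // assumed upstream source
 * CatalogLoader loader =                        // any CatalogLoader works; Hadoop shown for brevity
 *     CatalogLoader.hadoop(
 *         "hadoop", hadoopConf, ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehousePath));
 *
 * DynamicIcebergSink.<MyEvent>forInput(events)
 *     .generator(new MyEventGenerator())        // user-defined DynamicRecordGenerator
 *     .catalogLoader(loader)
 *     .immediateTableUpdate(true)               // create/update tables inline instead of via the side output
 *     .uidPrefix("dynamic-sink")
 *     .append();
 * }</pre>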
+ */ + public DataStreamSink<DynamicRecordInternal> append() { + DynamicRecordInternalType type = + new DynamicRecordInternalType(catalogLoader, false, cacheMaximumSize); + DynamicIcebergSink sink = build(); + SingleOutputStreamOperator<DynamicRecordInternal> converted = + input + .process( + new DynamicRecordProcessor<>( + generator, catalogLoader, immediateUpdate, cacheMaximumSize, cacheRefreshMs)) + .uid(prefixIfNotNull(uidPrefix, "-generator")) + .name(operatorName("generator")) + .returns(type); + + DataStreamSink<DynamicRecordInternal> rowDataDataStreamSink = + converted + .getSideOutput( + new OutputTag<>( + DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM, + new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize))) + .keyBy((KeySelector<DynamicRecordInternal, String>) DynamicRecordInternal::tableName) + .map(new DynamicTableUpdateOperator(catalogLoader, cacheMaximumSize, cacheRefreshMs)) + .uid(prefixIfNotNull(uidPrefix, "-updater")) + .name(operatorName("Updater")) + .returns(type) + .union(converted) + .sinkTo(sink) + .uid(prefixIfNotNull(uidPrefix, "-sink")); + if (sink.flinkWriteConf.writeParallelism() != null) { + rowDataDataStreamSink.setParallelism(sink.flinkWriteConf.writeParallelism()); + } + + return rowDataDataStreamSink; + } + } + + DataStream<DynamicRecordInternal> distributeDataStream(DataStream<DynamicRecordInternal> input) { + return input.keyBy(DynamicRecordInternal::writerKey); + } + + private static String prefixIfNotNull(String uidPrefix, String suffix) { + return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; + } + + /** + * Initialize a {@link IcebergSink.Builder} to export the data from input data stream with {@link + * RowData}s into iceberg table. + * + * @param input the source input data stream with {@link RowData}s. + * @return {@link IcebergSink.Builder} to connect the iceberg table. + */ + public static <T> Builder<T> forInput(DataStream<T> input) { + return new Builder<T>().forInput(input); + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java new file mode 100644 index 0000000000..637dd1307d --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.Serializable; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.util.Collector; + +/** A generator to yield {@link DynamicRecord} from the provided input. 
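 *
 * <p>A minimal implementation sketch (illustration only, not part of this commit): {@code MyEvent},
 * {@code MY_EVENT_SCHEMA}, and {@code toRowData(...)} are assumed placeholders; the
 * {@code DynamicRecord} constructor arguments mirror the order used in the tests of this change
 * (identifier, branch, schema, row, spec, distribution mode, write parallelism).
 *
 * <pre>{@code
 * class MyEventGenerator implements DynamicRecordGenerator<MyEvent> {
 *   public void convert(MyEvent event, Collector<DynamicRecord> out) {
 *     // Route each record to a table derived from the event payload (placeholder logic).
 *     TableIdentifier table = TableIdentifier.of("db", event.tableName());
 *     RowData row = toRowData(MY_EVENT_SCHEMA, event); // assumed conversion helper
 *     out.collect(
 *         new DynamicRecord(
 *             table,
 *             "main",                           // target branch
 *             MY_EVENT_SCHEMA,                  // schema the row conforms to
 *             row,
 *             PartitionSpec.unpartitioned(),
 *             DistributionMode.NONE,
 *             1));                              // write parallelism
 *   }
 * }
 * }</pre>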
*/ +public interface DynamicRecordGenerator<T> extends Serializable { + default void open(OpenContext openContext) throws Exception {} + + /** + * Takes the user-defined input and yields zero, one, or multiple {@link DynamicRecord}s using the + * {@link Collector}. + */ + void convert(T inputRecord, Collector<DynamicRecord> out) throws Exception; +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java new file mode 100644 index 0000000000..bc569633cc --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.flink.CatalogLoader; + +@Internal +class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal> + implements Collector<DynamicRecord> { + @VisibleForTesting + static final String DYNAMIC_TABLE_UPDATE_STREAM = "dynamic-table-update-stream"; + + private final DynamicRecordGenerator<T> generator; + private final CatalogLoader catalogLoader; + private final boolean immediateUpdate; + private final int cacheMaximumSize; + private final long cacheRefreshMs; + + private transient TableMetadataCache tableCache; + private transient HashKeyGenerator hashKeyGenerator; + private transient TableUpdater updater; + private transient OutputTag<DynamicRecordInternal> updateStream; + private transient Collector<DynamicRecordInternal> collector; + private transient Context context; + + DynamicRecordProcessor( + DynamicRecordGenerator<T> generator, + CatalogLoader catalogLoader, + boolean immediateUpdate, + int cacheMaximumSize, + long cacheRefreshMs) { + this.generator = generator; + this.catalogLoader = catalogLoader; + this.immediateUpdate = immediateUpdate; + this.cacheMaximumSize = cacheMaximumSize; + this.cacheRefreshMs = cacheRefreshMs; + } + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + Catalog catalog = catalogLoader.loadCatalog(); + this.tableCache = 
new TableMetadataCache(catalog, cacheMaximumSize, cacheRefreshMs); + this.hashKeyGenerator = + new HashKeyGenerator( + cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks()); + if (immediateUpdate) { + updater = new TableUpdater(tableCache, catalog); + } else { + updateStream = + new OutputTag<>( + DYNAMIC_TABLE_UPDATE_STREAM, + new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)) {}; + } + + generator.open(openContext); + } + + @Override + public void processElement(T element, Context ctx, Collector<DynamicRecordInternal> out) + throws Exception { + this.context = ctx; + this.collector = out; + generator.convert(element, this); + } + + @Override + public void collect(DynamicRecord data) { + boolean exists = tableCache.exists(data.tableIdentifier()).f0; + String foundBranch = exists ? tableCache.branch(data.tableIdentifier(), data.branch()) : null; + + Tuple2<Schema, CompareSchemasVisitor.Result> foundSchema = + exists + ? tableCache.schema(data.tableIdentifier(), data.schema()) + : TableMetadataCache.NOT_FOUND; + + PartitionSpec foundSpec = exists ? tableCache.spec(data.tableIdentifier(), data.spec()) : null; + + if (!exists + || foundBranch == null + || foundSpec == null + || foundSchema.f1 == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { + if (immediateUpdate) { + Tuple3<Schema, CompareSchemasVisitor.Result, PartitionSpec> newData = + updater.update(data.tableIdentifier(), data.branch(), data.schema(), data.spec()); + emit(collector, data, newData.f0, newData.f1, newData.f2); + } else { + int writerKey = + hashKeyGenerator.generateKey( + data, + foundSchema.f0 != null ? foundSchema.f0 : data.schema(), + foundSpec != null ? foundSpec : data.spec(), + data.rowData()); + context.output( + updateStream, + new DynamicRecordInternal( + data.tableIdentifier().toString(), + data.branch(), + data.schema(), + data.rowData(), + data.spec(), + writerKey, + data.upsertMode(), + DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), data.schema()))); + } + } else { + emit(collector, data, foundSchema.f0, foundSchema.f1, foundSpec); + } + } + + private void emit( + Collector<DynamicRecordInternal> out, + DynamicRecord data, + Schema schema, + CompareSchemasVisitor.Result result, + PartitionSpec spec) { + RowData rowData = + result == CompareSchemasVisitor.Result.SAME + ? 
data.rowData() + : RowDataEvolver.convert(data.rowData(), data.schema(), schema); + int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData); + String tableName = data.tableIdentifier().toString(); + out.collect( + new DynamicRecordInternal( + tableName, + data.branch(), + schema, + rowData, + spec, + writerKey, + data.upsertMode(), + DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), schema))); + } + + @Override + public void close() { + try { + super.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java index 6cb1f46089..d0909e0605 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java @@ -77,8 +77,7 @@ class HashKeyGenerator { DynamicRecord dynamicRecord, @Nullable Schema tableSchema, @Nullable PartitionSpec tableSpec, - @Nullable RowData overrideRowData) - throws Exception { + @Nullable RowData overrideRowData) { String tableIdent = dynamicRecord.tableIdentifier().toString(); SelectorKey cacheKey = new SelectorKey( @@ -89,8 +88,8 @@ class HashKeyGenerator { dynamicRecord.schema(), dynamicRecord.spec(), dynamicRecord.equalityFields()); - return keySelectorCache - .get( + KeySelector<RowData, Integer> keySelector = + keySelectorCache.get( cacheKey, k -> getKeySelector( @@ -101,8 +100,13 @@ class HashKeyGenerator { dynamicRecord.distributionMode(), DistributionMode.NONE), MoreObjects.firstNonNull( dynamicRecord.equalityFields(), Collections.emptySet()), - dynamicRecord.writeParallelism())) - .getKey(overrideRowData != null ? overrideRowData : dynamicRecord.rowData()); + dynamicRecord.writeParallelism())); + try { + return keySelector.getKey( + overrideRowData != null ? 
overrideRowData : dynamicRecord.rowData()); + } catch (Exception e) { + throw new RuntimeException(e); + } } private KeySelector<RowData, Integer> getKeySelector( diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index 7f53215a5e..0d39a665cf 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -82,12 +82,19 @@ public class SimpleDataUtil { Types.NestedField.optional(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "data", Types.StringType.get())); + public static final Schema SCHEMA2 = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "extra", Types.StringType.get())); + public static final TableSchema FLINK_SCHEMA = TableSchema.builder().field("id", DataTypes.INT()).field("data", DataTypes.STRING()).build(); public static final RowType ROW_TYPE = (RowType) FLINK_SCHEMA.toRowDataType().getLogicalType(); public static final Record RECORD = GenericRecord.create(SCHEMA); + public static final Record RECORD2 = GenericRecord.create(SCHEMA2); public static Table createTable( String path, Map<String, String> properties, boolean partitioned) { @@ -107,6 +114,14 @@ public class SimpleDataUtil { return record; } + public static Record createRecord(Integer id, String data, String extra) { + Record record = RECORD2.copy(); + record.setField("id", id); + record.setField("data", data); + record.setField("extra", extra); + return record; + } + public static RowData createRowData(Integer id, String data) { return GenericRowData.of(id, StringData.fromString(data)); } @@ -224,7 +239,12 @@ public class SimpleDataUtil { for (RowData row : rows) { Integer id = row.isNullAt(0) ? null : row.getInt(0); String data = row.isNullAt(1) ? null : row.getString(1).toString(); - records.add(createRecord(id, data)); + if (row.getArity() == 2) { + records.add(createRecord(id, data)); + } else { + String extra = row.isNullAt(2) ? null : row.getString(2).toString(); + records.add(createRecord(id, data, extra)); + } } return records; } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java index 8188a8bcdc..b47a7920fe 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java @@ -427,7 +427,6 @@ public class TestFlinkFilters { unresolvedCall.getChildren().stream() .map(e -> (ResolvedExpression) e.accept(this)) .collect(Collectors.toList()); - // TODO mxm false? 
return new CallExpression( false, unresolvedCall.getFunctionIdentifier().orElse(null), diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java index 960849fb4f..d8d3c5dc24 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java @@ -132,7 +132,7 @@ public class TestHelpers { .collect(Collectors.toList()); } - private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) { + public static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) { List<Row> expected = Lists.newArrayList(); @SuppressWarnings("unchecked") DataStructureConverter<RowData, Row> converter = diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java index 9513cd1e48..de098f826d 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java @@ -21,6 +21,7 @@ package org.apache.iceberg.flink.sink; import static org.apache.iceberg.flink.TestFixtures.DATABASE; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -64,8 +65,8 @@ public class TestFlinkIcebergSinkBase { protected Table table; protected StreamExecutionEnvironment env; - protected BoundedTestSource<Row> createBoundedSource(List<Row> rows) { - return new BoundedTestSource<>(rows.toArray(new Row[0])); + protected <T> BoundedTestSource<T> createBoundedSource(List<T> rows) { + return new BoundedTestSource<>(Collections.singletonList(rows)); } protected List<Row> createRows(String prefix) { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java new file mode 100644 index 0000000000..f94990cc15 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -0,0 +1,831 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.io.IOException; +import java.io.Serializable; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.legacy.api.TableSchema; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.flink.util.ExceptionUtils; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotUpdate; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.FlinkWriteConf; +import org.apache.iceberg.flink.MiniFlinkClusterExtension; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.flink.sink.CommitSummary; +import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; +import org.apache.iceberg.inmemory.InMemoryInputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase { + + private static long seed; + + @BeforeEach + void before() { + env = + StreamExecutionEnvironment.getExecutionEnvironment( + MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + .setParallelism(2); + seed = 0; + } + + private static class DynamicIcebergDataImpl implements Serializable { + Row rowProvided; + Row rowExpected; + Schema schemaProvided; + Schema schemaExpected; + String tableName; + String branch; + PartitionSpec partitionSpec; + boolean upsertMode; + Set<String> equalityFields; + + 
private DynamicIcebergDataImpl( + Schema schemaProvided, String tableName, String branch, PartitionSpec partitionSpec) { + this( + schemaProvided, + schemaProvided, + tableName, + branch, + partitionSpec, + false, + Collections.emptySet(), + false); + } + + private DynamicIcebergDataImpl( + Schema schemaProvided, + Schema schemaExpected, + String tableName, + String branch, + PartitionSpec partitionSpec) { + this( + schemaProvided, + schemaExpected, + tableName, + branch, + partitionSpec, + false, + Collections.emptySet(), + false); + } + + private DynamicIcebergDataImpl( + Schema schemaProvided, + String tableName, + String branch, + PartitionSpec partitionSpec, + boolean upsertMode, + Set<String> equalityFields, + boolean isDuplicate) { + this( + schemaProvided, + schemaProvided, + tableName, + branch, + partitionSpec, + upsertMode, + equalityFields, + isDuplicate); + } + + private DynamicIcebergDataImpl( + Schema schemaProvided, + Schema schemaExpected, + String tableName, + String branch, + PartitionSpec partitionSpec, + boolean upsertMode, + Set<String> equalityFields, + boolean isDuplicate) { + this.rowProvided = randomRow(schemaProvided, isDuplicate ? seed : ++seed); + this.rowExpected = isDuplicate ? null : rowProvided; + this.schemaProvided = schemaProvided; + this.schemaExpected = schemaExpected; + this.tableName = tableName; + this.branch = branch; + this.partitionSpec = partitionSpec; + this.upsertMode = upsertMode; + this.equalityFields = equalityFields; + } + } + + private static class Generator implements DynamicRecordGenerator<DynamicIcebergDataImpl> { + + @Override + public void convert(DynamicIcebergDataImpl row, Collector<DynamicRecord> out) { + TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE, row.tableName); + String branch = row.branch; + Schema schema = row.schemaProvided; + PartitionSpec spec = row.partitionSpec; + DynamicRecord dynamicRecord = + new DynamicRecord( + tableIdentifier, + branch, + schema, + converter(schema).toInternal(row.rowProvided), + spec, + spec.isPartitioned() ? 
DistributionMode.HASH : DistributionMode.NONE, + 10); + dynamicRecord.setUpsertMode(row.upsertMode); + dynamicRecord.setEqualityFields(row.equalityFields); + out.collect(dynamicRecord); + } + } + + private static DataFormatConverters.RowConverter converter(Schema schema) { + RowType rowType = FlinkSchemaUtil.convert(schema); + TableSchema tableSchema = FlinkSchemaUtil.toSchema(rowType); + return new DataFormatConverters.RowConverter(tableSchema.getFieldDataTypes()); + } + + @Test + void testWrite() throws Exception { + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned())); + + runTest(rows); + } + + @Test + void testWritePartitioned() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build(); + + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec)); + + runTest(rows); + } + + @Test + void testWritePartitionedAdjustSchemaIdsInSpec() throws Exception { + Schema schema = + new Schema( + // Use zero-based schema field ids + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get())); + PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id", 10).build(); + Schema schema2 = + new Schema( + // Use zero-based schema field ids + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get()), + Types.NestedField.optional(2, "extra", Types.StringType.get())); + PartitionSpec spec2 = PartitionSpec.builderFor(schema2).bucket("extra", 23).build(); + + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl(schema, "t1", "main", spec), + new DynamicIcebergDataImpl(schema, "t1", "main", spec), + new DynamicIcebergDataImpl(schema, "t1", "main", spec), + new DynamicIcebergDataImpl(schema2, "t1", "main", spec2), + new DynamicIcebergDataImpl(schema2, "t1", "main", spec2)); + + runTest(rows); + } + + @Test + void testSchemaEvolutionFieldOrderChanges() throws Exception { + Schema schema = + new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get())); + Schema expectedSchema = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + Schema schema2 = + new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "extra", Types.StringType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + Schema expectedSchema2 = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(3, "extra", Types.StringType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + schema, expectedSchema, "t1", "main", PartitionSpec.unpartitioned()), + new 
DynamicIcebergDataImpl( + schema, expectedSchema, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + schema, expectedSchema, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + schema2, expectedSchema2, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + schema2, expectedSchema2, "t1", "main", PartitionSpec.unpartitioned())); + + for (DynamicIcebergDataImpl row : rows) { + if (row.schemaExpected == expectedSchema) { + // We manually adjust the expected Row to match the second expected schema + row.rowExpected = Row.of(row.rowProvided.getField(0), null, row.rowProvided.getField(1)); + } + } + + runTest(rows); + } + + @Test + void testMultipleTables() throws Exception { + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned())); + + runTest(rows); + } + + @Test + void testMultipleTablesPartitioned() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build(); + + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t2", "main", spec)); + + runTest(rows); + } + + @Test + void testSchemaEvolutionAddField() throws Exception { + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA2, "t1", "main", PartitionSpec.unpartitioned())); + + runTest(rows, this.env, 1); + } + + @Test + void testRowEvolutionNullMissingOptionalField() throws Exception { + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA2, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned())); + + runTest(rows, this.env, 1); + } + + @Test + void testSchemaEvolutionNonBackwardsCompatible() throws Exception { + Schema backwardsIncompatibleSchema = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + // Required column is missing in this schema + Schema erroringSchema = + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + backwardsIncompatibleSchema, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + erroringSchema, "t1", "main", PartitionSpec.unpartitioned())); + + try { + runTest(rows, StreamExecutionEnvironment.getExecutionEnvironment(), 1); + fail(); + } catch (JobExecutionException e) { + assertThat( + ExceptionUtils.findThrowable( + e, + t -> + t.getMessage() + .contains( + "Field 2 in target schema ROW<`id` INT NOT NULL, `data` STRING NOT NULL> is non-nullable but does not exist in source schema."))) + .isNotEmpty(); + } + } + + @Test + void testPartitionSpecEvolution() throws Exception { + PartitionSpec spec1 = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build(); + PartitionSpec spec2 = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 5).identity("data").build(); + + List<DynamicIcebergDataImpl> rows 
= + Lists.newArrayList( + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec1), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec1), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec1), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2)); + + runTest(rows); + } + + @Test + void testMultipleBranches() throws Exception { + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "branch1", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned())); + + runTest(rows); + } + + @Test + void testWriteMultipleTablesWithSchemaChanges() throws Exception { + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA2, "t2", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA2, "t2", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned())); + + runTest(rows); + } + + @Test + void testUpsert() throws Exception { + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + // Insert one row + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + "main", + PartitionSpec.unpartitioned(), + true, + Sets.newHashSet("id"), + false), + // Remaining rows are duplicates + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + "main", + PartitionSpec.unpartitioned(), + true, + Sets.newHashSet("id"), + true), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + "main", + PartitionSpec.unpartitioned(), + true, + Sets.newHashSet("id"), + true), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + "main", + PartitionSpec.unpartitioned(), + true, + Sets.newHashSet("id"), + true)); + + executeDynamicSink(rows, env, true, 1, null); + + try (CloseableIterable<Record> iterable = + IcebergGenerics.read( + CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1"))) + .build()) { + List<Record> records = Lists.newArrayList(); + for (Record record : iterable) { + records.add(record); + } + + assertThat(records.size()).isEqualTo(1); + Record actual = records.get(0); + DynamicIcebergDataImpl input = rows.get(0); + assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0)); + 
assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1)); + // There is an additional _pos field which gets added + } + } + + @Test + void testCommitFailedBeforeOrAfterCommit() throws Exception { + // Configure a restart strategy to allow recovery + Configuration configuration = new Configuration(); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 2); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO); + env.configure(configuration); + + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned())); + + FailBeforeAndAfterCommit.reset(); + final CommitHook commitHook = new FailBeforeAndAfterCommit(); + assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse(); + assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse(); + + executeDynamicSink(rows, env, true, 1, commitHook); + + assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue(); + assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue(); + } + + @Test + void testCommitConcurrency() throws Exception { + + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned())); + + TableIdentifier tableIdentifier = TableIdentifier.of("default", "t1"); + Catalog catalog = CATALOG_EXTENSION.catalog(); + catalog.createTable(tableIdentifier, new Schema()); + + final CommitHook commitHook = new AppendRightBeforeCommit(tableIdentifier.toString()); + + executeDynamicSink(rows, env, true, 1, commitHook); + } + + interface CommitHook extends Serializable { + void beforeCommit(); + + void duringCommit(); + + void afterCommit(); + } + + private static class FailBeforeAndAfterCommit implements CommitHook { + + static boolean failedBeforeCommit; + static boolean failedAfterCommit; + + @Override + public void beforeCommit() { + if (!failedBeforeCommit) { + failedBeforeCommit = true; + throw new RuntimeException("Failing before commit"); + } + } + + @Override + public void duringCommit() {} + + @Override + public void afterCommit() { + if (!failedAfterCommit) { + failedAfterCommit = true; + throw new RuntimeException("Failing after commit"); + } + } + + static void reset() { + failedBeforeCommit = false; + failedAfterCommit = false; + } + } + + private static class AppendRightBeforeCommit implements CommitHook { + + final String tableIdentifier; + + private AppendRightBeforeCommit(String tableIdentifier) { + this.tableIdentifier = tableIdentifier; + } + + @Override + public void beforeCommit() {} + + @Override + public void duringCommit() { + // Create a conflict + Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(tableIdentifier)); + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withInputFile(new InMemoryInputFile(new byte[] {1, 2, 3})) + .withFormat(FileFormat.AVRO) + .withRecordCount(3) + .build(); + table.newAppend().appendFile(dataFile).commit(); + } + + @Override + public void afterCommit() {} + } + + private void runTest(List<DynamicIcebergDataImpl> dynamicData) throws Exception { + runTest(dynamicData, this.env, 2); + } + 
+ private void runTest( + List<DynamicIcebergDataImpl> dynamicData, StreamExecutionEnvironment env, int parallelism) + throws Exception { + runTest(dynamicData, env, true, parallelism); + runTest(dynamicData, env, false, parallelism); + } + + private void runTest( + List<DynamicIcebergDataImpl> dynamicData, + StreamExecutionEnvironment env, + boolean immediateUpdate, + int parallelism) + throws Exception { + executeDynamicSink(dynamicData, env, immediateUpdate, parallelism, null); + verifyResults(dynamicData); + } + + private void executeDynamicSink( + List<DynamicIcebergDataImpl> dynamicData, + StreamExecutionEnvironment env, + boolean immediateUpdate, + int parallelism, + @Nullable CommitHook commitHook) + throws Exception { + DataStream<DynamicIcebergDataImpl> dataStream = + env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {})); + env.setParallelism(parallelism); + + if (commitHook != null) { + new CommitHookEnabledDynamicIcebergSink(commitHook) + .forInput(dataStream) + .generator(new Generator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .writeParallelism(parallelism) + .immediateTableUpdate(immediateUpdate) + .setSnapshotProperty("commit.retry.num-retries", "0") + .append(); + } else { + DynamicIcebergSink.forInput(dataStream) + .generator(new Generator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .writeParallelism(parallelism) + .immediateTableUpdate(immediateUpdate) + .append(); + } + + // Write the data + env.execute("Test Iceberg DataStream"); + } + + static class CommitHookEnabledDynamicIcebergSink<T> extends DynamicIcebergSink.Builder<T> { + private final CommitHook commitHook; + + CommitHookEnabledDynamicIcebergSink(CommitHook commitHook) { + this.commitHook = commitHook; + } + + @Override + DynamicIcebergSink instantiateSink( + Map<String, String> writeProperties, FlinkWriteConf flinkWriteConf) { + return new CommitHookDynamicIcebergSink( + commitHook, + CATALOG_EXTENSION.catalogLoader(), + Collections.emptyMap(), + "uidPrefix", + writeProperties, + flinkWriteConf, + 100); + } + } + + static class CommitHookDynamicIcebergSink extends DynamicIcebergSink { + + private final CommitHook commitHook; + + CommitHookDynamicIcebergSink( + CommitHook commitHook, + CatalogLoader catalogLoader, + Map<String, String> snapshotProperties, + String uidPrefix, + Map<String, String> writeProperties, + FlinkWriteConf flinkWriteConf, + int cacheMaximumSize) { + super( + catalogLoader, + snapshotProperties, + uidPrefix, + writeProperties, + flinkWriteConf, + cacheMaximumSize); + this.commitHook = commitHook; + } + + @Override + public Committer<DynamicCommittable> createCommitter(CommitterInitContext context) { + // return super.createCommitter(context); + return new CommitHookEnabledDynamicCommitter( + commitHook, + CATALOG_EXTENSION.catalogLoader().loadCatalog(), + Collections.emptyMap(), + false, + 10, + "sinkId", + new DynamicCommitterMetrics(context.metricGroup())); + } + } + + static class CommitHookEnabledDynamicCommitter extends DynamicCommitter { + private final CommitHook commitHook; + + CommitHookEnabledDynamicCommitter( + CommitHook commitHook, + Catalog catalog, + Map<String, String> snapshotProperties, + boolean replacePartitions, + int workerPoolSize, + String sinkId, + DynamicCommitterMetrics committerMetrics) { + super( + catalog, snapshotProperties, replacePartitions, workerPoolSize, sinkId, committerMetrics); + this.commitHook = commitHook; + } + + @Override + public void 
commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) + throws IOException, InterruptedException { + commitHook.beforeCommit(); + super.commit(commitRequests); + commitHook.afterCommit(); + } + + @Override + void commitOperation( + Table table, + String branch, + SnapshotUpdate<?> operation, + CommitSummary summary, + String description, + String newFlinkJobId, + String operatorId, + long checkpointId) { + commitHook.duringCommit(); + super.commitOperation( + table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId); + } + } + + private void verifyResults(List<DynamicIcebergDataImpl> dynamicData) throws IOException { + // Calculate the expected result + Map<Tuple2<String, String>, List<RowData>> expectedData = Maps.newHashMap(); + Map<String, Schema> expectedSchema = Maps.newHashMap(); + dynamicData.forEach( + r -> { + Schema oldSchema = expectedSchema.get(r.tableName); + if (oldSchema == null || oldSchema.columns().size() < r.schemaProvided.columns().size()) { + expectedSchema.put(r.tableName, r.schemaExpected); + } + }); + + dynamicData.forEach( + r -> { + List<RowData> data = + expectedData.computeIfAbsent( + Tuple2.of(r.tableName, r.branch), unused -> Lists.newArrayList()); + data.addAll( + convertToRowData(expectedSchema.get(r.tableName), ImmutableList.of(r.rowExpected))); + }); + + // Check the expected result + int count = dynamicData.size(); + for (Map.Entry<Tuple2<String, String>, List<RowData>> e : expectedData.entrySet()) { + SimpleDataUtil.assertTableRows( + CATALOG_EXTENSION + .catalogLoader() + .loadCatalog() + .loadTable(TableIdentifier.of(DATABASE, e.getKey().f0)), + e.getValue(), + e.getKey().f1); + count -= e.getValue().size(); + } + + // Found every record + assertThat(count).isZero(); + } + + private List<RowData> convertToRowData(Schema schema, List<Row> rows) { + DataFormatConverters.RowConverter converter = converter(schema); + return rows.stream() + .map( + r -> { + Row updateRow = r; + // We need conversion to generate the missing columns + if (r.getArity() != schema.columns().size()) { + updateRow = new Row(schema.columns().size()); + for (int i = 0; i < r.getArity(); ++i) { + updateRow.setField(i, r.getField(i)); + } + } + return converter.toInternal(updateRow); + }) + .collect(Collectors.toList()); + } + + private static Row randomRow(Schema schema, long seedOverride) { + return TestHelpers.convertRecordToRow( + RandomGenericData.generate(schema, 1, seedOverride), schema) + .get(0); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java new file mode 100644 index 0000000000..6e943efb62 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.apache.iceberg.flink.TestFixtures.TABLE; +import static org.apache.iceberg.flink.sink.dynamic.DynamicCommitter.MAX_CONTINUOUS_EMPTY_COMMITS; + +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.util.Collector; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.MiniFlinkClusterExtension; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.sink.IcebergSink; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Performance test class to compare {@link DynamicIcebergSink} against {@link IcebergSink} to + * measure and compare their throughput. + * + * <p>The test dynamically generates input for multiple tables, then writes to these tables. For the + * DynamicSink, a single sink is used to write all tables. For the IcebergSink, one sink is used per + * table. The test logs the written record counts and elapsed time based on the Iceberg snapshot + * metadata. + * + * <h2>Usage</h2> + * + * <ul> + * <li>Set the SAMPLE_SIZE, RECORD_SIZE, and TABLE_NUM. + * <li>Run the unit tests and review logs for performance results. + * </ul> + * + * <p>Note: This test is disabled by default and should be enabled manually when performance testing + * is needed. It is not intended as a standard unit test. 
+ */ +@Disabled("Please enable manually for performance testing.") +class TestDynamicIcebergSinkPerf { + private static final Logger LOG = LoggerFactory.getLogger(TestDynamicIcebergSinkPerf.class); + + @RegisterExtension + protected static final HadoopCatalogExtension CATALOG_EXTENSION = + new HadoopCatalogExtension(DATABASE, TABLE); + + private static final int SAMPLE_SIZE = 50_000; + private static final int RECORD_SIZE = 5_000_000; + private static final int TABLE_NUM = 3; + private static final int PARALLELISM = 2; + private static final int WRITE_PARALLELISM = 2; + private static final TableIdentifier[] IDENTIFIERS = new TableIdentifier[TABLE_NUM]; + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name2", Types.StringType.get()), + Types.NestedField.required(3, "name3", Types.StringType.get()), + Types.NestedField.required(4, "name4", Types.StringType.get()), + Types.NestedField.required(5, "name5", Types.StringType.get()), + Types.NestedField.required(6, "name6", Types.StringType.get()), + Types.NestedField.required(7, "name7", Types.StringType.get()), + Types.NestedField.required(8, "name8", Types.StringType.get()), + Types.NestedField.required(9, "name9", Types.StringType.get())); + private static final List<Integer> RANGE = + IntStream.range(0, RECORD_SIZE).boxed().collect(Collectors.toList()); + + private static List<DynamicRecord> rows; + private StreamExecutionEnvironment env; + + @BeforeEach + void before() { + for (int i = 0; i < TABLE_NUM; ++i) { + // So the table name hash difference is bigger than 1 + IDENTIFIERS[i] = TableIdentifier.of(DATABASE, TABLE + "_" + (i * 13)); + + Table table = + CATALOG_EXTENSION + .catalog() + .createTable( + IDENTIFIERS[i], + SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.of(MAX_CONTINUOUS_EMPTY_COMMITS, "100000")); + + table.manageSnapshots().createBranch("main").commit(); + } + + List<Record> records = RandomGenericData.generate(SCHEMA, SAMPLE_SIZE, 1L); + rows = Lists.newArrayListWithCapacity(records.size()); + for (int i = 0; i < records.size(); ++i) { + rows.add( + new DynamicRecord( + IDENTIFIERS[i % TABLE_NUM], + "main", + SCHEMA, + RowDataConverter.convert(SCHEMA, records.get(i)), + PartitionSpec.unpartitioned(), + DistributionMode.NONE, + WRITE_PARALLELISM)); + } + + Configuration configuration = MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG; + configuration.setString("rest.flamegraph.enabled", "true"); + env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration) + .enableCheckpointing(100) + .setParallelism(PARALLELISM) + .setMaxParallelism(PARALLELISM); + env.getConfig().enableObjectReuse(); + } + + @AfterEach + void after() { + for (TableIdentifier identifier : IDENTIFIERS) { + CATALOG_EXTENSION.catalog().dropTable(identifier); + } + } + + private static class IdBasedGenerator implements DynamicRecordGenerator<Integer> { + + @Override + public void convert(Integer id, Collector<DynamicRecord> out) { + out.collect(rows.get(id % SAMPLE_SIZE)); + } + } + + @Test + void testDynamicSink() throws Exception { + // So we make sure that the writer threads are the same for the 2 tests + env.setMaxParallelism(PARALLELISM * TABLE_NUM * 2); + env.setParallelism(PARALLELISM * TABLE_NUM * 2); + runTest( + s -> { + DynamicIcebergSink.forInput(s) + .generator(new IdBasedGenerator()) + .immediateTableUpdate(true) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .append(); + }); + } + + @Test + void 
testIcebergSink() throws Exception { + runTest( + s -> { + for (int i = 0; i < IDENTIFIERS.length; ++i) { + TableLoader tableLoader = + TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), IDENTIFIERS[i]); + final int finalInt = i; + IcebergSink.forRowData( + s.flatMap( + (FlatMapFunction<Integer, RowData>) + (input, collector) -> { + if (input % TABLE_NUM == finalInt) { + collector.collect(rows.get(input % SAMPLE_SIZE).rowData()); + } + }) + .returns(InternalTypeInfo.of(FlinkSchemaUtil.convert(SCHEMA))) + .rebalance()) + .tableLoader(tableLoader) + .uidSuffix("Uid" + i) + .writeParallelism(WRITE_PARALLELISM) + .append(); + } + }); + } + + private void runTest(Consumer<DataStream<Integer>> sink) throws Exception { + DataStream<Integer> dataStream = + env.addSource( + new BoundedTestSource<>( + ImmutableList.of( + RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE), + true), + TypeInformation.of(Integer.class)); + + sink.accept(dataStream); + + long before = System.currentTimeMillis(); + env.execute(); + + for (TableIdentifier identifier : IDENTIFIERS) { + Table table = CATALOG_EXTENSION.catalog().loadTable(identifier); + for (Snapshot snapshot : table.snapshots()) { + long records = 0; + for (DataFile dataFile : snapshot.addedDataFiles(table.io())) { + records += dataFile.recordCount(); + } + + LOG.info( + "TEST RESULT: For table {} snapshot {} written {} records in {} ms", + identifier, + snapshot.snapshotId(), + records, + snapshot.timestampMillis() - before); + before = snapshot.timestampMillis(); + } + } + } +}
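
For reference, below is a minimal usage sketch of the new sink, assembled only from the builder calls and the DynamicRecord / DynamicRecordGenerator signatures exercised by the tests in this patch. The class name DynamicIcebergSinkUsageSketch, the SingleTableGenerator, the hard-coded "default.t1" / "main" target, and the two-column schema are illustrative assumptions rather than part of the change; a real generator would derive the table, branch, schema, and partition spec from each incoming record, which is what makes the sink dynamic.

package org.apache.iceberg.flink.sink.dynamic;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.types.Types;

/** Usage sketch only; kept in the sink's package to avoid visibility assumptions. */
public class DynamicIcebergSinkUsageSketch {

  // Illustrative two-column schema; not part of the patch.
  private static final Schema SCHEMA =
      new Schema(
          Types.NestedField.required(1, "id", Types.IntegerType.get()),
          Types.NestedField.required(2, "data", Types.StringType.get()));

  /** Routes every incoming row to the hypothetical table "default.t1" on branch "main". */
  static class SingleTableGenerator implements DynamicRecordGenerator<RowData> {
    @Override
    public void convert(RowData row, Collector<DynamicRecord> out) {
      // Mirrors the DynamicRecord constructor arguments used by TestDynamicIcebergSinkPerf above.
      out.collect(
          new DynamicRecord(
              TableIdentifier.of("default", "t1"),
              "main",
              SCHEMA,
              row,
              PartitionSpec.unpartitioned(),
              DistributionMode.NONE,
              1));
    }
  }

  /** Attaches the dynamic sink to an existing RowData stream and runs the job. */
  public static void appendAndExecute(
      StreamExecutionEnvironment env, DataStream<RowData> rows, CatalogLoader catalogLoader)
      throws Exception {
    // Same builder chain as executeDynamicSink() in TestDynamicIcebergSink.
    DynamicIcebergSink.forInput(rows)
        .generator(new SingleTableGenerator())
        .catalogLoader(catalogLoader)
        .writeParallelism(1)
        .immediateTableUpdate(true)
        .append();

    env.execute("Dynamic Iceberg Sink usage sketch");
  }
}

As in TestDynamicIcebergSinkPerf above, checkpointing should be enabled on the environment so that the committer produces snapshots while the job runs.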