This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 3615fae615 Flink:Backport DynamicSink support dvs to Flink 2.0 and
1.20 (#14623)
3615fae615 is described below
commit 3615fae6158e76e1ccf61c03ca5e91f5cfc538dd
Author: GuoYu <[email protected]>
AuthorDate: Wed Nov 19 20:47:45 2025 +0800
Flink:Backport DynamicSink support dvs to Flink 2.0 and 1.20 (#14623)
---
.../flink/sink/dynamic/DynamicCommitter.java | 20 +-
.../sink/dynamic/DynamicWriteResultAggregator.java | 36 ++--
.../iceberg/flink/sink/dynamic/DynamicWriter.java | 5 -
.../flink/sink/dynamic/TestDynamicCommitter.java | 137 ++++++++++++++
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 205 +++++++++++++++++++++
.../flink/sink/dynamic/DynamicCommitter.java | 20 +-
.../sink/dynamic/DynamicWriteResultAggregator.java | 36 ++--
.../iceberg/flink/sink/dynamic/DynamicWriter.java | 5 -
.../flink/sink/dynamic/TestDynamicCommitter.java | 137 ++++++++++++++
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 205 +++++++++++++++++++++
10 files changed, 766 insertions(+), 40 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index d60b42b95d..61b20cb27b 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
@@ -39,6 +41,7 @@ import org.apache.iceberg.SnapshotAncestryValidator;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
@@ -52,6 +55,7 @@ import
org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
@@ -214,9 +218,23 @@ class DynamicCommitter implements
Committer<DynamicCommittable> {
DeltaManifests deltaManifests =
SimpleVersionedSerialization.readVersionAndDeSerialize(
DeltaManifestsSerializer.INSTANCE,
committable.getCommittable().manifest());
+
+ WriteResult writeResult =
+ FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(),
table.specs());
+ if (TableUtil.formatVersion(table) > 2) {
+ for (DeleteFile deleteFile : writeResult.deleteFiles()) {
+ if (deleteFile.content() == FileContent.POSITION_DELETES) {
+ Preconditions.checkArgument(
+ ContentFileUtil.isDV(deleteFile),
+ "Can't add position delete file to the %s table.
Concurrent table upgrade to V3 is not supported.",
+ table.name());
+ }
+ }
+ }
+
pendingResults
.computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
- .add(FlinkManifestUtil.readCompletedFiles(deltaManifests,
table.io(), table.specs()));
+ .add(writeResult);
manifests.addAll(deltaManifests.manifests());
}
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index 77bd2a0f97..927491fa89 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
@@ -32,6 +33,7 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
@@ -61,7 +63,8 @@ class DynamicWriteResultAggregator
private final int cacheMaximumSize;
private transient Map<WriteTarget, Collection<DynamicWriteResult>> results;
private transient Map<String, Map<Integer, PartitionSpec>> specs;
- private transient Map<String, ManifestOutputFileFactory> outputFileFactories;
+ private transient Map<String, Tuple2<ManifestOutputFileFactory, Integer>>
+ outputFileFactoriesAndFormatVersions;
private transient String flinkJobId;
private transient String operatorId;
private transient int subTaskId;
@@ -81,7 +84,7 @@ class DynamicWriteResultAggregator
this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
this.results = Maps.newHashMap();
this.specs = new LRUCache<>(cacheMaximumSize);
- this.outputFileFactories = new LRUCache<>(cacheMaximumSize);
+ this.outputFileFactoriesAndFormatVersions = new
LRUCache<>(cacheMaximumSize);
this.catalog = catalogLoader.loadCatalog();
}
@@ -137,12 +140,14 @@ class DynamicWriteResultAggregator
writeResults.forEach(w -> builder.add(w.writeResult()));
WriteResult result = builder.build();
+ Tuple2<ManifestOutputFileFactory, Integer> outputFileFactoryAndVersion =
+ outputFileFactoryAndFormatVersion(key.tableName());
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
result,
- () -> outputFileFactory(key.tableName()).create(checkpointId),
+ () -> outputFileFactoryAndVersion.f0.create(checkpointId),
spec(key.tableName(), key.specId()),
- 2);
+ outputFileFactoryAndVersion.f1);
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
@@ -160,8 +165,9 @@ class DynamicWriteResultAggregator
}
}
- private ManifestOutputFileFactory outputFileFactory(String tableName) {
- return outputFileFactories.computeIfAbsent(
+ private Tuple2<ManifestOutputFileFactory, Integer>
outputFileFactoryAndFormatVersion(
+ String tableName) {
+ return outputFileFactoriesAndFormatVersions.computeIfAbsent(
tableName,
unused -> {
Table table = catalog.loadTable(TableIdentifier.parse(tableName));
@@ -169,14 +175,16 @@ class DynamicWriteResultAggregator
// Make sure to append an identifier to avoid file clashes in case
the factory was to get
// re-created during a checkpoint, i.e. due to cache eviction.
String fileSuffix = UUID.randomUUID().toString();
- return FlinkManifestUtil.createOutputFileFactory(
- () -> table,
- table.properties(),
- flinkJobId,
- operatorId,
- subTaskId,
- attemptId,
- fileSuffix);
+ ManifestOutputFileFactory outputFileFactory =
+ FlinkManifestUtil.createOutputFileFactory(
+ () -> table,
+ table.properties(),
+ flinkJobId,
+ operatorId,
+ subTaskId,
+ attemptId,
+ fileSuffix);
+ return Tuple2.of(outputFileFactory, TableUtil.formatVersion(table));
});
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index 5ed9da8623..c2a3032858 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -118,10 +117,6 @@ class DynamicWriter implements
CommittingSinkWriter<DynamicRecordInternal, Dynam
!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when
configuring to use UPSERT data.");
- Preconditions.checkArgument(
- !(TableUtil.formatVersion(table) > 2),
- "Dynamic Sink writer does not support upsert
mode in tables (V3+)");
-
if (!table.spec().isUnpartitioned()) {
for (PartitionField partitionField :
table.spec().fields()) {
Preconditions.checkState(
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 3dbec22794..1497458e60 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -44,6 +44,9 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.HadoopCatalogExtension;
@@ -338,6 +341,140 @@ class TestDynamicCommitter {
.build());
}
+ @Test
+ void testCommitDeleteInDifferentFormatVersion() throws Exception {
+ Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table1.snapshots()).isEmpty();
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+ DynamicCommitter dynamicCommitter =
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ WriteTarget writeTarget =
+ new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId = 10;
+
+ byte[] deltaManifest =
+ aggregator.writeToManifest(
+ writeTarget,
+ Sets.newHashSet(
+ new DynamicWriteResult(
+ writeTarget,
+ WriteResult.builder()
+ .addDataFiles(DATA_FILE)
+ .addDeleteFiles(DELETE_FILE)
+ .build())),
+ checkpointId);
+
+ CommitRequest<DynamicCommittable> commitRequest =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget, deltaManifest, jobId,
operatorId, checkpointId));
+
+ // Upgrade the table version
+ UpdateProperties updateApi = table1.updateProperties();
+ updateApi.set(
+ TableProperties.FORMAT_VERSION,
String.valueOf(TableUtil.formatVersion(table1) + 1));
+ updateApi.commit();
+
+ assertThatThrownBy(() ->
dynamicCommitter.commit(Sets.newHashSet(commitRequest)))
+ .hasMessage(
+ "Can't add position delete file to the %s table. Concurrent table
upgrade to V3 is not supported.",
+ table1.name())
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void testCommitOnlyDataInDifferentFormatVersion() throws Exception {
+ Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table1.snapshots()).isEmpty();
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+ DynamicCommitter dynamicCommitter =
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ WriteTarget writeTarget =
+ new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId = 10;
+
+ byte[] deltaManifest =
+ aggregator.writeToManifest(
+ writeTarget,
+ Sets.newHashSet(
+ new DynamicWriteResult(
+ writeTarget,
WriteResult.builder().addDataFiles(DATA_FILE).build())),
+ checkpointId);
+
+ CommitRequest<DynamicCommittable> commitRequest =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget, deltaManifest, jobId,
operatorId, checkpointId));
+
+ dynamicCommitter.commit(Sets.newHashSet(commitRequest));
+
+ // Upgrade the table version
+ UpdateProperties updateApi = table1.updateProperties();
+ updateApi.set(
+ TableProperties.FORMAT_VERSION,
String.valueOf(TableUtil.formatVersion(table1) + 1));
+ updateApi.commit();
+
+ table1.refresh();
+ assertThat(table1.snapshots()).hasSize(1);
+ Snapshot first = Iterables.getFirst(table1.snapshots(), null);
+ assertThat(first.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+ }
+
@Test
void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 777082dcb9..b660d8e285 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -59,6 +59,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
@@ -76,6 +77,7 @@ import
org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.FailBeforeAndA
import org.apache.iceberg.inmemory.InMemoryInputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -527,6 +529,209 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
}
}
+ @Test
+ void testUpsertV3() throws Exception {
+ ImmutableMap<String, String> properties =
ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");
+ CATALOG_EXTENSION
+ .catalog()
+ .createTable(
+ TableIdentifier.of(DATABASE, "t1"),
+ SimpleDataUtil.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ null,
+ properties);
+
+ List<DynamicIcebergDataImpl> rows =
+ Lists.newArrayList(
+          // Insert one row
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ false),
+ // Remaining rows are duplicates
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true));
+
+ executeDynamicSink(rows, env, true, 1, null);
+
+ try (CloseableIterable<Record> iterable =
+ IcebergGenerics.read(
+
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
+ .build()) {
+ List<Record> records = Lists.newArrayList();
+ for (Record record : iterable) {
+ records.add(record);
+ }
+
+ assertThat(records).hasSize(1);
+ Record actual = records.get(0);
+ DynamicIcebergDataImpl input = rows.get(0);
+ assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+ assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+ }
+ }
+
+ @Test
+ void testMultiFormatVersion() throws Exception {
+ ImmutableMap<String, String> properties =
ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");
+ CATALOG_EXTENSION
+ .catalog()
+ .createTable(
+ TableIdentifier.of(DATABASE, "t1"),
+ SimpleDataUtil.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ null,
+ properties);
+
+ ImmutableMap<String, String> properties1 =
ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+ CATALOG_EXTENSION
+ .catalog()
+ .createTable(
+ TableIdentifier.of(DATABASE, "t2"),
+ SimpleDataUtil.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ null,
+ properties1);
+
+ List<DynamicIcebergDataImpl> rowsForTable1 =
+ Lists.newArrayList(
+          // Insert one row
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ false),
+ // Remaining rows are duplicates
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true));
+
+ List<DynamicIcebergDataImpl> rowsForTable2 =
+ Lists.newArrayList(
+          // Insert one row
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t2",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ false),
+ // Remaining rows are duplicates
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t2",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t2",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t2",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true));
+
+ List<DynamicIcebergDataImpl> rows = Lists.newArrayList();
+ rows.addAll(rowsForTable1);
+ rows.addAll(rowsForTable2);
+
+ executeDynamicSink(rows, env, true, 1, null);
+
+ try (CloseableIterable<Record> iterable =
+ IcebergGenerics.read(
+
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
+ .build()) {
+ List<Record> records = Lists.newArrayList();
+ for (Record record : iterable) {
+ records.add(record);
+ }
+
+ assertThat(records).hasSize(1);
+ Record actual = records.get(0);
+ DynamicIcebergDataImpl input = rowsForTable1.get(0);
+ assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+ assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+ }
+
+ try (CloseableIterable<Record> iterable =
+ IcebergGenerics.read(
+
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t2")))
+ .build()) {
+ List<Record> records = Lists.newArrayList();
+ for (Record record : iterable) {
+ records.add(record);
+ }
+
+ assertThat(records).hasSize(1);
+ Record actual = records.get(0);
+ DynamicIcebergDataImpl input = rowsForTable2.get(0);
+ assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+ assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+ }
+ }
+
@Test
void testCommitFailedBeforeOrAfterCommit() throws Exception {
// Configure a Restart strategy to allow recovery
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index 54d506b663..d5774b66af 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
@@ -38,6 +40,7 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.CommitSummary;
@@ -50,6 +53,7 @@ import
org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
@@ -211,9 +215,23 @@ class DynamicCommitter implements
Committer<DynamicCommittable> {
DeltaManifests deltaManifests =
SimpleVersionedSerialization.readVersionAndDeSerialize(
DeltaManifestsSerializer.INSTANCE,
committable.getCommittable().manifest());
+
+ WriteResult writeResult =
+ FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(),
table.specs());
+ if (TableUtil.formatVersion(table) > 2) {
+ for (DeleteFile deleteFile : writeResult.deleteFiles()) {
+ if (deleteFile.content() == FileContent.POSITION_DELETES) {
+ Preconditions.checkArgument(
+ ContentFileUtil.isDV(deleteFile),
+ "Can't add position delete file to the %s table.
Concurrent table upgrade to V3 is not supported.",
+ table.name());
+ }
+ }
+ }
+
pendingResults
.computeIfAbsent(e.getKey(), unused -> Lists.newArrayList())
- .add(FlinkManifestUtil.readCompletedFiles(deltaManifests,
table.io(), table.specs()));
+ .add(writeResult);
manifests.addAll(deltaManifests.manifests());
}
}
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index 77bd2a0f97..927491fa89 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
@@ -32,6 +33,7 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
@@ -61,7 +63,8 @@ class DynamicWriteResultAggregator
private final int cacheMaximumSize;
private transient Map<WriteTarget, Collection<DynamicWriteResult>> results;
private transient Map<String, Map<Integer, PartitionSpec>> specs;
- private transient Map<String, ManifestOutputFileFactory> outputFileFactories;
+ private transient Map<String, Tuple2<ManifestOutputFileFactory, Integer>>
+ outputFileFactoriesAndFormatVersions;
private transient String flinkJobId;
private transient String operatorId;
private transient int subTaskId;
@@ -81,7 +84,7 @@ class DynamicWriteResultAggregator
this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
this.results = Maps.newHashMap();
this.specs = new LRUCache<>(cacheMaximumSize);
- this.outputFileFactories = new LRUCache<>(cacheMaximumSize);
+ this.outputFileFactoriesAndFormatVersions = new
LRUCache<>(cacheMaximumSize);
this.catalog = catalogLoader.loadCatalog();
}
@@ -137,12 +140,14 @@ class DynamicWriteResultAggregator
writeResults.forEach(w -> builder.add(w.writeResult()));
WriteResult result = builder.build();
+ Tuple2<ManifestOutputFileFactory, Integer> outputFileFactoryAndVersion =
+ outputFileFactoryAndFormatVersion(key.tableName());
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
result,
- () -> outputFileFactory(key.tableName()).create(checkpointId),
+ () -> outputFileFactoryAndVersion.f0.create(checkpointId),
spec(key.tableName(), key.specId()),
- 2);
+ outputFileFactoryAndVersion.f1);
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
@@ -160,8 +165,9 @@ class DynamicWriteResultAggregator
}
}
- private ManifestOutputFileFactory outputFileFactory(String tableName) {
- return outputFileFactories.computeIfAbsent(
+ private Tuple2<ManifestOutputFileFactory, Integer>
outputFileFactoryAndFormatVersion(
+ String tableName) {
+ return outputFileFactoriesAndFormatVersions.computeIfAbsent(
tableName,
unused -> {
Table table = catalog.loadTable(TableIdentifier.parse(tableName));
@@ -169,14 +175,16 @@ class DynamicWriteResultAggregator
// Make sure to append an identifier to avoid file clashes in case
the factory was to get
// re-created during a checkpoint, i.e. due to cache eviction.
String fileSuffix = UUID.randomUUID().toString();
- return FlinkManifestUtil.createOutputFileFactory(
- () -> table,
- table.properties(),
- flinkJobId,
- operatorId,
- subTaskId,
- attemptId,
- fileSuffix);
+ ManifestOutputFileFactory outputFileFactory =
+ FlinkManifestUtil.createOutputFileFactory(
+ () -> table,
+ table.properties(),
+ flinkJobId,
+ operatorId,
+ subTaskId,
+ attemptId,
+ fileSuffix);
+ return Tuple2.of(outputFileFactory, TableUtil.formatVersion(table));
});
}
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index 5ed9da8623..c2a3032858 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -118,10 +117,6 @@ class DynamicWriter implements
CommittingSinkWriter<DynamicRecordInternal, Dynam
!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when
configuring to use UPSERT data.");
- Preconditions.checkArgument(
- !(TableUtil.formatVersion(table) > 2),
- "Dynamic Sink writer does not support upsert
mode in tables (V3+)");
-
if (!table.spec().isUnpartitioned()) {
for (PartitionField partitionField :
table.spec().fields()) {
Preconditions.checkState(
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 7894428a78..d2c688e28c 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -43,6 +43,9 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.HadoopCatalogExtension;
@@ -335,6 +338,140 @@ class TestDynamicCommitter {
.build());
}
+ @Test
+ void testCommitDeleteInDifferentFormatVersion() throws Exception {
+ Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table1.snapshots()).isEmpty();
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+ DynamicCommitter dynamicCommitter =
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ WriteTarget writeTarget =
+ new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId = 10;
+
+ byte[] deltaManifest =
+ aggregator.writeToManifest(
+ writeTarget,
+ Sets.newHashSet(
+ new DynamicWriteResult(
+ writeTarget,
+ WriteResult.builder()
+ .addDataFiles(DATA_FILE)
+ .addDeleteFiles(DELETE_FILE)
+ .build())),
+ checkpointId);
+
+ CommitRequest<DynamicCommittable> commitRequest =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget, deltaManifest, jobId,
operatorId, checkpointId));
+
+ // Upgrade the table version
+ UpdateProperties updateApi = table1.updateProperties();
+ updateApi.set(
+ TableProperties.FORMAT_VERSION,
String.valueOf(TableUtil.formatVersion(table1) + 1));
+ updateApi.commit();
+
+ assertThatThrownBy(() ->
dynamicCommitter.commit(Sets.newHashSet(commitRequest)))
+ .hasMessage(
+ "Can't add position delete file to the %s table. Concurrent table
upgrade to V3 is not supported.",
+ table1.name())
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void testCommitOnlyDataInDifferentFormatVersion() throws Exception {
+ Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table1.snapshots()).isEmpty();
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+ DynamicCommitter dynamicCommitter =
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ WriteTarget writeTarget =
+ new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId = 10;
+
+ byte[] deltaManifest =
+ aggregator.writeToManifest(
+ writeTarget,
+ Sets.newHashSet(
+ new DynamicWriteResult(
+ writeTarget,
WriteResult.builder().addDataFiles(DATA_FILE).build())),
+ checkpointId);
+
+ CommitRequest<DynamicCommittable> commitRequest =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget, deltaManifest, jobId,
operatorId, checkpointId));
+
+ dynamicCommitter.commit(Sets.newHashSet(commitRequest));
+
+ // Upgrade the table version
+ UpdateProperties updateApi = table1.updateProperties();
+ updateApi.set(
+ TableProperties.FORMAT_VERSION,
String.valueOf(TableUtil.formatVersion(table1) + 1));
+ updateApi.commit();
+
+ table1.refresh();
+ assertThat(table1.snapshots()).hasSize(1);
+ Snapshot first = Iterables.getFirst(table1.snapshots(), null);
+ assertThat(first.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+ }
+
@Test
void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 20fae212b4..f0cc46df46 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
@@ -73,6 +74,7 @@ import
org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.FailBeforeAndA
import org.apache.iceberg.inmemory.InMemoryInputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -522,6 +524,209 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
}
}
+ @Test
+ void testUpsertV3() throws Exception {
+ ImmutableMap<String, String> properties =
ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");
+ CATALOG_EXTENSION
+ .catalog()
+ .createTable(
+ TableIdentifier.of(DATABASE, "t1"),
+ SimpleDataUtil.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ null,
+ properties);
+
+ List<DynamicIcebergDataImpl> rows =
+ Lists.newArrayList(
+ // Insert one rows
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ false),
+ // Remaining rows are duplicates
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true));
+
+ executeDynamicSink(rows, env, true, 1, null);
+
+ try (CloseableIterable<Record> iterable =
+ IcebergGenerics.read(
+
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
+ .build()) {
+ List<Record> records = Lists.newArrayList();
+ for (Record record : iterable) {
+ records.add(record);
+ }
+
+ assertThat(records).hasSize(1);
+ Record actual = records.get(0);
+ DynamicIcebergDataImpl input = rows.get(0);
+ assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+ assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+ }
+ }
+
+ @Test
+ void testMultiFormatVersion() throws Exception {
+ ImmutableMap<String, String> properties =
ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");
+ CATALOG_EXTENSION
+ .catalog()
+ .createTable(
+ TableIdentifier.of(DATABASE, "t1"),
+ SimpleDataUtil.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ null,
+ properties);
+
+ ImmutableMap<String, String> properties1 =
ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+ CATALOG_EXTENSION
+ .catalog()
+ .createTable(
+ TableIdentifier.of(DATABASE, "t2"),
+ SimpleDataUtil.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ null,
+ properties1);
+
+ List<DynamicIcebergDataImpl> rowsForTable1 =
+ Lists.newArrayList(
+ // Insert one rows
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ false),
+ // Remaining rows are duplicates
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true));
+
+ List<DynamicIcebergDataImpl> rowsForTable2 =
+ Lists.newArrayList(
+ // Insert one rows
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t2",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ false),
+ // Remaining rows are duplicates
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t2",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t2",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t2",
+ "main",
+ PartitionSpec.unpartitioned(),
+ true,
+ Sets.newHashSet("id"),
+ true));
+
+ List<DynamicIcebergDataImpl> rows = Lists.newArrayList();
+ rows.addAll(rowsForTable1);
+ rows.addAll(rowsForTable2);
+
+ executeDynamicSink(rows, env, true, 1, null);
+
+ try (CloseableIterable<Record> iterable =
+ IcebergGenerics.read(
+
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
+ .build()) {
+ List<Record> records = Lists.newArrayList();
+ for (Record record : iterable) {
+ records.add(record);
+ }
+
+ assertThat(records).hasSize(1);
+ Record actual = records.get(0);
+ DynamicIcebergDataImpl input = rowsForTable1.get(0);
+ assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+ assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+ }
+
+ try (CloseableIterable<Record> iterable =
+ IcebergGenerics.read(
+
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t2")))
+ .build()) {
+ List<Record> records = Lists.newArrayList();
+ for (Record record : iterable) {
+ records.add(record);
+ }
+
+ assertThat(records).hasSize(1);
+ Record actual = records.get(0);
+ DynamicIcebergDataImpl input = rowsForTable2.get(0);
+ assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+ assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+ }
+ }
+
@Test
void testCommitFailedBeforeOrAfterCommit() throws Exception {
// Configure a Restart strategy to allow recovery