This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new fa4890e0f9 Flink: Backport Prevent recreation of ManifestOutputFileFactory during flushing (#14385)
fa4890e0f9 is described below
commit fa4890e0f923ae1f9fbf0065d2430e21e77174f3
Author: Maximilian Michels <[email protected]>
AuthorDate: Tue Oct 21 12:02:19 2025 +0200
    Flink: Backport Prevent recreation of ManifestOutputFileFactory during flushing (#14385)
backports #14358
---
flink/v1.20/build.gradle | 3 -
.../iceberg/flink/sink/FlinkManifestUtil.java | 14 ++-
.../flink/sink/ManifestOutputFileFactory.java | 11 +-
.../flink/sink/dynamic/DynamicIcebergSink.java | 2 +-
.../sink/dynamic/DynamicWriteResultAggregator.java | 34 ++++---
.../iceberg/flink/sink/dynamic/DynamicWriter.java | 5 +
.../iceberg/flink/sink/TestFlinkManifest.java | 2 +-
.../flink/sink/TestManifestOutputFileFactory.java | 112 +++++++++++++++++++++
.../flink/sink/dynamic/TestDynamicCommitter.java | 12 ++-
.../dynamic/TestDynamicWriteResultAggregator.java | 92 ++++++++++++++++-
.../flink/sink/dynamic/TestDynamicWriter.java | 32 ++++++
flink/v2.0/build.gradle | 3 -
.../iceberg/flink/sink/FlinkManifestUtil.java | 14 ++-
.../flink/sink/ManifestOutputFileFactory.java | 11 +-
.../flink/sink/dynamic/DynamicIcebergSink.java | 2 +-
.../sink/dynamic/DynamicWriteResultAggregator.java | 34 ++++---
.../iceberg/flink/sink/dynamic/DynamicWriter.java | 5 +
.../iceberg/flink/sink/TestFlinkManifest.java | 2 +-
.../flink/sink/TestManifestOutputFileFactory.java | 112 +++++++++++++++++++++
.../flink/sink/dynamic/TestDynamicCommitter.java | 12 ++-
.../dynamic/TestDynamicWriteResultAggregator.java | 92 ++++++++++++++++-
.../flink/sink/dynamic/TestDynamicWriter.java | 32 ++++++
22 files changed, 580 insertions(+), 58 deletions(-)
diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle
index d0bbe7a898..3591bf37b1 100644
--- a/flink/v1.20/build.gradle
+++ b/flink/v1.20/build.gradle
@@ -68,9 +68,6 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
implementation libs.datasketches
- // for caching in DynamicSink
- implementation libs.caffeine
-
testImplementation libs.flink120.connector.test.utils
testImplementation libs.flink120.core
testImplementation libs.flink120.runtime
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 13affd8484..0eeedf2659 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -74,7 +74,19 @@ public class FlinkManifestUtil {
int subTaskId,
long attemptNumber) {
return new ManifestOutputFileFactory(
-        tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, attemptNumber);
+        tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, attemptNumber, null);
+ }
+
+ public static ManifestOutputFileFactory createOutputFileFactory(
+ Supplier<Table> tableSupplier,
+ Map<String, String> tableProps,
+ String flinkJobId,
+ String operatorUniqueId,
+ int subTaskId,
+ long attemptNumber,
+ String suffix) {
+ return new ManifestOutputFileFactory(
+        tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, attemptNumber, suffix);
}
/**
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
index 6ba87bea30..30c0e11a25 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -22,6 +22,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
@@ -43,6 +44,7 @@ public class ManifestOutputFileFactory {
private final int subTaskId;
private final long attemptNumber;
private final AtomicInteger fileCount = new AtomicInteger(0);
+ @Nullable private final String suffix;
ManifestOutputFileFactory(
Supplier<Table> tableSupplier,
@@ -50,26 +52,29 @@ public class ManifestOutputFileFactory {
String flinkJobId,
String operatorUniqueId,
int subTaskId,
- long attemptNumber) {
+ long attemptNumber,
+ @Nullable String suffix) {
this.tableSupplier = tableSupplier;
this.props = props;
this.flinkJobId = flinkJobId;
this.operatorUniqueId = operatorUniqueId;
this.subTaskId = subTaskId;
this.attemptNumber = attemptNumber;
+ this.suffix = suffix;
}
private String generatePath(long checkpointId) {
return FileFormat.AVRO.addExtension(
String.format(
Locale.ROOT,
- "%s-%s-%05d-%d-%d-%05d",
+ "%s-%s-%05d-%d-%d-%05d%s",
flinkJobId,
operatorUniqueId,
subTaskId,
attemptNumber,
checkpointId,
- fileCount.incrementAndGet()));
+ fileCount.incrementAndGet(),
+ suffix != null ? "-" + suffix : ""));
}
public OutputFile create(long checkpointId) {
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index 9547de78d6..e90fe4d6b1 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -168,7 +168,7 @@ public class DynamicIcebergSink
.transform(
prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
typeInformation,
- new DynamicWriteResultAggregator(catalogLoader))
+ new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
.uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index 58ba183dfc..b92d32fcc4 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -18,12 +18,10 @@
*/
package org.apache.iceberg.flink.sink.dynamic;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
-import java.time.Duration;
import java.util.Collection;
import java.util.Map;
+import java.util.UUID;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -58,20 +56,21 @@ class DynamicWriteResultAggregator
CommittableMessage<DynamicWriteResult>,
CommittableMessage<DynamicCommittable>> {
private static final Logger LOG =
LoggerFactory.getLogger(DynamicWriteResultAggregator.class);
private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
-  private static final Duration CACHE_EXPIRATION_DURATION = Duration.ofMinutes(1);
private final CatalogLoader catalogLoader;
+ private final int cacheMaximumSize;
private transient Map<WriteTarget, Collection<DynamicWriteResult>> results;
- private transient Cache<String, Map<Integer, PartitionSpec>> specs;
-  private transient Cache<String, ManifestOutputFileFactory> outputFileFactories;
+ private transient Map<String, Map<Integer, PartitionSpec>> specs;
+ private transient Map<String, ManifestOutputFileFactory> outputFileFactories;
private transient String flinkJobId;
private transient String operatorId;
private transient int subTaskId;
private transient int attemptId;
private transient Catalog catalog;
- DynamicWriteResultAggregator(CatalogLoader catalogLoader) {
+  DynamicWriteResultAggregator(CatalogLoader catalogLoader, int cacheMaximumSize) {
this.catalogLoader = catalogLoader;
+ this.cacheMaximumSize = cacheMaximumSize;
}
@Override
@@ -81,10 +80,8 @@ class DynamicWriteResultAggregator
this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
this.results = Maps.newHashMap();
-    this.specs =
-        Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
-    this.outputFileFactories =
-        Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
+ this.specs = new LRUCache<>(cacheMaximumSize);
+ this.outputFileFactories = new LRUCache<>(cacheMaximumSize);
this.catalog = catalogLoader.loadCatalog();
}
@@ -163,18 +160,27 @@ class DynamicWriteResultAggregator
}
private ManifestOutputFileFactory outputFileFactory(String tableName) {
- return outputFileFactories.get(
+ return outputFileFactories.computeIfAbsent(
tableName,
unused -> {
Table table = catalog.loadTable(TableIdentifier.parse(tableName));
specs.put(tableName, table.specs());
+          // Make sure to append an identifier to avoid file clashes in case the factory was to get
+          // re-created during a checkpoint, i.e. due to cache eviction.
+ String fileSuffix = UUID.randomUUID().toString();
return FlinkManifestUtil.createOutputFileFactory(
-              () -> table, table.properties(), flinkJobId, operatorId, subTaskId, attemptId);
+ () -> table,
+ table.properties(),
+ flinkJobId,
+ operatorId,
+ subTaskId,
+ attemptId,
+ fileSuffix);
});
}
private PartitionSpec spec(String tableName, int specId) {
- Map<Integer, PartitionSpec> knownSpecs = specs.getIfPresent(tableName);
+ Map<Integer, PartitionSpec> knownSpecs = specs.get(tableName);
if (knownSpecs != null) {
PartitionSpec spec = knownSpecs.get(specId);
if (spec != null) {
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index ae24efafa6..e99e6e72da 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -210,4 +210,9 @@ class DynamicWriter implements
CommittingSinkWriter<DynamicRecordInternal, Dynam
DynamicWriterMetrics getMetrics() {
return metrics;
}
+
+ @VisibleForTesting
+ Map<WriteTarget, RowDataTaskWriterFactory> getTaskWriterFactories() {
+ return taskWriterFactories;
+ }
}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 10197ddfaf..c6dc984513 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -134,7 +134,7 @@ public class TestFlinkManifest {
ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION,
userProvidedFolder.getAbsolutePath() + "///");
ManifestOutputFileFactory factory =
- new ManifestOutputFileFactory(() -> table, props, flinkJobId,
operatorId, 1, 1);
+ new ManifestOutputFileFactory(() -> table, props, flinkJobId,
operatorId, 1, 1, null);
List<DataFile> dataFiles = generateDataFiles(5);
DeltaManifests deltaManifests =
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
new file mode 100644
index 0000000000..654248fcab
--- /dev/null
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class TestManifestOutputFileFactory {
+
+ @RegisterExtension
+ static final HadoopCatalogExtension CATALOG_EXTENSION = new
HadoopCatalogExtension("db", "table");
+
+ private Table table;
+
+ @BeforeEach
+ void before() throws IOException {
+ table =
CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new
Schema());
+ }
+
+ @Test
+ public void testFileNameFormat() {
+ String flinkJobId = "job123";
+ String operatorUniqueId = "operator456";
+ int subTaskId = 7;
+ long attemptNumber = 2;
+ long checkpointId = 100;
+ Map<String, String> props = table.properties();
+
+ ManifestOutputFileFactory factory =
+ new ManifestOutputFileFactory(
+ () -> table, props, flinkJobId, operatorUniqueId, subTaskId,
attemptNumber, null);
+
+ String file1 = new File(factory.create(checkpointId).location()).getName();
+ assertThat(file1).isEqualTo("job123-operator456-00007-2-100-00001.avro");
+
+ String file2 = new File(factory.create(checkpointId).location()).getName();
+ assertThat(file2).isEqualTo("job123-operator456-00007-2-100-00002.avro");
+
+ String file3 = new File(factory.create(checkpointId +
1).location()).getName();
+ assertThat(file3).isEqualTo("job123-operator456-00007-2-101-00003.avro");
+ }
+
+ @Test
+ public void testFileNameFormatWithSuffix() {
+ String flinkJobId = "job123";
+ String operatorUniqueId = "operator456";
+ int subTaskId = 7;
+ long attemptNumber = 2;
+ long checkpointId = 100;
+ Map<String, String> props = table.properties();
+
+ ManifestOutputFileFactory factory =
+ new ManifestOutputFileFactory(
+ () -> table, props, flinkJobId, operatorUniqueId, subTaskId,
attemptNumber, "suffix");
+
+ String file1 = new File(factory.create(checkpointId).location()).getName();
+
assertThat(file1).isEqualTo("job123-operator456-00007-2-100-00001-suffix.avro");
+
+ String file2 = new File(factory.create(checkpointId).location()).getName();
+
assertThat(file2).isEqualTo("job123-operator456-00007-2-100-00002-suffix.avro");
+ }
+
+ @Test
+ public void testSuffixedFileNamesWithRecreatedFactory() {
+ String flinkJobId = "test-job";
+ String operatorUniqueId = "test-operator";
+ int subTaskId = 0;
+ long attemptNumber = 1;
+ long checkpointId = 1;
+ Map<String, String> props = table.properties();
+
+ ManifestOutputFileFactory factory1 =
+ new ManifestOutputFileFactory(
+ () -> table, props, flinkJobId, operatorUniqueId, subTaskId,
attemptNumber, "suffix1");
+ String file1 = new
File(factory1.create(checkpointId).location()).getName();
+
assertThat(file1).isEqualTo("test-job-test-operator-00000-1-1-00001-suffix1.avro");
+
+ ManifestOutputFileFactory factory2 =
+ new ManifestOutputFileFactory(
+ () -> table, props, flinkJobId, operatorUniqueId, subTaskId,
attemptNumber, "suffix2");
+ String file2 = new
File(factory2.create(checkpointId).location()).getName();
+
assertThat(file2).isEqualTo("test-job-test-operator-00000-1-1-00001-suffix2.avro");
+
+ assertThat(file1).isNotEqualTo(file2);
+ }
+}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index f5387aee88..7894428a78 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -68,6 +68,8 @@ class TestDynamicCommitter {
Catalog catalog;
+ final int cacheMaximumSize = 10;
+
private static final DataFile DATA_FILE =
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-1.parquet")
@@ -155,7 +157,7 @@ class TestDynamicCommitter {
new WriteTarget(TABLE2, "branch2", 43, 0, true, Sets.newHashSet(1, 2));
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();
@@ -281,7 +283,7 @@ class TestDynamicCommitter {
new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();
@@ -339,7 +341,7 @@ class TestDynamicCommitter {
assertThat(table.snapshots()).isEmpty();
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();
@@ -454,7 +456,7 @@ class TestDynamicCommitter {
assertThat(table.snapshots()).isEmpty();
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();
@@ -613,7 +615,7 @@ class TestDynamicCommitter {
new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
index 713c67da17..d3f4385d97 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
@@ -20,15 +20,26 @@ package org.apache.iceberg.flink.sink.dynamic;
import static org.assertj.core.api.Assertions.assertThat;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.hadoop.util.Sets;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.sink.DeltaManifests;
+import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -37,13 +48,22 @@ class TestDynamicWriteResultAggregator {
@RegisterExtension
static final HadoopCatalogExtension CATALOG_EXTENSION = new
HadoopCatalogExtension("db", "table");
+ final int cacheMaximumSize = 1;
+
+ private static final DataFile DATA_FILE =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-1.parquet")
+ .withFileSizeInBytes(100)
+ .withRecordCount(1)
+ .build();
+
@Test
void testAggregator() throws Exception {
CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new
Schema());
CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table2"), new
Schema());
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
try (OneInputStreamOperatorTestHarness<
CommittableMessage<DynamicWriteResult>,
CommittableMessage<DynamicCommittable>>
testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) {
@@ -79,4 +99,74 @@ class TestDynamicWriteResultAggregator {
assertThat(testHarness.getRecordOutput()).hasSize(4);
}
}
+
+ @Test
+ void testPreventOutputFileFactoryCacheEvictionDuringFlush() throws Exception
{
+ CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new
Schema());
+
+ // Disable caching of ManifestOutputFileFactory.
+ final int zeroCacheSize = 0;
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
zeroCacheSize);
+ try (OneInputStreamOperatorTestHarness<
+ CommittableMessage<DynamicWriteResult>,
CommittableMessage<DynamicCommittable>>
+ testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) {
+ testHarness.open();
+
+ WriteTarget writeTarget1 =
+ new WriteTarget("table", "branch", 42, 0, false, Sets.newHashSet());
+ DynamicWriteResult dynamicWriteResult1 =
+ new DynamicWriteResult(
+ writeTarget1,
WriteResult.builder().addDataFiles(DATA_FILE).build());
+
+ // Different WriteTarget
+ WriteTarget writeTarget2 =
+ new WriteTarget("table", "branch2", 23, 0, true, Sets.newHashSet());
+ DynamicWriteResult dynamicWriteResult2 =
+ new DynamicWriteResult(
+ writeTarget2,
WriteResult.builder().addDataFiles(DATA_FILE).build());
+
+ CommittableWithLineage<DynamicWriteResult> committable1 =
+ new CommittableWithLineage<>(dynamicWriteResult1, 0, 0);
+ testHarness.processElement(new StreamRecord<>(committable1));
+
+ CommittableWithLineage<DynamicWriteResult> committable2 =
+ new CommittableWithLineage<>(dynamicWriteResult2, 0, 0);
+ testHarness.processElement(new StreamRecord<>(committable2));
+
+ assertThat(testHarness.getOutput()).isEmpty();
+
+ testHarness.prepareSnapshotPreBarrier(1L);
+ List<StreamRecord<CommittableMessage<DynamicCommittable>>> output =
+ Lists.newArrayList(testHarness.getRecordOutput().iterator());
+
+ assertThat(testHarness.getOutput()).hasSize(3);
+
assertThat(output.get(0).getValue()).isInstanceOf(CommittableSummary.class);
+
assertThat(output.get(1).getValue()).isInstanceOf(CommittableWithLineage.class);
+
assertThat(output.get(2).getValue()).isInstanceOf(CommittableWithLineage.class);
+
+ // There should be two unique file paths, despite the cache being
disabled.
+ Set<String> manifestPaths = getManifestPaths(output);
+ assertThat(manifestPaths).hasSize(2);
+ }
+ }
+
+ private static Set<String> getManifestPaths(
+ List<StreamRecord<CommittableMessage<DynamicCommittable>>> messages)
throws IOException {
+ Set<String> manifestPaths = Sets.newHashSet();
+
+ for (StreamRecord<CommittableMessage<DynamicCommittable>> record :
messages) {
+ CommittableMessage<DynamicCommittable> message = record.getValue();
+ if (message instanceof CommittableWithLineage) {
+ DeltaManifests deltaManifests =
+ SimpleVersionedSerialization.readVersionAndDeSerialize(
+ DeltaManifestsSerializer.INSTANCE,
+ (((CommittableWithLineage<DynamicCommittable>)
message).getCommittable())
+ .manifest());
+ deltaManifests.manifests().forEach(manifest ->
manifestPaths.add(manifest.path()));
+ }
+ }
+
+ return manifestPaths;
+ }
}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 89dd4f2259..689fd20483 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -202,6 +202,38 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
"Equality field columns shouldn't be empty when configuring to use
UPSERT data.");
}
+ @Test
+ void testUniqueFileSuffixOnFactoryRecreation() throws Exception {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA);
+
+ DynamicWriter dynamicWriter = createDynamicWriter(catalog);
+ DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
+
+ dynamicWriter.write(record1, null);
+ dynamicWriter.prepareCommit();
+
+ File dataDir1 = new File(URI.create(table1.location()).getPath(), "data");
+ File[] files = dataDir1.listFiles((dir, name) -> !name.startsWith("."));
+ assertThat(files).isNotNull().hasSize(1);
+ File firstFile = files[0];
+
+ // Clear cache which must create new unique files names for the output
files
+ dynamicWriter.getTaskWriterFactories().clear();
+
+ dynamicWriter.write(record1, null);
+ dynamicWriter.prepareCommit();
+
+ files =
+ dataDir1.listFiles(
+ (dir, name) -> !name.startsWith(".") &&
!name.equals(firstFile.getName()));
+ assertThat(files).isNotNull().hasSize(1);
+ File secondFile = files[0];
+
+ // File names must be different
+ assertThat(firstFile.getName()).isNotEqualTo(secondFile.getName());
+ }
+
private static @Nonnull DynamicWriter createDynamicWriter(
Catalog catalog, Map<String, String> properties) {
DynamicWriter dynamicWriter =
diff --git a/flink/v2.0/build.gradle b/flink/v2.0/build.gradle
index 01fbc1a4fe..5907f41b35 100644
--- a/flink/v2.0/build.gradle
+++ b/flink/v2.0/build.gradle
@@ -68,9 +68,6 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
implementation libs.datasketches
- // for caching in DynamicSink
- implementation libs.caffeine
-
testImplementation libs.flink20.connector.test.utils
testImplementation libs.flink20.core
testImplementation libs.flink20.runtime
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 13affd8484..0eeedf2659 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -74,7 +74,19 @@ public class FlinkManifestUtil {
int subTaskId,
long attemptNumber) {
return new ManifestOutputFileFactory(
- tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId,
attemptNumber);
+ tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId,
attemptNumber, null);
+ }
+
+ public static ManifestOutputFileFactory createOutputFileFactory(
+ Supplier<Table> tableSupplier,
+ Map<String, String> tableProps,
+ String flinkJobId,
+ String operatorUniqueId,
+ int subTaskId,
+ long attemptNumber,
+ String suffix) {
+ return new ManifestOutputFileFactory(
+ tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId,
attemptNumber, suffix);
}
/**
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
index 6ba87bea30..30c0e11a25 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -22,6 +22,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
@@ -43,6 +44,7 @@ public class ManifestOutputFileFactory {
private final int subTaskId;
private final long attemptNumber;
private final AtomicInteger fileCount = new AtomicInteger(0);
+ @Nullable private final String suffix;
ManifestOutputFileFactory(
Supplier<Table> tableSupplier,
@@ -50,26 +52,29 @@ public class ManifestOutputFileFactory {
String flinkJobId,
String operatorUniqueId,
int subTaskId,
- long attemptNumber) {
+ long attemptNumber,
+ @Nullable String suffix) {
this.tableSupplier = tableSupplier;
this.props = props;
this.flinkJobId = flinkJobId;
this.operatorUniqueId = operatorUniqueId;
this.subTaskId = subTaskId;
this.attemptNumber = attemptNumber;
+ this.suffix = suffix;
}
private String generatePath(long checkpointId) {
return FileFormat.AVRO.addExtension(
String.format(
Locale.ROOT,
- "%s-%s-%05d-%d-%d-%05d",
+ "%s-%s-%05d-%d-%d-%05d%s",
flinkJobId,
operatorUniqueId,
subTaskId,
attemptNumber,
checkpointId,
- fileCount.incrementAndGet()));
+ fileCount.incrementAndGet(),
+ suffix != null ? "-" + suffix : ""));
}
public OutputFile create(long checkpointId) {
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index 2715a01608..db48be7977 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -168,7 +168,7 @@ public class DynamicIcebergSink
.transform(
prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
typeInformation,
- new DynamicWriteResultAggregator(catalogLoader))
+ new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
.uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
}
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index 58ba183dfc..b92d32fcc4 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -18,12 +18,10 @@
*/
package org.apache.iceberg.flink.sink.dynamic;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
-import java.time.Duration;
import java.util.Collection;
import java.util.Map;
+import java.util.UUID;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -58,20 +56,21 @@ class DynamicWriteResultAggregator
CommittableMessage<DynamicWriteResult>,
CommittableMessage<DynamicCommittable>> {
private static final Logger LOG =
LoggerFactory.getLogger(DynamicWriteResultAggregator.class);
private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
- private static final Duration CACHE_EXPIRATION_DURATION =
Duration.ofMinutes(1);
private final CatalogLoader catalogLoader;
+ private final int cacheMaximumSize;
private transient Map<WriteTarget, Collection<DynamicWriteResult>> results;
- private transient Cache<String, Map<Integer, PartitionSpec>> specs;
- private transient Cache<String, ManifestOutputFileFactory>
outputFileFactories;
+ private transient Map<String, Map<Integer, PartitionSpec>> specs;
+ private transient Map<String, ManifestOutputFileFactory> outputFileFactories;
private transient String flinkJobId;
private transient String operatorId;
private transient int subTaskId;
private transient int attemptId;
private transient Catalog catalog;
- DynamicWriteResultAggregator(CatalogLoader catalogLoader) {
+ DynamicWriteResultAggregator(CatalogLoader catalogLoader, int
cacheMaximumSize) {
this.catalogLoader = catalogLoader;
+ this.cacheMaximumSize = cacheMaximumSize;
}
@Override
@@ -81,10 +80,8 @@ class DynamicWriteResultAggregator
this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
this.results = Maps.newHashMap();
- this.specs =
-
Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
- this.outputFileFactories =
-
Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
+ this.specs = new LRUCache<>(cacheMaximumSize);
+ this.outputFileFactories = new LRUCache<>(cacheMaximumSize);
this.catalog = catalogLoader.loadCatalog();
}
@@ -163,18 +160,27 @@ class DynamicWriteResultAggregator
}
private ManifestOutputFileFactory outputFileFactory(String tableName) {
- return outputFileFactories.get(
+ return outputFileFactories.computeIfAbsent(
tableName,
unused -> {
Table table = catalog.loadTable(TableIdentifier.parse(tableName));
specs.put(tableName, table.specs());
+ // Make sure to append an identifier to avoid file clashes in case
the factory was to get
+ // re-created during a checkpoint, i.e. due to cache eviction.
+ String fileSuffix = UUID.randomUUID().toString();
return FlinkManifestUtil.createOutputFileFactory(
- () -> table, table.properties(), flinkJobId, operatorId,
subTaskId, attemptId);
+ () -> table,
+ table.properties(),
+ flinkJobId,
+ operatorId,
+ subTaskId,
+ attemptId,
+ fileSuffix);
});
}
private PartitionSpec spec(String tableName, int specId) {
- Map<Integer, PartitionSpec> knownSpecs = specs.getIfPresent(tableName);
+ Map<Integer, PartitionSpec> knownSpecs = specs.get(tableName);
if (knownSpecs != null) {
PartitionSpec spec = knownSpecs.get(specId);
if (spec != null) {
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index ae24efafa6..e99e6e72da 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -210,4 +210,9 @@ class DynamicWriter implements
CommittingSinkWriter<DynamicRecordInternal, Dynam
DynamicWriterMetrics getMetrics() {
return metrics;
}
+
+ @VisibleForTesting
+ Map<WriteTarget, RowDataTaskWriterFactory> getTaskWriterFactories() {
+ return taskWriterFactories;
+ }
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 10197ddfaf..c6dc984513 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -134,7 +134,7 @@ public class TestFlinkManifest {
ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION,
userProvidedFolder.getAbsolutePath() + "///");
ManifestOutputFileFactory factory =
- new ManifestOutputFileFactory(() -> table, props, flinkJobId,
operatorId, 1, 1);
+ new ManifestOutputFileFactory(() -> table, props, flinkJobId,
operatorId, 1, 1, null);
List<DataFile> dataFiles = generateDataFiles(5);
DeltaManifests deltaManifests =
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
new file mode 100644
index 0000000000..654248fcab
--- /dev/null
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestManifestOutputFileFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class TestManifestOutputFileFactory {
+
+ @RegisterExtension
+ static final HadoopCatalogExtension CATALOG_EXTENSION = new
HadoopCatalogExtension("db", "table");
+
+ private Table table;
+
+ @BeforeEach
+ void before() throws IOException {
+ table =
CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new
Schema());
+ }
+
+ @Test
+ public void testFileNameFormat() {
+ String flinkJobId = "job123";
+ String operatorUniqueId = "operator456";
+ int subTaskId = 7;
+ long attemptNumber = 2;
+ long checkpointId = 100;
+ Map<String, String> props = table.properties();
+
+ ManifestOutputFileFactory factory =
+ new ManifestOutputFileFactory(
+ () -> table, props, flinkJobId, operatorUniqueId, subTaskId,
attemptNumber, null);
+
+ String file1 = new File(factory.create(checkpointId).location()).getName();
+ assertThat(file1).isEqualTo("job123-operator456-00007-2-100-00001.avro");
+
+ String file2 = new File(factory.create(checkpointId).location()).getName();
+ assertThat(file2).isEqualTo("job123-operator456-00007-2-100-00002.avro");
+
+ String file3 = new File(factory.create(checkpointId +
1).location()).getName();
+ assertThat(file3).isEqualTo("job123-operator456-00007-2-101-00003.avro");
+ }
+
+ @Test
+ public void testFileNameFormatWithSuffix() {
+ String flinkJobId = "job123";
+ String operatorUniqueId = "operator456";
+ int subTaskId = 7;
+ long attemptNumber = 2;
+ long checkpointId = 100;
+ Map<String, String> props = table.properties();
+
+ ManifestOutputFileFactory factory =
+ new ManifestOutputFileFactory(
+ () -> table, props, flinkJobId, operatorUniqueId, subTaskId,
attemptNumber, "suffix");
+
+ String file1 = new File(factory.create(checkpointId).location()).getName();
+
assertThat(file1).isEqualTo("job123-operator456-00007-2-100-00001-suffix.avro");
+
+ String file2 = new File(factory.create(checkpointId).location()).getName();
+
assertThat(file2).isEqualTo("job123-operator456-00007-2-100-00002-suffix.avro");
+ }
+
+ @Test
+ public void testSuffixedFileNamesWithRecreatedFactory() {
+ String flinkJobId = "test-job";
+ String operatorUniqueId = "test-operator";
+ int subTaskId = 0;
+ long attemptNumber = 1;
+ long checkpointId = 1;
+ Map<String, String> props = table.properties();
+
+ ManifestOutputFileFactory factory1 =
+ new ManifestOutputFileFactory(
+ () -> table, props, flinkJobId, operatorUniqueId, subTaskId,
attemptNumber, "suffix1");
+ String file1 = new
File(factory1.create(checkpointId).location()).getName();
+
assertThat(file1).isEqualTo("test-job-test-operator-00000-1-1-00001-suffix1.avro");
+
+ ManifestOutputFileFactory factory2 =
+ new ManifestOutputFileFactory(
+ () -> table, props, flinkJobId, operatorUniqueId, subTaskId,
attemptNumber, "suffix2");
+ String file2 = new
File(factory2.create(checkpointId).location()).getName();
+
assertThat(file2).isEqualTo("test-job-test-operator-00000-1-1-00001-suffix2.avro");
+
+ assertThat(file1).isNotEqualTo(file2);
+ }
+}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index f5387aee88..7894428a78 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -68,6 +68,8 @@ class TestDynamicCommitter {
Catalog catalog;
+ final int cacheMaximumSize = 10;
+
private static final DataFile DATA_FILE =
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-1.parquet")
@@ -155,7 +157,7 @@ class TestDynamicCommitter {
new WriteTarget(TABLE2, "branch2", 43, 0, true, Sets.newHashSet(1, 2));
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();
@@ -281,7 +283,7 @@ class TestDynamicCommitter {
new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();
@@ -339,7 +341,7 @@ class TestDynamicCommitter {
assertThat(table.snapshots()).isEmpty();
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();
@@ -454,7 +456,7 @@ class TestDynamicCommitter {
assertThat(table.snapshots()).isEmpty();
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();
@@ -613,7 +615,7 @@ class TestDynamicCommitter {
new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
index 713c67da17..d3f4385d97 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java
@@ -20,15 +20,26 @@ package org.apache.iceberg.flink.sink.dynamic;
import static org.assertj.core.api.Assertions.assertThat;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.hadoop.util.Sets;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.sink.DeltaManifests;
+import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -37,13 +48,22 @@ class TestDynamicWriteResultAggregator {
@RegisterExtension
static final HadoopCatalogExtension CATALOG_EXTENSION = new
HadoopCatalogExtension("db", "table");
+ final int cacheMaximumSize = 1;
+
+ private static final DataFile DATA_FILE =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-1.parquet")
+ .withFileSizeInBytes(100)
+ .withRecordCount(1)
+ .build();
+
@Test
void testAggregator() throws Exception {
CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new
Schema());
CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table2"), new
Schema());
DynamicWriteResultAggregator aggregator =
- new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
try (OneInputStreamOperatorTestHarness<
CommittableMessage<DynamicWriteResult>,
CommittableMessage<DynamicCommittable>>
testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) {
@@ -79,4 +99,74 @@ class TestDynamicWriteResultAggregator {
assertThat(testHarness.getRecordOutput()).hasSize(4);
}
}
+
+ @Test
+ void testPreventOutputFileFactoryCacheEvictionDuringFlush() throws Exception
{
+ CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new
Schema());
+
+ // Disable caching of ManifestOutputFileFactory.
+ final int zeroCacheSize = 0;
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
zeroCacheSize);
+ try (OneInputStreamOperatorTestHarness<
+ CommittableMessage<DynamicWriteResult>,
CommittableMessage<DynamicCommittable>>
+ testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) {
+ testHarness.open();
+
+ WriteTarget writeTarget1 =
+ new WriteTarget("table", "branch", 42, 0, false, Sets.newHashSet());
+ DynamicWriteResult dynamicWriteResult1 =
+ new DynamicWriteResult(
+ writeTarget1,
WriteResult.builder().addDataFiles(DATA_FILE).build());
+
+ // Different WriteTarget
+ WriteTarget writeTarget2 =
+ new WriteTarget("table", "branch2", 23, 0, true, Sets.newHashSet());
+ DynamicWriteResult dynamicWriteResult2 =
+ new DynamicWriteResult(
+ writeTarget2,
WriteResult.builder().addDataFiles(DATA_FILE).build());
+
+ CommittableWithLineage<DynamicWriteResult> committable1 =
+ new CommittableWithLineage<>(dynamicWriteResult1, 0, 0);
+ testHarness.processElement(new StreamRecord<>(committable1));
+
+ CommittableWithLineage<DynamicWriteResult> committable2 =
+ new CommittableWithLineage<>(dynamicWriteResult2, 0, 0);
+ testHarness.processElement(new StreamRecord<>(committable2));
+
+ assertThat(testHarness.getOutput()).isEmpty();
+
+ testHarness.prepareSnapshotPreBarrier(1L);
+ List<StreamRecord<CommittableMessage<DynamicCommittable>>> output =
+ Lists.newArrayList(testHarness.getRecordOutput().iterator());
+
+ assertThat(testHarness.getOutput()).hasSize(3);
+
assertThat(output.get(0).getValue()).isInstanceOf(CommittableSummary.class);
+
assertThat(output.get(1).getValue()).isInstanceOf(CommittableWithLineage.class);
+
assertThat(output.get(2).getValue()).isInstanceOf(CommittableWithLineage.class);
+
+ // There should be two unique file paths, despite the cache being
disabled.
+ Set<String> manifestPaths = getManifestPaths(output);
+ assertThat(manifestPaths).hasSize(2);
+ }
+ }
+
+ private static Set<String> getManifestPaths(
+ List<StreamRecord<CommittableMessage<DynamicCommittable>>> messages)
throws IOException {
+ Set<String> manifestPaths = Sets.newHashSet();
+
+ for (StreamRecord<CommittableMessage<DynamicCommittable>> record :
messages) {
+ CommittableMessage<DynamicCommittable> message = record.getValue();
+ if (message instanceof CommittableWithLineage) {
+ DeltaManifests deltaManifests =
+ SimpleVersionedSerialization.readVersionAndDeSerialize(
+ DeltaManifestsSerializer.INSTANCE,
+ (((CommittableWithLineage<DynamicCommittable>)
message).getCommittable())
+ .manifest());
+ deltaManifests.manifests().forEach(manifest ->
manifestPaths.add(manifest.path()));
+ }
+ }
+
+ return manifestPaths;
+ }
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 89dd4f2259..689fd20483 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -202,6 +202,38 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
"Equality field columns shouldn't be empty when configuring to use
UPSERT data.");
}
+ @Test
+ void testUniqueFileSuffixOnFactoryRecreation() throws Exception {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA);
+
+ DynamicWriter dynamicWriter = createDynamicWriter(catalog);
+ DynamicRecordInternal record1 = getDynamicRecordInternal(table1);
+
+ dynamicWriter.write(record1, null);
+ dynamicWriter.prepareCommit();
+
+ File dataDir1 = new File(URI.create(table1.location()).getPath(), "data");
+ File[] files = dataDir1.listFiles((dir, name) -> !name.startsWith("."));
+ assertThat(files).isNotNull().hasSize(1);
+ File firstFile = files[0];
+
+ // Clear cache which must create new unique files names for the output
files
+ dynamicWriter.getTaskWriterFactories().clear();
+
+ dynamicWriter.write(record1, null);
+ dynamicWriter.prepareCommit();
+
+ files =
+ dataDir1.listFiles(
+ (dir, name) -> !name.startsWith(".") &&
!name.equals(firstFile.getName()));
+ assertThat(files).isNotNull().hasSize(1);
+ File secondFile = files[0];
+
+ // File names must be different
+ assertThat(firstFile.getName()).isNotEqualTo(secondFile.getName());
+ }
+
private static @Nonnull DynamicWriter createDynamicWriter(
Catalog catalog, Map<String, String> properties) {
DynamicWriter dynamicWriter =