This is an automated email from the ASF dual-hosted git repository.
jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7d0111dfab6 [FLINK-29114][connector][filesystem] Fix issue of file
overwriting caused by multiple writes to the same sink table and shared staging
directory
7d0111dfab6 is described below
commit 7d0111dfab640f2f590dd710d76de927c86cf83e
Author: Jane Chan <[email protected]>
AuthorDate: Wed Mar 13 17:50:49 2024 +0800
[FLINK-29114][connector][filesystem] Fix issue of file overwriting caused
by multiple writes to the same sink table and shared staging directory
This closes #24390
* Fix unstable TableSourceITCase#testTableHintWithLogicalTableScanReuse
* Move the staging dir configuration into the builder for easier testing
---------
Co-authored-by: Matthias Pohl <[email protected]>
---
.../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 15 +-
.../file/table/FileSystemOutputFormat.java | 55 +++-
.../connector/file/table/FileSystemTableSink.java | 15 +-
.../file/table/FileSystemOutputFormatTest.java | 283 ++++++++++++---------
.../flink/connectors/hive/HiveTableSink.java | 3 +-
5 files changed, 222 insertions(+), 149 deletions(-)
diff --git
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
index cafa384fe7f..9ec54ebede1 100644
---
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
+++
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
@@ -679,17 +679,18 @@ Method
<org.apache.flink.connector.file.table.DynamicPartitionWriter.write(java.
Method
<org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader,
org.apache.flink.connector.file.src.FileSourceSplit)> calls method
<org.apache.flink.table.utils.PartitionPathUtils.convertStringToInternalValue(java.lang.String,
org.apache.flink.table.types.DataType)> in
(FileInfoExtractorBulkFormat.java:156)
Method
<org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader,
org.apache.flink.connector.file.src.FileSourceSplit)> calls method
<org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)>
in (FileInfoExtractorBulkFormat.java:140)
Method
<org.apache.flink.connector.file.table.FileSystemCommitter.commitPartitionsWithFiles(java.util.Map)>
calls method
<org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)>
in (FileSystemCommitter.java:146)
-Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()>
calls method
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object,
java.lang.String)> in (FileSystemOutputFormat.java:288)
-Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()>
calls method
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object,
java.lang.String)> in (FileSystemOutputFormat.java:289)
-Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()>
calls method
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object,
java.lang.String)> in (FileSystemOutputFormat.java:290)
-Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()>
calls method
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object,
java.lang.String)> in (FileSystemOutputFormat.java:291)
-Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()>
calls method
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object,
java.lang.String)> in (FileSystemOutputFormat.java:292)
+Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()>
calls method
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object,
java.lang.String)> in (FileSystemOutputFormat.java:324)
+Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()>
calls method
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object,
java.lang.String)> in (FileSystemOutputFormat.java:325)
+Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()>
calls method
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object,
java.lang.String)> in (FileSystemOutputFormat.java:326)
+Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()>
calls method
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object,
java.lang.String)> in (FileSystemOutputFormat.java:327)
+Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()>
calls method
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object,
java.lang.String)> in (FileSystemOutputFormat.java:328)
Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)>
has parameter of type
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in
(FileSystemOutputFormat.java:0)
-Method
<org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData,
org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)>
calls method
<org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)>
in (FileSystemTableSink.java:566)
+Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setStagingPath(org.apache.flink.core.fs.Path)>
is annotated with <org.apache.flink.annotation.VisibleForTesting> in
(FileSystemOutputFormat.java:291)
+Method
<org.apache.flink.connector.file.table.FileSystemOutputFormat.createStagingDirectory(org.apache.flink.core.fs.Path)>
calls method <org.apache.flink.util.Preconditions.checkState(boolean,
java.lang.String, [Ljava.lang.Object;)> in (FileSystemOutputFormat.java:109)
+Method
<org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData,
org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)>
calls method
<org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)>
in (FileSystemTableSink.java:553)
Method
<org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream,
org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)>
calls method <org.apache.flink.api.dag.Transformation.setParallelism(int,
boolean)> in (FileSystemTableSink.java:208)
Method
<org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream,
org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)>
calls method
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()>
in (FileSystemTableSink.java:189)
Method
<org.apache.flink.connector.file.table.FileSystemTableSink.createStreamingSink(org.apache.flink.table.connector.ProviderContext,
org.apache.flink.streaming.api.datastream.DataStream,
org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)>
calls method
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()>
in (FileSystemTableSink.java:233)
-Method
<org.apache.flink.connector.file.table.FileSystemTableSink.toStagingPath()>
calls method <org.apache.flink.util.Preconditions.checkState(boolean,
java.lang.Object)> in (FileSystemTableSink.java:380)
Method
<org.apache.flink.connector.file.table.FileSystemTableSource.listPartitions()>
calls method
<org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndPaths(org.apache.flink.core.fs.FileSystem,
org.apache.flink.core.fs.Path, int)> in (FileSystemTableSource.java:328)
Method <org.apache.flink.connector.file.table.FileSystemTableSource.paths()>
has return type <[Lorg.apache.flink.core.fs.Path;> in
(FileSystemTableSource.java:0)
Method <org.apache.flink.connector.file.table.FileSystemTableSource.paths()>
references method
<org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)>
in (FileSystemTableSource.java:295)
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
index f866b8ec230..3b73e95e497 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
@@ -19,6 +19,7 @@
package org.apache.flink.connector.file.table;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
@@ -28,12 +29,14 @@ import org.apache.flink.core.fs.Path;
import
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -56,7 +59,7 @@ public class FileSystemOutputFormat<T>
private final TableMetaStoreFactory msFactory;
private final boolean overwrite;
private final boolean isToLocal;
- private final Path tmpPath;
+ private final Path stagingPath;
private final String[] partitionColumns;
private final boolean dynamicGrouped;
private final LinkedHashMap<String, String> staticPartitions;
@@ -74,7 +77,7 @@ public class FileSystemOutputFormat<T>
TableMetaStoreFactory msFactory,
boolean overwrite,
boolean isToLocal,
- Path tmpPath,
+ Path stagingPath,
String[] partitionColumns,
boolean dynamicGrouped,
LinkedHashMap<String, String> staticPartitions,
@@ -87,7 +90,7 @@ public class FileSystemOutputFormat<T>
this.msFactory = msFactory;
this.overwrite = overwrite;
this.isToLocal = isToLocal;
- this.tmpPath = tmpPath;
+ this.stagingPath = stagingPath;
this.partitionColumns = partitionColumns;
this.dynamicGrouped = dynamicGrouped;
this.staticPartitions = staticPartitions;
@@ -96,6 +99,22 @@ public class FileSystemOutputFormat<T>
this.outputFileConfig = outputFileConfig;
this.identifier = identifier;
this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
+
+ createStagingDirectory(this.stagingPath);
+ }
+
+ private static void createStagingDirectory(Path stagingPath) {
+ try {
+ final FileSystem stagingFileSystem = stagingPath.getFileSystem();
+ Preconditions.checkState(
+ !stagingFileSystem.exists(stagingPath),
+ "Staging dir %s already exists",
+ stagingPath);
+ stagingFileSystem.mkdirs(stagingPath);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "An IO error occurred while accessing the staging
FileSystem.", e);
+ }
}
@Override
@@ -108,7 +127,7 @@ public class FileSystemOutputFormat<T>
Thread.currentThread().getContextClassLoader(),
() -> {
try {
- return
fsFactory.create(tmpPath.toUri());
+ return
fsFactory.create(stagingPath.toUri());
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -120,7 +139,7 @@ public class FileSystemOutputFormat<T>
fsFactory,
msFactory,
overwrite,
- tmpPath,
+ stagingPath,
partitionColumns.length,
isToLocal,
identifier,
@@ -141,7 +160,7 @@ public class FileSystemOutputFormat<T>
throw new TableException("Exception in finalizeGlobal", e);
} finally {
try {
- fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
+ fsFactory.create(stagingPath.toUri()).delete(stagingPath,
true);
} catch (IOException ignore) {
}
}
@@ -158,7 +177,7 @@ public class FileSystemOutputFormat<T>
PartitionTempFileManager fileManager =
new PartitionTempFileManager(
fsFactory,
- tmpPath,
+ stagingPath,
context.getTaskNumber(),
context.getAttemptNumber(),
outputFileConfig);
@@ -203,7 +222,7 @@ public class FileSystemOutputFormat<T>
private String[] partitionColumns;
private OutputFormatFactory<T> formatFactory;
private TableMetaStoreFactory metaStoreFactory;
- private Path tmpPath;
+ private Path stagingPath;
private LinkedHashMap<String, String> staticPartitions = new
LinkedHashMap<>();
private boolean dynamicGrouped = false;
@@ -258,11 +277,23 @@ public class FileSystemOutputFormat<T>
return this;
}
- public Builder<T> setTempPath(Path tmpPath) {
- this.tmpPath = tmpPath;
+ public Builder<T> setPath(Path parentPath) {
+ this.stagingPath = toStagingPath(parentPath);
return this;
}
+ @VisibleForTesting
+ Builder<T> setStagingPath(Path stagingPath) {
+ this.stagingPath = stagingPath;
+ return this;
+ }
+
+ private Path toStagingPath(Path parentPath) {
+ return new Path(
+ parentPath,
+ String.format(".staging_%d_%s",
System.currentTimeMillis(), UUID.randomUUID()));
+ }
+
public Builder<T> setPartitionComputer(PartitionComputer<T> computer) {
this.computer = computer;
return this;
@@ -288,7 +319,7 @@ public class FileSystemOutputFormat<T>
checkNotNull(partitionColumns, "partitionColumns should not be
null");
checkNotNull(formatFactory, "formatFactory should not be null");
checkNotNull(metaStoreFactory, "metaStoreFactory should not be
null");
- checkNotNull(tmpPath, "tmpPath should not be null");
+ checkNotNull(stagingPath, "stagingPath should not be null");
checkNotNull(computer, "partitionComputer should not be null");
return new FileSystemOutputFormat<>(
@@ -296,7 +327,7 @@ public class FileSystemOutputFormat<T>
metaStoreFactory,
overwrite,
isToLocal,
- tmpPath,
+ stagingPath,
partitionColumns,
dynamicGrouped,
staticPartitions,
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index b61c0cd08db..e088a51664f 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -184,7 +184,7 @@ public class FileSystemTableSink extends
AbstractFileSystemTable
.setMetaStoreFactory(new EmptyMetaStoreFactory(path))
.setOverwrite(overwrite)
.setStaticPartitions(staticPartitions)
- .setTempPath(toStagingPath())
+ .setPath(path)
.setOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("part-" + UUID.randomUUID())
@@ -373,19 +373,6 @@ public class FileSystemTableSink extends
AbstractFileSystemTable
};
}
- private Path toStagingPath() {
- Path stagingDir = new Path(path, ".staging_" +
System.currentTimeMillis());
- try {
- FileSystem fs = stagingDir.getFileSystem();
- Preconditions.checkState(
- fs.exists(stagingDir) || fs.mkdirs(stagingDir),
- "Failed to create staging dir " + stagingDir);
- return stagingDir;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
@SuppressWarnings("unchecked")
private OutputFormatFactory<RowData> createOutputFormatFactory(Context
sinkContext) {
Object writer = createWriter(sinkContext);
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
index 964304d5ae7..85062cf215e 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
@@ -28,6 +28,8 @@ import
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowUtils;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -37,24 +39,41 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.entry;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link FileSystemOutputFormat}. */
class FileSystemOutputFormatTest {
- @TempDir private java.nio.file.Path tmpPath;
@TempDir private java.nio.file.Path outputPath;
+ @TempDir private java.nio.file.Path stagingBaseDir;
+
private final TestingFinalizationContext finalizationContext = new
TestingFinalizationContext();
+ private static final Supplier<List<StreamRecord<Row>>>
DEFAULT_INPUT_SUPPLIER =
+ () ->
+ Arrays.asList(
+ new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+ new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+ new StreamRecord<>(Row.of("a2", 2, "p2"), 1L),
+ new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
+
+ private static final Supplier<List<String>> DEFAULT_OUTPUT_SUPPLIER =
+ () ->
+ Collections.singletonList(
+ createFileContent("a1,1,p1", "a2,2,p1", "a2,2,p2",
"a3,3,p1"));
+
private static Map<File, String> getFileContentByPath(java.nio.file.Path
directory)
throws IOException {
Map<File, String> contents = new HashMap<>(4);
@@ -70,6 +89,10 @@ class FileSystemOutputFormatTest {
return contents;
}
+ private static String createFileContent(String... rows) {
+ return Arrays.stream(rows).collect(Collectors.joining("\n", "", "\n"));
+ }
+
@BeforeEach
void before() {
RowUtils.USE_LEGACY_TO_STRING = true;
@@ -83,7 +106,7 @@ class FileSystemOutputFormatTest {
@Test
void testClosingWithoutInput() throws Exception {
try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
- createSink(false, false, false, new LinkedHashMap<>(), new
AtomicReference<>())) {
+ createTestHarness(createSinkFormat(false, false, false, new
LinkedHashMap<>()))) {
testHarness.setup();
testHarness.open();
}
@@ -91,149 +114,181 @@ class FileSystemOutputFormatTest {
@Test
void testNonPartition() throws Exception {
- AtomicReference<FileSystemOutputFormat<Row>> ref = new
AtomicReference<>();
- try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
- createSink(false, false, false, new LinkedHashMap<>(), ref)) {
- writeUnorderedRecords(testHarness);
- assertThat(getFileContentByPath(tmpPath)).hasSize(1);
- }
-
- ref.get().finalizeGlobal(finalizationContext);
- Map<File, String> content = getFileContentByPath(outputPath);
- assertThat(content.values())
- .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" +
"a3,3,p1\n");
- }
-
- private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row,
Object> testHarness)
- throws Exception {
- testHarness.setup();
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"),
1L));
- testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"),
1L));
- testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"),
1L));
- testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"),
1L));
+ checkWriteAndCommit(
+ false,
+ false,
+ false,
+ new LinkedHashMap<>(),
+ DEFAULT_INPUT_SUPPLIER,
+ DEFAULT_OUTPUT_SUPPLIER);
}
@Test
void testOverrideNonPartition() throws Exception {
testNonPartition();
-
- AtomicReference<FileSystemOutputFormat<Row>> ref = new
AtomicReference<>();
- try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
- createSink(true, false, false, new LinkedHashMap<>(), ref)) {
- writeUnorderedRecords(testHarness);
- assertThat(getFileContentByPath(tmpPath)).hasSize(1);
- }
-
- ref.get().finalizeGlobal(finalizationContext);
- Map<File, String> content = getFileContentByPath(outputPath);
- assertThat(content).hasSize(1);
- assertThat(content.values())
- .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" +
"a3,3,p1\n");
- assertThat(new File(tmpPath.toUri())).doesNotExist();
+ checkWriteAndCommit(
+ true,
+ false,
+ false,
+ new LinkedHashMap<>(),
+ DEFAULT_INPUT_SUPPLIER,
+ DEFAULT_OUTPUT_SUPPLIER);
}
@Test
void testStaticPartition() throws Exception {
- AtomicReference<FileSystemOutputFormat<Row>> ref = new
AtomicReference<>();
LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
staticParts.put("c", "p1");
- try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
- createSink(false, true, false, staticParts, ref)) {
- testHarness.setup();
- testHarness.open();
- testHarness.processElement(new StreamRecord<>(Row.of("a1", 1),
1L));
- testHarness.processElement(new StreamRecord<>(Row.of("a2", 2),
1L));
- testHarness.processElement(new StreamRecord<>(Row.of("a2", 2),
1L));
- testHarness.processElement(new StreamRecord<>(Row.of("a3", 3),
1L));
- assertThat(getFileContentByPath(tmpPath)).hasSize(1);
- }
-
- ref.get().finalizeGlobal(finalizationContext);
- Map<File, String> content = getFileContentByPath(outputPath);
- assertThat(content).hasSize(1);
-
assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
- assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" +
"a2,2\n" + "a3,3\n");
- assertThat(new File(tmpPath.toUri())).doesNotExist();
+ checkWriteAndCommit(
+ false,
+ true,
+ false,
+ staticParts,
+ () ->
+ Arrays.asList(
+ new StreamRecord<>(Row.of("a1", 1), 1L),
+ new StreamRecord<>(Row.of("a2", 2), 1L),
+ new StreamRecord<>(Row.of("a2", 2), 1L),
+ new StreamRecord<>(Row.of("a3", 3), 1L)),
+ () ->
+ Collections.singletonMap(
+ "c=p1", createFileContent("a1,1", "a2,2",
"a2,2", "a3,3")));
}
@Test
void testDynamicPartition() throws Exception {
- AtomicReference<FileSystemOutputFormat<Row>> ref = new
AtomicReference<>();
- try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
- createSink(false, true, false, new LinkedHashMap<>(), ref)) {
- writeUnorderedRecords(testHarness);
- assertThat(getFileContentByPath(tmpPath)).hasSize(2);
- }
-
- ref.get().finalizeGlobal(finalizationContext);
- Map<File, String> content = getFileContentByPath(outputPath);
- Map<String, String> sortedContent = new TreeMap<>();
- content.forEach((file, s) ->
sortedContent.put(file.getParentFile().getName(), s));
-
- assertThat(sortedContent).hasSize(2);
- assertThat(sortedContent)
- .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"),
entry("c=p2", "a2,2\n"));
- assertThat(new File(tmpPath.toUri())).doesNotExist();
+ checkWriteAndCommit(
+ false,
+ true,
+ false,
+ new LinkedHashMap<>(),
+ DEFAULT_INPUT_SUPPLIER,
+ () ->
+ ImmutableMap.of(
+ "c=p1",
+ createFileContent("a1,1", "a2,2", "a3,3"),
+ "c=p2",
+ createFileContent("a2,2")));
}
@Test
void testGroupedDynamicPartition() throws Exception {
- AtomicReference<FileSystemOutputFormat<Row>> ref = new
AtomicReference<>();
+ checkWriteAndCommit(
+ false,
+ true,
+ true,
+ new LinkedHashMap<>(),
+ () ->
+ Arrays.asList(
+ new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+ new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+ new StreamRecord<>(Row.of("a3", 3, "p1"), 1L),
+ new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)),
+ () ->
+ ImmutableMap.of(
+ "c=p1",
+ createFileContent("a1,1", "a2,2", "a3,3"),
+ "c=p2",
+ createFileContent("a2,2")));
+ }
+
+ @Test
+ void testGetUniqueStagingDirectory() throws IOException {
+ final Path alreadyExistingStagingDir = new
Path(outputPath.toFile().getAbsolutePath());
+
assertThat(alreadyExistingStagingDir.getFileSystem().exists(alreadyExistingStagingDir))
+ .as("The staging folder should already exist.")
+ .isTrue();
+
+ final FileSystemOutputFormat.Builder<Row> builder =
+ new FileSystemOutputFormat.Builder<Row>()
+ .setPartitionColumns(new String[0])
+ .setFormatFactory(TextOutputFormat::new)
+ .setMetaStoreFactory(
+ new
FileSystemCommitterTest.TestMetaStoreFactory(
+ new
Path(outputPath.toFile().getAbsolutePath())))
+ .setPartitionComputer(
+ new RowPartitionComputer("default", new
String[0], new String[0]))
+ .setStagingPath(alreadyExistingStagingDir);
+
+ assertThatThrownBy(builder::build)
+ .as("Reusing a folder should cause an error.")
+ .isInstanceOf(IllegalStateException.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void checkWriteAndCommit(
+ boolean override,
+ boolean partitioned,
+ boolean dynamicGrouped,
+ LinkedHashMap<String, String> staticPartitions,
+ Supplier<List<StreamRecord<Row>>> inputSupplier,
+ Supplier<?> outputSupplier)
+ throws Exception {
+ Object expectedOutput = outputSupplier.get();
+ int expectedFileNum =
+ (partitioned)
+ ? ((Map<String, String>) expectedOutput).size()
+ : ((List<String>) expectedOutput).size();
+ FileSystemOutputFormat<Row> outputFormat =
+ createSinkFormat(override, partitioned, dynamicGrouped,
staticPartitions);
try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
- createSink(false, true, true, new LinkedHashMap<>(), ref)) {
+ createTestHarness(outputFormat)) {
testHarness.setup();
testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(Row.of("a1", 1,
"p1"), 1L));
- testHarness.processElement(new StreamRecord<>(Row.of("a2", 2,
"p1"), 1L));
- testHarness.processElement(new StreamRecord<>(Row.of("a3", 3,
"p1"), 1L));
- testHarness.processElement(new StreamRecord<>(Row.of("a2", 2,
"p2"), 1L));
- assertThat(getFileContentByPath(tmpPath)).hasSize(2);
+ for (StreamRecord<Row> record : inputSupplier.get()) {
+ testHarness.processElement(record);
+ }
+
assertThat(getFileContentByPath(stagingBaseDir)).hasSize(expectedFileNum);
}
- ref.get().finalizeGlobal(finalizationContext);
- Map<File, String> content = getFileContentByPath(outputPath);
- Map<String, String> sortedContent = new TreeMap<>();
- content.forEach((file, s) ->
sortedContent.put(file.getParentFile().getName(), s));
-
- assertThat(sortedContent).hasSize(2);
- assertThat(sortedContent.get("c=p1")).isEqualTo("a1,1\n" + "a2,2\n" +
"a3,3\n");
- assertThat(sortedContent.get("c=p2")).isEqualTo("a2,2\n");
- assertThat(new File(tmpPath.toUri())).doesNotExist();
+ outputFormat.finalizeGlobal(finalizationContext);
+ assertThat(stagingBaseDir).isEmptyDirectory();
+
+ Map<File, String> fileToContent = getFileContentByPath(outputPath);
+ assertThat(fileToContent).hasSize(expectedFileNum);
+ if (partitioned) {
+ Map<String, String> partitionToContent =
+ fileToContent.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ e ->
e.getKey().getParentFile().getName(),
+ Map.Entry::getValue));
+
+ assertThat(partitionToContent)
+ .containsExactlyInAnyOrderEntriesOf((Map<String, String>)
expectedOutput);
+ } else {
+ assertThat(fileToContent.values())
+ .containsExactlyInAnyOrderElementsOf((List<String>)
expectedOutput);
+ }
}
- private OneInputStreamOperatorTestHarness<Row, Object> createSink(
+ private FileSystemOutputFormat<Row> createSinkFormat(
boolean override,
boolean partition,
boolean dynamicGrouped,
- LinkedHashMap<String, String> staticPartitions,
- AtomicReference<FileSystemOutputFormat<Row>> sinkRef)
- throws Exception {
+ LinkedHashMap<String, String> staticPartitions) {
String[] columnNames = new String[] {"a", "b", "c"};
String[] partitionColumns = partition ? new String[] {"c"} : new
String[0];
+ Path path = new Path(outputPath.toString());
+ TableMetaStoreFactory msFactory = new
FileSystemCommitterTest.TestMetaStoreFactory(path);
+ return new FileSystemOutputFormat.Builder<Row>()
+ .setMetaStoreFactory(msFactory)
+ .setPath(new Path(stagingBaseDir.toString()))
+ .setOverwrite(override)
+ .setPartitionColumns(partitionColumns)
+ .setPartitionComputer(
+ new RowPartitionComputer("default", columnNames,
partitionColumns))
+ .setFormatFactory(TextOutputFormat::new)
+ .setDynamicGrouped(dynamicGrouped)
+ .setStaticPartitions(staticPartitions)
+ .build();
+ }
- TableMetaStoreFactory msFactory =
- new FileSystemCommitterTest.TestMetaStoreFactory(new
Path(outputPath.toString()));
- FileSystemOutputFormat<Row> sink =
- new FileSystemOutputFormat.Builder<Row>()
- .setMetaStoreFactory(msFactory)
- .setTempPath(new Path(tmpPath.toString()))
- .setOverwrite(override)
- .setPartitionColumns(partitionColumns)
- .setPartitionComputer(
- new RowPartitionComputer("default",
columnNames, partitionColumns))
- .setFormatFactory(TextOutputFormat::new)
- .setDynamicGrouped(dynamicGrouped)
- .setStaticPartitions(staticPartitions)
- .build();
-
- sinkRef.set(sink);
-
+ private OneInputStreamOperatorTestHarness<Row, Object> createTestHarness(
+ FileSystemOutputFormat<Row> outputFormat) throws Exception {
return new OneInputStreamOperatorTestHarness<>(
- new StreamSink<>(new OutputFormatSinkFunction<>(sink)),
+ new StreamSink<>(new OutputFormatSinkFunction<>(outputFormat)),
// test parallelism
3,
3,
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index f55fed51d38..d7e9a0587a4 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -607,8 +607,7 @@ public class HiveTableSink implements DynamicTableSink,
SupportsPartitioning, Su
builder.setOverwrite(overwrite);
builder.setIsToLocal(isToLocal);
builder.setStaticPartitions(staticPartitionSpec);
- builder.setTempPath(
- new
org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
+ builder.setPath(new org.apache.flink.core.fs.Path(stagingParentDir));
builder.setOutputFileConfig(fileNaming);
builder.setIdentifier(identifier);
builder.setPartitionCommitPolicyFactory(