Repository: flink Updated Branches: refs/heads/master b5b4fb9bc -> b12acea2a
[FLINK-9603] Fix part counter loop in BucketingSink to account for part suffix. 1. All logic responsible for path assembly was moved into a single method; 2. added tests for the part-file indexing logic when in-progress/pending/final part files already exist in the bucket; 3. added tests for the same logic when the part file has a suffix. This closes #6176. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b12acea2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b12acea2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b12acea2 Branch: refs/heads/master Commit: b12acea2afcd75a9183aca549fea426e86bfc5de Parents: b5b4fb9 Author: Rinat Sharipov <r.shari...@cleverdata.ru> Authored: Sun Jun 17 15:03:54 2018 +0200 Committer: kkloudas <kklou...@gmail.com> Committed: Mon Jul 9 13:01:29 2018 +0200 ---------------------------------------------------------------------- .../connectors/fs/bucketing/BucketingSink.java | 13 ++-- .../fs/bucketing/BucketingSinkTest.java | 79 ++++++++++++++++++++ 2 files changed, 86 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b12acea2/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 23e4e0c..34fb1b7 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ 
-552,21 +552,17 @@ public class BucketingSink<T> // clean the base directory in case of rescaling. int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); - Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter); + Path partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter); while (fs.exists(partPath) || fs.exists(getPendingPathFor(partPath)) || fs.exists(getInProgressPathFor(partPath))) { bucketState.partCounter++; - partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter); + partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter); } // Record the creation time of the bucket bucketState.creationTime = processingTimeService.getCurrentProcessingTime(); - if (partSuffix != null) { - partPath = partPath.suffix(partSuffix); - } - // increase, so we don't have to check for this name next time bucketState.partCounter++; @@ -667,6 +663,11 @@ public class BucketingSink<T> return m; } + private Path assemblePartPath(Path bucket, int subtaskIndex, int partIndex) { + String localPartSuffix = partSuffix != null ? 
partSuffix : ""; + return new Path(bucket, String.format("%s-%s-%s%s", partPrefix, subtaskIndex, partIndex, localPartSuffix)); + } + private Path getPendingPathFor(Path path) { return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix); } http://git-wip-us.apache.org/repos/asf/flink/blob/b12acea2/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index dc84846..ee425ff 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -64,6 +64,8 @@ import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; @@ -72,6 +74,8 @@ import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTe import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.PENDING_SUFFIX; import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.VALID_LENGTH_SUFFIX; import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.checkLocalFs; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; /** * Tests for the {@link BucketingSink}. 
@@ -899,6 +903,81 @@ public class BucketingSinkTest extends TestLogger { inStream.close(); } + @Test + public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState() + throws Exception { + testThatPartIndexIsIncremented(".my", "part-0-0.my" + IN_PROGRESS_SUFFIX); + } + + @Test + public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInPendingState() + throws Exception { + testThatPartIndexIsIncremented(".my", "part-0-0.my" + PENDING_SUFFIX); + } + + @Test + public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInFinalState() + throws Exception { + testThatPartIndexIsIncremented(".my", "part-0-0.my"); + } + + @Test + public void testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInProgressState() + throws Exception { + testThatPartIndexIsIncremented(null, "part-0-0" + IN_PROGRESS_SUFFIX); + } + + @Test + public void testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInPendingState() + throws Exception { + testThatPartIndexIsIncremented(null, "part-0-0" + PENDING_SUFFIX); + } + + @Test + public void testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInFinalState() + throws Exception { + testThatPartIndexIsIncremented(null, "part-0-0"); + } + + private void testThatPartIndexIsIncremented(String partSuffix, String existingPartFile) throws Exception { + File outDir = tempFolder.newFolder(); + long inactivityInterval = 100; + + java.nio.file.Path bucket = Paths.get(outDir.getPath()); + Files.createFile(bucket.resolve(existingPartFile)); + + String basePath = outDir.getAbsolutePath(); + BucketingSink<String> sink = new BucketingSink<String>(basePath) + .setBucketer(new BasePathBucketer<>()) + .setInactiveBucketCheckInterval(inactivityInterval) + .setInactiveBucketThreshold(inactivityInterval) + .setPartPrefix(PART_PREFIX) + .setInProgressPrefix("") + .setPendingPrefix("") + 
.setValidLengthPrefix("") + .setInProgressSuffix(IN_PROGRESS_SUFFIX) + .setPendingSuffix(PENDING_SUFFIX) + .setValidLengthSuffix(VALID_LENGTH_SUFFIX) + .setPartSuffix(partSuffix) + .setBatchSize(0); + + try (OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0)) { + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(0L); + + testHarness.processElement(new StreamRecord<>("test1", 1L)); + + testHarness.setProcessingTime(101L); + testHarness.snapshot(0, 0); + testHarness.notifyOfCompletedCheckpoint(0); + } + + String expectedFileName = partSuffix == null ? "part-0-1" : "part-0-1" + partSuffix; + assertThat(Files.exists(bucket.resolve(expectedFileName)), is(true)); + } + private static class StreamWriterWithConfigCheck<K, V> extends AvroKeyValueSinkWriter<K, V> { private Map<String, String> properties; private String key;