Repository: flink
Updated Branches:
  refs/heads/master b5b4fb9bc -> b12acea2a


[FLINK-9603] Fix part counter loop in BucketingSink to account for part suffix.

1. Move all part-path assembly logic into a dedicated method;
2. Test part-file indexing when in-progress, pending, or final
   part files already exist in the bucket;
3. Test the same logic when the part file has a suffix.

This closes #6176.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b12acea2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b12acea2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b12acea2

Branch: refs/heads/master
Commit: b12acea2afcd75a9183aca549fea426e86bfc5de
Parents: b5b4fb9
Author: Rinat Sharipov <r.shari...@cleverdata.ru>
Authored: Sun Jun 17 15:03:54 2018 +0200
Committer: kkloudas <kklou...@gmail.com>
Committed: Mon Jul 9 13:01:29 2018 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  | 13 ++--
 .../fs/bucketing/BucketingSinkTest.java         | 79 ++++++++++++++++++++
 2 files changed, 86 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b12acea2/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 23e4e0c..34fb1b7 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -552,21 +552,17 @@ public class BucketingSink<T>
                // clean the base directory in case of rescaling.
 
                int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-               Path partPath = new Path(bucketPath, partPrefix + "-" + 
subtaskIndex + "-" + bucketState.partCounter);
+               Path partPath = assemblePartPath(bucketPath, subtaskIndex, 
bucketState.partCounter);
                while (fs.exists(partPath) ||
                                fs.exists(getPendingPathFor(partPath)) ||
                                fs.exists(getInProgressPathFor(partPath))) {
                        bucketState.partCounter++;
-                       partPath = new Path(bucketPath, partPrefix + "-" + 
subtaskIndex + "-" + bucketState.partCounter);
+                       partPath = assemblePartPath(bucketPath, subtaskIndex, 
bucketState.partCounter);
                }
 
                // Record the creation time of the bucket
                bucketState.creationTime = 
processingTimeService.getCurrentProcessingTime();
 
-               if (partSuffix != null) {
-                       partPath = partPath.suffix(partSuffix);
-               }
-
                // increase, so we don't have to check for this name next time
                bucketState.partCounter++;
 
@@ -667,6 +663,11 @@ public class BucketingSink<T>
                return m;
        }
 
+       private Path assemblePartPath(Path bucket, int subtaskIndex, int 
partIndex) { // Builds the child path "<partPrefix>-<subtaskIndex>-<partIndex><partSuffix>" under bucket.
+               String localPartSuffix = partSuffix != null ? partSuffix : ""; // no part suffix configured -> append nothing
+               return new Path(bucket, String.format("%s-%s-%s%s", partPrefix, 
subtaskIndex, partIndex, localPartSuffix)); // suffix is part of the name so the fs.exists probes in the counter loop see suffixed files
+       }
+
        private Path getPendingPathFor(Path path) {
                return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b12acea2/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index dc84846..ee425ff 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -64,6 +64,8 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -72,6 +74,8 @@ import static 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTe
 import static 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.PENDING_SUFFIX;
 import static 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.VALID_LENGTH_SUFFIX;
 import static 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.checkLocalFs;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 /**
  * Tests for the {@link BucketingSink}.
@@ -899,6 +903,81 @@ public class BucketingSinkTest extends TestLogger {
                inStream.close();
        }
 
+       @Test // part suffix ".my"; existing part 0 is in-progress -> sink must write part-0-1.my
+       public void 
testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState()
+               throws Exception {
+               testThatPartIndexIsIncremented(".my", "part-0-0.my" + 
IN_PROGRESS_SUFFIX);
+       }
+
+       @Test // part suffix ".my"; existing part 0 is pending -> sink must write part-0-1.my
+       public void 
testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInPendingState()
+               throws Exception {
+               testThatPartIndexIsIncremented(".my", "part-0-0.my" + 
PENDING_SUFFIX);
+       }
+
+       @Test // part suffix ".my"; existing part 0 is final -> sink must write part-0-1.my
+       public void 
testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInFinalState()
+               throws Exception {
+               testThatPartIndexIsIncremented(".my", "part-0-0.my");
+       }
+
+       @Test // no part suffix; existing part 0 is in-progress -> sink must write part-0-1
+       public void 
testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInProgressState()
+               throws Exception {
+               testThatPartIndexIsIncremented(null, "part-0-0" + 
IN_PROGRESS_SUFFIX);
+       }
+
+       @Test // no part suffix; existing part 0 is pending -> sink must write part-0-1
+       public void 
testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInPendingState()
+               throws Exception {
+               testThatPartIndexIsIncremented(null, "part-0-0" + 
PENDING_SUFFIX);
+       }
+
+       @Test // no part suffix; existing part 0 is final -> sink must write part-0-1
+       public void 
testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInFinalState()
+               throws Exception {
+               testThatPartIndexIsIncremented(null, "part-0-0");
+       }
+
+       private void testThatPartIndexIsIncremented(String partSuffix, String 
existingPartFile) throws Exception { // Pre-creates existingPartFile, writes one record, and asserts the new part got index 1.
+               File outDir = tempFolder.newFolder();
+               long inactivityInterval = 100;
+
+               java.nio.file.Path bucket = Paths.get(outDir.getPath()); // assumes BasePathBucketer puts parts directly in the base path -- confirm
+               Files.createFile(bucket.resolve(existingPartFile)); // occupy part index 0 before the sink opens
+
+               String basePath = outDir.getAbsolutePath();
+               BucketingSink<String> sink = new BucketingSink<String>(basePath)
+                       .setBucketer(new BasePathBucketer<>())
+                       .setInactiveBucketCheckInterval(inactivityInterval)
+                       .setInactiveBucketThreshold(inactivityInterval)
+                       .setPartPrefix(PART_PREFIX)
+                       .setInProgressPrefix("") // empty state prefixes so state files are "<part name><state suffix>", matching the names the @Test cases pre-create
+                       .setPendingPrefix("")
+                       .setValidLengthPrefix("")
+                       .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+                       .setPendingSuffix(PENDING_SUFFIX)
+                       .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
+                       .setPartSuffix(partSuffix)
+                       .setBatchSize(0);
+
+               try (OneInputStreamOperatorTestHarness<String, Object> 
testHarness = createTestSink(sink, 1, 0)) {
+                       testHarness.setup();
+                       testHarness.open();
+
+                       testHarness.setProcessingTime(0L);
+
+                       testHarness.processElement(new StreamRecord<>("test1", 
1L));
+
+                       testHarness.setProcessingTime(101L); // just past inactivityInterval (100); presumably lets the bucket be closed as inactive
+                       testHarness.snapshot(0, 0);
+                       testHarness.notifyOfCompletedCheckpoint(0); // presumably moves the pending part to its final name, which the assertion below checks
+               }
+
+               String expectedFileName = partSuffix == null ? "part-0-1" : 
"part-0-1" + partSuffix; // index 0 is already taken, so the new part must be index 1
+               assertThat(Files.exists(bucket.resolve(expectedFileName)), 
is(true));
+       }
+
        private static class StreamWriterWithConfigCheck<K, V> extends 
AvroKeyValueSinkWriter<K, V> {
                private Map<String, String> properties;
                private String key;

Reply via email to