Repository: flink
Updated Branches:
  refs/heads/master 73088749e -> 9e348d32c


[FLINK-9921][DataStream API] Update RollingPolicy interface


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e348d32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e348d32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e348d32

Branch: refs/heads/master
Commit: 9e348d32c5182ea0a3c2b0dfd03560806b029d9d
Parents: 7308874
Author: kkloudas <[email protected]>
Authored: Fri Jul 20 17:03:16 2018 +0200
Committer: kkloudas <[email protected]>
Committed: Mon Jul 23 14:15:02 2018 +0200

----------------------------------------------------------------------
 .../api/functions/sink/filesystem/Buckets.java  | 13 +++-------
 .../sink/filesystem/RollingPolicy.java          |  5 ++--
 .../sink/filesystem/StreamingFileSink.java      |  7 +++--
 .../rolling/policies/DefaultRollingPolicy.java  | 27 +++++---------------
 .../policies/OnCheckpointRollingPolicy.java     |  4 +--
 .../filesystem/LocalStreamingFileSinkTest.java  |  2 +-
 .../sink/filesystem/RollingPolicyTest.java      |  8 +++---
 .../functions/sink/filesystem/TestUtils.java    |  4 +--
 8 files changed, 26 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 6afba17..e6f8c00 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -61,7 +61,7 @@ public class Buckets<IN, BucketID> {
 
        private final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory;
 
-       private final RollingPolicy<BucketID> rollingPolicy;
+       private final RollingPolicy<IN, BucketID> rollingPolicy;
 
        // --------------------------- runtime fields 
-----------------------------
 
@@ -95,7 +95,7 @@ public class Buckets<IN, BucketID> {
                        final Bucketer<IN, BucketID> bucketer,
                        final BucketFactory<IN, BucketID> bucketFactory,
                        final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory,
-                       final RollingPolicy<BucketID> rollingPolicy,
+                       final RollingPolicy<IN, BucketID> rollingPolicy,
                        final int subtaskIndex) throws IOException {
 
                this.basePath = Preconditions.checkNotNull(basePath);
@@ -189,7 +189,6 @@ public class Buckets<IN, BucketID> {
 
        void snapshotState(
                        final long checkpointId,
-                       final long checkpointTimestamp,
                        final ListState<byte[]> bucketStates,
                        final ListState<Long> partCounterState) throws 
Exception {
 
@@ -201,11 +200,7 @@ public class Buckets<IN, BucketID> {
                for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
                        final PartFileInfo<BucketID> info = 
bucket.getInProgressPartInfo();
 
-                       if (info != null &&
-                                       
(rollingPolicy.shouldRollOnCheckpoint(info) ||
-                                       rollingPolicy.shouldRollOnEvent(info) ||
-                                       
rollingPolicy.shouldRollOnProcessingTime(info, checkpointTimestamp))
-                       ) {
+                       if (info != null && 
rollingPolicy.shouldRollOnCheckpoint(info)) {
                                // we also check here so that we do not have to 
always
                                // wait for the "next" element to arrive.
                                bucket.closePartFile();
@@ -249,7 +244,7 @@ public class Buckets<IN, BucketID> {
                }
 
                final PartFileInfo<BucketID> info = 
bucket.getInProgressPartInfo();
-               if (info == null || rollingPolicy.shouldRollOnEvent(info)) {
+               if (info == null || rollingPolicy.shouldRollOnEvent(info, 
value)) {
                        bucket.rollPartFile(currentProcessingTime);
                }
                bucket.write(value, currentProcessingTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
index 24c38aa..b1354a9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
  * rolls its currently open part file and opens a new one.
  */
 @PublicEvolving
-public interface RollingPolicy<BucketID> extends Serializable {
+public interface RollingPolicy<IN, BucketID> extends Serializable {
 
        /**
         * Determines if the in-progress part file for a bucket should roll on 
every checkpoint.
@@ -39,10 +39,11 @@ public interface RollingPolicy<BucketID> extends Serializable {
 
        /**
         * Determines if the in-progress part file for a bucket should roll 
based on its current state, e.g. its size.
+        * @param element the element being processed.
         * @param partFileState the state of the currently open part file of 
the bucket.
         * @return {@code True} if the part file should roll, {@link false} 
otherwise.
         */
-       boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState) 
throws IOException;
+       boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, 
IN element) throws IOException;
 
        /**
         * Determines if the in-progress part file for a bucket should roll 
based on a time condition.

http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index c208079..0ebcc4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -194,7 +194,7 @@ public class StreamingFileSink<IN>
 
                private Bucketer<IN, BucketID> bucketer;
 
-               private RollingPolicy<BucketID> rollingPolicy;
+               private RollingPolicy<IN, BucketID> rollingPolicy;
 
                private BucketFactory<IN, BucketID> bucketFactory = new 
DefaultBucketFactory<>();
 
@@ -215,12 +215,12 @@ public class StreamingFileSink<IN>
                        return this;
                }
 
-               public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withRollingPolicy(final RollingPolicy<BucketID> policy) {
+               public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
                        this.rollingPolicy = Preconditions.checkNotNull(policy);
                        return this;
                }
 
-               public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> 
withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<ID> 
policy) {
+               public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> 
withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<IN, 
ID> policy) {
                        @SuppressWarnings("unchecked")
                        StreamingFileSink.RowFormatBuilder<IN, ID> 
reInterpreted = (StreamingFileSink.RowFormatBuilder<IN, ID>) this;
                        reInterpreted.bucketer = 
Preconditions.checkNotNull(bucketer);
@@ -340,7 +340,6 @@ public class StreamingFileSink<IN>
 
                buckets.snapshotState(
                                context.getCheckpointId(),
-                               context.getCheckpointTimestamp(),
                                bucketStates,
                                maxPartCountersState);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
index a9ff617..15c3b4d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
@@ -37,7 +37,7 @@ import java.io.IOException;
  * </ol>
  */
 @PublicEvolving
-public final class DefaultRollingPolicy<BucketID> implements 
RollingPolicy<BucketID> {
+public final class DefaultRollingPolicy<IN, BucketID> implements 
RollingPolicy<IN, BucketID> {
 
        private static final long serialVersionUID = 1L;
 
@@ -67,32 +67,19 @@ public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<Bucke
        }
 
        @Override
-       public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> 
partFileState) {
-               return false;
+       public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> 
partFileState) throws IOException {
+               return partFileState.getSize() > partSize;
        }
 
        @Override
-       public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) 
throws IOException {
-               if (partFileState == null) {
-                       // this means that there is no currently open part file.
-                       return true;
-               }
-
+       public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, 
IN element) throws IOException {
                return partFileState.getSize() > partSize;
        }
 
        @Override
        public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> 
partFileState, final long currentTime) {
-               if (partFileState == null) {
-                       // this means that there is no currently open part file.
-                       return true;
-               }
-
-               if (currentTime - partFileState.getCreationTime() > 
rolloverInterval) {
-                       return true;
-               }
-
-               return currentTime - partFileState.getLastUpdateTime() > 
inactivityInterval;
+               return currentTime - partFileState.getCreationTime() > 
rolloverInterval ||
+                               currentTime - partFileState.getLastUpdateTime() 
> inactivityInterval;
        }
 
        /**
@@ -150,7 +137,7 @@ public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<Bucke
                /**
                 * Creates the actual policy.
                 */
-               public <BucketID> DefaultRollingPolicy<BucketID> build() {
+               public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> 
build() {
                        return new DefaultRollingPolicy<>(partSize, 
rolloverInterval, inactivityInterval);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
index 4361941..df15981 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
 /**
  * A {@link RollingPolicy} which rolls on every checkpoint.
  */
-public class OnCheckpointRollingPolicy<BucketID> implements 
RollingPolicy<BucketID> {
+public class OnCheckpointRollingPolicy<IN, BucketID> implements 
RollingPolicy<IN, BucketID> {
 
        private static final long serialVersionUID = 1L;
 
@@ -34,7 +34,7 @@ public class OnCheckpointRollingPolicy<BucketID> implements RollingPolicy<Bucket
        }
 
        @Override
-       public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) {
+       public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, 
IN element) {
                return false;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index 6e942e9..7c23918 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -494,7 +494,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
                final TestBucketFactory first = new TestBucketFactory();
                final TestBucketFactory second = new TestBucketFactory();
 
-               final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy
+               final RollingPolicy<Tuple2<String, Integer>, String> 
rollingPolicy = DefaultRollingPolicy
                                .create()
                                .withMaxPartSize(2L)
                                .withRolloverInterval(100L)

http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index 61e1433..078a46b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -44,7 +44,7 @@ public class RollingPolicyTest {
        public void testDefaultRollingPolicy() throws Exception {
                final File outDir = TEMP_FOLDER.newFolder();
 
-               final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy
+               final RollingPolicy<Tuple2<String, Integer>, String> 
rollingPolicy = DefaultRollingPolicy
                                .create()
                                .withMaxPartSize(10L)
                                .withInactivityInterval(4L)
@@ -104,7 +104,7 @@ public class RollingPolicyTest {
        public void testRollOnCheckpointPolicy() throws Exception {
                final File outDir = TEMP_FOLDER.newFolder();
 
-               final RollingPolicy<String> rollingPolicy = new 
OnCheckpointRollingPolicy<>();
+               final RollingPolicy<Tuple2<String, Integer>, String> 
rollingPolicy = new OnCheckpointRollingPolicy<>();
 
                try (
                                
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness 
= TestUtils.createCustomRescalingTestSink(
@@ -159,7 +159,7 @@ public class RollingPolicyTest {
        public void testCustomRollingPolicy() throws Exception {
                final File outDir = TEMP_FOLDER.newFolder();
 
-               final RollingPolicy<String> rollingPolicy = new 
RollingPolicy<String>() {
+               final RollingPolicy<Tuple2<String, Integer>, String> 
rollingPolicy = new RollingPolicy<Tuple2<String, Integer>, String>() {
 
                        private static final long serialVersionUID = 1L;
 
@@ -169,7 +169,7 @@ public class RollingPolicyTest {
                        }
 
                        @Override
-                       public boolean shouldRollOnEvent(PartFileInfo<String> 
partFileState) throws IOException {
+                       public boolean shouldRollOnEvent(PartFileInfo<String> 
partFileState, Tuple2<String, Integer> element) throws IOException {
                                // this means that 2 elements will close the 
part file.
                                return partFileState.getSize() > 12L;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index 184e23e..9589c5a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -51,7 +51,7 @@ public class TestUtils {
                        long inactivityInterval,
                        long partMaxSize) throws Exception {
 
-               final RollingPolicy<String> rollingPolicy =
+               final RollingPolicy<Tuple2<String, Integer>, String> 
rollingPolicy =
                                DefaultRollingPolicy
                                                .create()
                                                .withMaxPartSize(partMaxSize)
@@ -84,7 +84,7 @@ public class TestUtils {
                        final long bucketCheckInterval,
                        final Bucketer<Tuple2<String, Integer>, String> 
bucketer,
                        final Encoder<Tuple2<String, Integer>> writer,
-                       final RollingPolicy<String> rollingPolicy,
+                       final RollingPolicy<Tuple2<String, Integer>, String> 
rollingPolicy,
                        final BucketFactory<Tuple2<String, Integer>, String> 
bucketFactory) throws Exception {
 
                StreamingFileSink<Tuple2<String, Integer>> sink = 
StreamingFileSink

Reply via email to