Repository: flink Updated Branches: refs/heads/master 73088749e -> 9e348d32c
[FLINK-9921][DataStream API] Update RollingPolicy interface Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e348d32 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e348d32 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e348d32 Branch: refs/heads/master Commit: 9e348d32c5182ea0a3c2b0dfd03560806b029d9d Parents: 7308874 Author: kkloudas <[email protected]> Authored: Fri Jul 20 17:03:16 2018 +0200 Committer: kkloudas <[email protected]> Committed: Mon Jul 23 14:15:02 2018 +0200 ---------------------------------------------------------------------- .../api/functions/sink/filesystem/Buckets.java | 13 +++------- .../sink/filesystem/RollingPolicy.java | 5 ++-- .../sink/filesystem/StreamingFileSink.java | 7 +++-- .../rolling/policies/DefaultRollingPolicy.java | 27 +++++--------------- .../policies/OnCheckpointRollingPolicy.java | 4 +-- .../filesystem/LocalStreamingFileSinkTest.java | 2 +- .../sink/filesystem/RollingPolicyTest.java | 8 +++--- .../functions/sink/filesystem/TestUtils.java | 4 +-- 8 files changed, 26 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java index 6afba17..e6f8c00 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java @@ -61,7 +61,7 @@ public class Buckets<IN, BucketID> { private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory; - private final RollingPolicy<BucketID> rollingPolicy; + private final RollingPolicy<IN, BucketID> rollingPolicy; // --------------------------- runtime fields ----------------------------- @@ -95,7 +95,7 @@ public class Buckets<IN, BucketID> { final Bucketer<IN, BucketID> bucketer, final BucketFactory<IN, BucketID> bucketFactory, final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory, - final RollingPolicy<BucketID> rollingPolicy, + final RollingPolicy<IN, BucketID> rollingPolicy, final int subtaskIndex) throws IOException { this.basePath = Preconditions.checkNotNull(basePath); @@ -189,7 +189,6 @@ public class Buckets<IN, BucketID> { void snapshotState( final long checkpointId, - final long checkpointTimestamp, final ListState<byte[]> bucketStates, final ListState<Long> partCounterState) throws Exception { @@ -201,11 +200,7 @@ public class Buckets<IN, BucketID> { for (Bucket<IN, BucketID> bucket : activeBuckets.values()) { final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo(); - if (info != null && - (rollingPolicy.shouldRollOnCheckpoint(info) || - rollingPolicy.shouldRollOnEvent(info) || - rollingPolicy.shouldRollOnProcessingTime(info, checkpointTimestamp)) - ) { + if (info != null && rollingPolicy.shouldRollOnCheckpoint(info)) { // we also check here so that we do not have to always // wait for the "next" element to arrive. bucket.closePartFile(); @@ -249,7 +244,7 @@ public class Buckets<IN, BucketID> { } final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo(); - if (info == null || rollingPolicy.shouldRollOnEvent(info)) { + if (info == null || rollingPolicy.shouldRollOnEvent(info, value)) { bucket.rollPartFile(currentProcessingTime); } bucket.write(value, currentProcessingTime); http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java index 24c38aa..b1354a9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java @@ -28,7 +28,7 @@ import java.io.Serializable; * rolls its currently open part file and opens a new one. */ @PublicEvolving -public interface RollingPolicy<BucketID> extends Serializable { +public interface RollingPolicy<IN, BucketID> extends Serializable { /** * Determines if the in-progress part file for a bucket should roll on every checkpoint. @@ -39,10 +39,11 @@ public interface RollingPolicy<BucketID> extends Serializable { /** * Determines if the in-progress part file for a bucket should roll based on its current state, e.g. its size. + * @param element the element being processed. * @param partFileState the state of the currently open part file of the bucket. * @return {@code True} if the part file should roll, {@link false} otherwise. */ - boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState) throws IOException; + boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element) throws IOException; /** * Determines if the in-progress part file for a bucket should roll based on a time condition. http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java index c208079..0ebcc4f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java @@ -194,7 +194,7 @@ public class StreamingFileSink<IN> private Bucketer<IN, BucketID> bucketer; - private RollingPolicy<BucketID> rollingPolicy; + private RollingPolicy<IN, BucketID> rollingPolicy; private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>(); @@ -215,12 +215,12 @@ public class StreamingFileSink<IN> return this; } - public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<BucketID> policy) { + public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<IN, BucketID> policy) { this.rollingPolicy = Preconditions.checkNotNull(policy); return this; } - public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<ID> policy) { + public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<IN, ID> policy) { @SuppressWarnings("unchecked") StreamingFileSink.RowFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.RowFormatBuilder<IN, ID>) this; reInterpreted.bucketer = Preconditions.checkNotNull(bucketer); @@ -340,7 +340,6 @@ public class StreamingFileSink<IN> buckets.snapshotState( context.getCheckpointId(), - context.getCheckpointTimestamp(), bucketStates, maxPartCountersState); } http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java index a9ff617..15c3b4d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java @@ -37,7 +37,7 @@ import java.io.IOException; * </ol> */ @PublicEvolving -public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<BucketID> { +public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> { private static final long serialVersionUID = 1L; @@ -67,32 +67,19 @@ public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<Bucke } @Override - public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) { - return false; + public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException { + return partFileState.getSize() > partSize; } @Override - public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) throws IOException { - if (partFileState == null) { - // this means that there is no currently open part file. - return true; - } - + public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException { return partFileState.getSize() > partSize; } @Override public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) { - if (partFileState == null) { - // this means that there is no currently open part file. - return true; - } - - if (currentTime - partFileState.getCreationTime() > rolloverInterval) { - return true; - } - - return currentTime - partFileState.getLastUpdateTime() > inactivityInterval; + return currentTime - partFileState.getCreationTime() > rolloverInterval || + currentTime - partFileState.getLastUpdateTime() > inactivityInterval; } /** @@ -150,7 +137,7 @@ public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<Bucke /** * Creates the actual policy. */ - public <BucketID> DefaultRollingPolicy<BucketID> build() { + public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> build() { return new DefaultRollingPolicy<>(partSize, rolloverInterval, inactivityInterval); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java index 4361941..df15981 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java @@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; /** * A {@link RollingPolicy} which rolls on every checkpoint. */ -public class OnCheckpointRollingPolicy<BucketID> implements RollingPolicy<BucketID> { +public class OnCheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> { private static final long serialVersionUID = 1L; @@ -34,7 +34,7 @@ public class OnCheckpointRollingPolicy<BucketID> implements RollingPolicy<Bucket } @Override - public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) { + public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) { return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java index 6e942e9..7c23918 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java @@ -494,7 +494,7 @@ public class LocalStreamingFileSinkTest extends TestLogger { final TestBucketFactory first = new TestBucketFactory(); final TestBucketFactory second = new TestBucketFactory(); - final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy + final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = DefaultRollingPolicy .create() .withMaxPartSize(2L) .withRolloverInterval(100L) http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java index 61e1433..078a46b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java @@ -44,7 +44,7 @@ public class RollingPolicyTest { public void testDefaultRollingPolicy() throws Exception { final File outDir = TEMP_FOLDER.newFolder(); - final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy + final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = DefaultRollingPolicy .create() .withMaxPartSize(10L) .withInactivityInterval(4L) @@ -104,7 +104,7 @@ public class RollingPolicyTest { public void testRollOnCheckpointPolicy() throws Exception { final File outDir = TEMP_FOLDER.newFolder(); - final RollingPolicy<String> rollingPolicy = new OnCheckpointRollingPolicy<>(); + final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = new OnCheckpointRollingPolicy<>(); try ( OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createCustomRescalingTestSink( @@ -159,7 +159,7 @@ public class RollingPolicyTest { public void testCustomRollingPolicy() throws Exception { final File outDir = TEMP_FOLDER.newFolder(); - final RollingPolicy<String> rollingPolicy = new RollingPolicy<String>() { + final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = new RollingPolicy<Tuple2<String, Integer>, String>() { private static final long serialVersionUID = 1L; @@ -169,7 +169,7 @@ public class RollingPolicyTest { } @Override - public boolean shouldRollOnEvent(PartFileInfo<String> partFileState) throws IOException { + public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, Tuple2<String, Integer> element) throws IOException { // this means that 2 elements will close the part file. return partFileState.getSize() > 12L; } http://git-wip-us.apache.org/repos/asf/flink/blob/9e348d32/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java index 184e23e..9589c5a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java @@ -51,7 +51,7 @@ public class TestUtils { long inactivityInterval, long partMaxSize) throws Exception { - final RollingPolicy<String> rollingPolicy = + final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = DefaultRollingPolicy .create() .withMaxPartSize(partMaxSize) @@ -84,7 +84,7 @@ public class TestUtils { final long bucketCheckInterval, final Bucketer<Tuple2<String, Integer>, String> bucketer, final Encoder<Tuple2<String, Integer>> writer, - final RollingPolicy<String> rollingPolicy, + final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy, final BucketFactory<Tuple2<String, Integer>, String> bucketFactory) throws Exception { StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
