This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new c4acec8 [FLINK-13027][fs-connector] SFS bulk-encoded writer supports
customized checkpoint policy
c4acec8 is described below
commit c4acec8975a03a57713a1cbc8543cbed1d83612d
Author: Ying <[email protected]>
AuthorDate: Fri Dec 20 15:12:09 2019 -0800
[FLINK-13027][fs-connector] SFS bulk-encoded writer supports customized
checkpoint policy
---
.../sink/filesystem/StreamingFileSink.java | 22 ++++++++++----
...ingPolicy.java => CheckpointRollingPolicy.java} | 34 ++++++++++------------
.../rollingpolicies/OnCheckpointRollingPolicy.java | 9 ++----
.../api/functions/sink/filesystem/TestUtils.java | 3 ++
4 files changed, 38 insertions(+), 30 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index cd4afc2..d8d9d6d 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -37,6 +37,7 @@ import
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -73,7 +74,7 @@ import java.io.Serializable;
* {@code "prefix-1-17.ext"} containing the data from {@code subtask 1} of the
sink and is the {@code 17th} bucket
* created by that subtask.
* Part files roll based on the user-specified {@link RollingPolicy}. By
default, a {@link DefaultRollingPolicy}
- * is used.
+ * is used for row-encoded sink output; an {@link OnCheckpointRollingPolicy} is
used for bulk-encoded sink output.
*
* <p>In some scenarios, the open buckets are required to change based on
time. In these cases, the user
* can specify a {@code bucketCheckInterval} (by default 1m) and the sink will
check periodically and roll
@@ -268,7 +269,7 @@ public class StreamingFileSink<IN>
public <ID> StreamingFileSink.RowFormatBuilder<IN, ID, ?
extends RowFormatBuilder<IN, ID, ?>> withNewBucketAssignerAndPolicy(final
BucketAssigner<IN, ID> assigner, final RollingPolicy<IN, ID> policy) {
Preconditions.checkState(bucketFactory.getClass() ==
DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssignerAndPolicy() cannot
be called after specifying a customized bucket factory");
- return new RowFormatBuilder<>(basePath, encoder,
Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy),
bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
+ return new RowFormatBuilder(basePath, encoder,
Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy),
bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
}
/** Creates the actual sink. */
@@ -311,24 +312,29 @@ public class StreamingFileSink<IN>
private BucketAssigner<IN, BucketID> bucketAssigner;
+ private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
+
private BucketFactory<IN, BucketID> bucketFactory;
private OutputFileConfig outputFileConfig;
protected BulkFormatBuilder(Path basePath,
BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner) {
- this(basePath, writerFactory, assigner,
DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(),
OutputFileConfig.builder().build());
+ this(basePath, writerFactory, assigner,
OnCheckpointRollingPolicy.build(), DEFAULT_BUCKET_CHECK_INTERVAL,
+ new DefaultBucketFactoryImpl<>(),
OutputFileConfig.builder().build());
}
protected BulkFormatBuilder(
Path basePath,
BulkWriter.Factory<IN> writerFactory,
BucketAssigner<IN, BucketID> assigner,
+ CheckpointRollingPolicy<IN, BucketID> policy,
long bucketCheckInterval,
BucketFactory<IN, BucketID> bucketFactory,
OutputFileConfig outputFileConfig) {
this.basePath = Preconditions.checkNotNull(basePath);
this.writerFactory = writerFactory;
this.bucketAssigner =
Preconditions.checkNotNull(assigner);
+ this.rollingPolicy = Preconditions.checkNotNull(policy);
this.bucketCheckInterval = bucketCheckInterval;
this.bucketFactory =
Preconditions.checkNotNull(bucketFactory);
this.outputFileConfig =
Preconditions.checkNotNull(outputFileConfig);
@@ -348,6 +354,11 @@ public class StreamingFileSink<IN>
return self();
}
+ public T withRollingPolicy(CheckpointRollingPolicy<IN,
BucketID> rollingPolicy) {
+ this.rollingPolicy =
Preconditions.checkNotNull(rollingPolicy);
+ return self();
+ }
+
@VisibleForTesting
T withBucketFactory(final BucketFactory<IN, BucketID> factory) {
this.bucketFactory =
Preconditions.checkNotNull(factory);
@@ -361,7 +372,8 @@ public class StreamingFileSink<IN>
public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID, ?
extends BulkFormatBuilder<IN, ID, ?>> withNewBucketAssigner(final
BucketAssigner<IN, ID> assigner) {
Preconditions.checkState(bucketFactory.getClass() ==
DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssigner() cannot be
called after specifying a customized bucket factory");
- return new BulkFormatBuilder<>(basePath, writerFactory,
Preconditions.checkNotNull(assigner), bucketCheckInterval, new
DefaultBucketFactoryImpl<>(), outputFileConfig);
+ return new BulkFormatBuilder(basePath, writerFactory,
Preconditions.checkNotNull(assigner),
+ rollingPolicy, bucketCheckInterval, new
DefaultBucketFactoryImpl<>(), outputFileConfig);
}
/** Creates the actual sink. */
@@ -376,7 +388,7 @@ public class StreamingFileSink<IN>
bucketAssigner,
bucketFactory,
new
BulkPartWriter.Factory<>(writerFactory),
- OnCheckpointRollingPolicy.build(),
+ rollingPolicy,
subtaskIndex,
outputFileConfig);
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.java
similarity index 60%
copy from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
copy to
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.java
index 53fce08..e6399d8 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.java
@@ -22,32 +22,30 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import java.io.IOException;
+
/**
- * A {@link RollingPolicy} which rolls on every checkpoint.
+ * An abstract {@link RollingPolicy} which rolls on every checkpoint.
*/
@PublicEvolving
-public class OnCheckpointRollingPolicy<IN, BucketID> implements
RollingPolicy<IN, BucketID> {
-
- private static final long serialVersionUID = 1L;
-
- private OnCheckpointRollingPolicy() {}
-
- @Override
+public abstract class CheckpointRollingPolicy<IN, BucketID> implements
RollingPolicy<IN, BucketID> {
public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID>
partFileState) {
return true;
}
- @Override
- public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState,
IN element) {
- return false;
- }
+ public abstract boolean shouldRollOnEvent(final PartFileInfo<BucketID>
partFileState, IN element) throws IOException;
- @Override
- public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID>
partFileState, long currentTime) {
- return false;
- }
+ public abstract boolean shouldRollOnProcessingTime(final
PartFileInfo<BucketID> partFileState, final long currentTime) throws
IOException;
+
+ /**
+ * The base abstract builder class for {@link CheckpointRollingPolicy}.
+ */
+ public abstract static class PolicyBuilder<IN, BucketID, T extends
PolicyBuilder<IN, BucketID, T>> {
+ @SuppressWarnings("unchecked")
+ protected T self() {
+ return (T) this;
+ }
- public static <IN, BucketID> OnCheckpointRollingPolicy<IN, BucketID>
build() {
- return new OnCheckpointRollingPolicy<>();
+ public abstract CheckpointRollingPolicy<IN, BucketID> build();
}
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
index 53fce08..572fda9 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
@@ -23,21 +23,16 @@ import
org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
/**
- * A {@link RollingPolicy} which rolls on every checkpoint.
+ * A {@link RollingPolicy} which rolls (ONLY) on every checkpoint.
*/
@PublicEvolving
-public class OnCheckpointRollingPolicy<IN, BucketID> implements
RollingPolicy<IN, BucketID> {
+public final class OnCheckpointRollingPolicy<IN, BucketID> extends
CheckpointRollingPolicy<IN, BucketID> {
private static final long serialVersionUID = 1L;
private OnCheckpointRollingPolicy() {}
@Override
- public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID>
partFileState) {
- return true;
- }
-
- @Override
public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState,
IN element) {
return false;
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index 45460fc..df005c7 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -157,6 +158,7 @@ public class TestUtils {
.forBulkFormat(new Path(outDir.toURI()), writer)
.withBucketAssigner(bucketer)
.withBucketCheckInterval(bucketCheckInterval)
+ .withRollingPolicy(OnCheckpointRollingPolicy.build())
.withBucketFactory(bucketFactory)
.withOutputFileConfig(outputFileConfig)
.build();
@@ -197,6 +199,7 @@ public class TestUtils {
StreamingFileSink<Tuple2<String, Integer>> sink =
StreamingFileSink
.forBulkFormat(new Path(outDir.toURI()), writer)
.withNewBucketAssigner(bucketer)
+
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withBucketCheckInterval(bucketCheckInterval)
.withBucketFactory(bucketFactory)
.withOutputFileConfig(outputFileConfig)