http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java index 0406afc..c208079 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java @@ -20,8 +20,8 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.Encoder; -import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; @@ -30,29 +30,23 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.apache.flink.core.fs.RecoverableWriter; -import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; 
+import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy; +import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.OnCheckpointRollingPolicy; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; +import java.io.Serializable; /** * Sink that emits its input elements to {@link FileSystem} files within buckets. This is @@ -69,7 +63,9 @@ import java.util.Map; * be written to inside the base directory. The {@code Bucketer} can, for example, use time or * a property of the element to determine the bucket directory. The default {@code Bucketer} is a * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify - * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. + * a custom {@code Bucketer} using the {@code withBucketer(Bucketer)} method, after calling + * {@link StreamingFileSink#forRowFormat(Path, Encoder)} or + * {@link StreamingFileSink#forBulkFormat(Path, BulkWriter.Factory)}.
* * * <p>The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink @@ -94,19 +90,6 @@ import java.util.Map; * state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that * they do not contain data that arrived after the checkpoint from which we restore. * - * <p><b>NOTE:</b> - * <ol> - * <li> - * If checkpointing is not enabled the pending files will never be moved to the finished state. - * </li> - * <li> - * The part files are written using an instance of {@link Encoder}. By default, a - * {@link SimpleStringEncoder} is used, which writes the result of {@code toString()} for - * every element, separated by newlines. You can configure the writer using the - * {@link #setEncoder(Encoder)}. - * </li> - * </ol> - * * @param <IN> Type of the elements emitted by this sink */ @PublicEvolving @@ -116,8 +99,6 @@ public class StreamingFileSink<IN> private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(StreamingFileSink.class); - // -------------------------- state descriptors --------------------------- private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC = @@ -128,298 +109,264 @@ public class StreamingFileSink<IN> // ------------------------ configuration fields -------------------------- - private final Path basePath; - - private final BucketFactory<IN> bucketFactory; - - private long bucketCheckInterval = 60L * 1000L; - - private Bucketer<IN> bucketer; + private final long bucketCheckInterval; - private Encoder<IN> encoder; - - private RollingPolicy rollingPolicy; + private final StreamingFileSink.BucketsBuilder<IN, ?> bucketsBuilder; // --------------------------- runtime fields ----------------------------- - private transient BucketerContext bucketerContext; - - private transient RecoverableWriter fileSystemWriter; + private transient Buckets<IN, ?> buckets; private transient ProcessingTimeService 
processingTimeService; - private transient Map<String, Bucket<IN>> activeBuckets; + // --------------------------- State Related Fields ----------------------------- - ////////////////// State Related Fields ///////////////////// + private transient ListState<byte[]> bucketStates; - private transient BucketStateSerializer bucketStateSerializer; + private transient ListState<Long> maxPartCountersState; - private transient ListState<byte[]> restoredBucketStates; + /** + * Creates a new {@code StreamingFileSink} that writes files to the given base directory. + */ + private StreamingFileSink( + final StreamingFileSink.BucketsBuilder<IN, ?> bucketsBuilder, + final long bucketCheckInterval) { + + Preconditions.checkArgument(bucketCheckInterval > 0L); - private transient ListState<Long> restoredMaxCounters; + this.bucketsBuilder = Preconditions.checkNotNull(bucketsBuilder); + this.bucketCheckInterval = bucketCheckInterval; + } - private transient long initMaxPartCounter; + // ------------------------------------------------------------------------ - private transient long maxPartCounterUsed; + // --------------------------- Sink Builders ----------------------------- /** - * Creates a new {@code StreamingFileSink} that writes files to the given base directory. - * - * <p>This uses a {@link DateTimeBucketer} as {@link Bucketer} and a {@link SimpleStringEncoder} as a writer. - * - * @param basePath The directory to which to write the bucket files. + * Creates the builder for a {@code StreamingFileSink} with row-encoding format. + * @param basePath the base path where all the buckets are going to be created as sub-directories. + * @param encoder the {@link Encoder} to be used when writing elements in the buckets. + * @param <IN> the type of incoming elements + * @return The builder where the remaining of the configuration parameters for the sink can be configured. 
+ * In order to instantiate the sink, call {@link RowFormatBuilder#build()} after specifying the desired parameters. */ - public StreamingFileSink(Path basePath) { - this(basePath, new DefaultBucketFactory<>()); + public static <IN> StreamingFileSink.RowFormatBuilder<IN, String> forRowFormat( + final Path basePath, final Encoder<IN> encoder) { + return new StreamingFileSink.RowFormatBuilder<>(basePath, encoder, new DateTimeBucketer<>()); } - @VisibleForTesting - StreamingFileSink(Path basePath, BucketFactory<IN> bucketFactory) { - this.basePath = Preconditions.checkNotNull(basePath); - this.bucketer = new DateTimeBucketer<>(); - this.encoder = new SimpleStringEncoder<>(); - this.rollingPolicy = DefaultRollingPolicy.create().build(); - this.bucketFactory = Preconditions.checkNotNull(bucketFactory); + /** + * Creates the builder for a {@link StreamingFileSink} with bulk-encoding format. + * @param basePath the base path where all the buckets are going to be created as sub-directories. + * @param writerFactory the {@link BulkWriter.Factory} to be used when writing elements in the buckets. + * @param <IN> the type of incoming elements + * @return The builder where the remaining of the configuration parameters for the sink can be configured. + * In order to instantiate the sink, call {@link BulkFormatBuilder#build()} after specifying the desired parameters. + */ + public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String> forBulkFormat( + final Path basePath, final BulkWriter.Factory<IN> writerFactory) { + return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketer<>()); } - public StreamingFileSink<IN> setEncoder(Encoder<IN> encoder) { - this.encoder = Preconditions.checkNotNull(encoder); - return this; - } + /** + * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+ */ + private abstract static class BucketsBuilder<IN, BucketID> implements Serializable { - public StreamingFileSink<IN> setBucketer(Bucketer<IN> bucketer) { - this.bucketer = Preconditions.checkNotNull(bucketer); - return this; - } + private static final long serialVersionUID = 1L; - public StreamingFileSink<IN> setBucketCheckInterval(long interval) { - this.bucketCheckInterval = interval; - return this; + abstract Buckets<IN, BucketID> createBuckets(final int subtaskIndex) throws IOException; } - public StreamingFileSink<IN> setRollingPolicy(RollingPolicy policy) { - this.rollingPolicy = Preconditions.checkNotNull(policy); - return this; - } + /** + * A builder for configuring the sink for row-wise encoding formats. + */ + @PublicEvolving + public static class RowFormatBuilder<IN, BucketID> extends StreamingFileSink.BucketsBuilder<IN, BucketID> { - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - final Iterator<Map.Entry<String, Bucket<IN>>> activeBucketIt = - activeBuckets.entrySet().iterator(); - - while (activeBucketIt.hasNext()) { - Bucket<IN> bucket = activeBucketIt.next().getValue(); - bucket.commitUpToCheckpoint(checkpointId); - - if (!bucket.isActive()) { - // We've dealt with all the pending files and the writer for this bucket is not currently open. - // Therefore this bucket is currently inactive and we can remove it from our state. 
- activeBucketIt.remove(); - } - } - } + private static final long serialVersionUID = 1L; - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - Preconditions.checkState( - restoredBucketStates != null && fileSystemWriter != null && bucketStateSerializer != null, - "sink has not been initialized"); + private long bucketCheckInterval = 60L * 1000L; + + private final Path basePath; - restoredBucketStates.clear(); - for (Bucket<IN> bucket : activeBuckets.values()) { + private final Encoder<IN> encoder; - final PartFileInfo info = bucket.getInProgressPartInfo(); - final long checkpointTimestamp = context.getCheckpointTimestamp(); + private Bucketer<IN, BucketID> bucketer; - if (info != null && rollingPolicy.shouldRoll(info, checkpointTimestamp)) { - // we also check here so that we do not have to always - // wait for the "next" element to arrive. - bucket.closePartFile(); - } + private RollingPolicy<BucketID> rollingPolicy; - final BucketState bucketState = bucket.snapshot(context.getCheckpointId()); - restoredBucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState)); + private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>(); + + RowFormatBuilder(Path basePath, Encoder<IN> encoder, Bucketer<IN, BucketID> bucketer) { + this.basePath = Preconditions.checkNotNull(basePath); + this.encoder = Preconditions.checkNotNull(encoder); + this.bucketer = Preconditions.checkNotNull(bucketer); + this.rollingPolicy = DefaultRollingPolicy.create().build(); } - restoredMaxCounters.clear(); - restoredMaxCounters.add(maxPartCounterUsed); - } + public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketCheckInterval(final long interval) { + this.bucketCheckInterval = interval; + return this; + } - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - initFileSystemWriter(); + public StreamingFileSink.RowFormatBuilder<IN, 
BucketID> withBucketer(final Bucketer<IN, BucketID> bucketer) { + this.bucketer = Preconditions.checkNotNull(bucketer); + return this; + } - this.activeBuckets = new HashMap<>(); + public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<BucketID> policy) { + this.rollingPolicy = Preconditions.checkNotNull(policy); + return this; + } - // When resuming after a failure: - // 1) we get the max part counter used before in order to make sure that we do not overwrite valid data - // 2) we commit any pending files for previous checkpoints (previous to the last successful one) - // 3) we resume writing to the previous in-progress file of each bucket, and - // 4) if we receive multiple states for the same bucket, we merge them. + public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<ID> policy) { + @SuppressWarnings("unchecked") + StreamingFileSink.RowFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.RowFormatBuilder<IN, ID>) this; + reInterpreted.bucketer = Preconditions.checkNotNull(bucketer); + reInterpreted.rollingPolicy = Preconditions.checkNotNull(policy); + return reInterpreted; + } - final OperatorStateStore stateStore = context.getOperatorStateStore(); + @VisibleForTesting + StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) { + this.bucketFactory = Preconditions.checkNotNull(factory); + return this; + } - restoredBucketStates = stateStore.getListState(BUCKET_STATE_DESC); - restoredMaxCounters = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC); + /** Creates the actual sink. 
*/ + public StreamingFileSink<IN> build() { + return new StreamingFileSink<>(this, bucketCheckInterval); + } - if (context.isRestored()) { - final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); - - LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex); - - long maxCounter = 0L; - for (long partCounter: restoredMaxCounters.get()) { - maxCounter = Math.max(partCounter, maxCounter); - } - initMaxPartCounter = maxCounter; - - for (byte[] recoveredState : restoredBucketStates.get()) { - final BucketState bucketState = SimpleVersionedSerialization.readVersionAndDeSerialize( - bucketStateSerializer, recoveredState); - - final String bucketId = bucketState.getBucketId(); - - LOG.info("Recovered bucket for {}", bucketId); - - final Bucket<IN> restoredBucket = bucketFactory.restoreBucket( - fileSystemWriter, - subtaskIndex, - initMaxPartCounter, - encoder, - bucketState - ); - - final Bucket<IN> existingBucket = activeBuckets.get(bucketId); - if (existingBucket == null) { - activeBuckets.put(bucketId, restoredBucket); - } else { - existingBucket.merge(restoredBucket); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("{} idx {} restored state for bucket {}", getClass().getSimpleName(), - subtaskIndex, assembleBucketPath(bucketId)); - } - } + @Override + Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException { + return new Buckets<>( + basePath, + bucketer, + bucketFactory, + new RowWisePartWriter.Factory<>(encoder), + rollingPolicy, + subtaskIndex); } } - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); + /** + * A builder for configuring the sink for bulk-encoding formats, e.g. Parquet/ORC. 
+ */ + @PublicEvolving + public static class BulkFormatBuilder<IN, BucketID> extends StreamingFileSink.BucketsBuilder<IN, BucketID> { - processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(); - long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); - processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this); - this.bucketerContext = new BucketerContext(); - } + private static final long serialVersionUID = 1L; - @Override - public void onProcessingTime(long timestamp) throws Exception { - final long currentTime = processingTimeService.getCurrentProcessingTime(); - for (Bucket<IN> bucket : activeBuckets.values()) { - final PartFileInfo info = bucket.getInProgressPartInfo(); - if (info != null && rollingPolicy.shouldRoll(info, currentTime)) { - bucket.closePartFile(); - } - } - processingTimeService.registerTimer(timestamp + bucketCheckInterval, this); - } + private long bucketCheckInterval = 60L * 1000L; - @Override - public void invoke(IN value, Context context) throws Exception { - final long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); - final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + private final Path basePath; + + private final BulkWriter.Factory<IN> writerFactory; + + private Bucketer<IN, BucketID> bucketer; - // setting the values in the bucketer context - bucketerContext.update(context.timestamp(), context.currentWatermark(), currentProcessingTime); - - final String bucketId = bucketer.getBucketId(value, bucketerContext); - - Bucket<IN> bucket = activeBuckets.get(bucketId); - if (bucket == null) { - final Path bucketPath = assembleBucketPath(bucketId); - bucket = bucketFactory.getNewBucket( - fileSystemWriter, - subtaskIndex, - bucketId, - bucketPath, - initMaxPartCounter, - encoder); - activeBuckets.put(bucketId, bucket); + private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>(); + + 
BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, Bucketer<IN, BucketID> bucketer) { + this.basePath = Preconditions.checkNotNull(basePath); + this.writerFactory = Preconditions.checkNotNull(writerFactory); + this.bucketer = Preconditions.checkNotNull(bucketer); } - final PartFileInfo info = bucket.getInProgressPartInfo(); - if (info == null || rollingPolicy.shouldRoll(info, currentProcessingTime)) { - bucket.rollPartFile(currentProcessingTime); + public StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketCheckInterval(long interval) { + this.bucketCheckInterval = interval; + return this; } - bucket.write(value, currentProcessingTime); - // we update the counter here because as buckets become inactive and - // get removed in the initializeState(), at the time we snapshot they - // may not be there to take them into account during checkpointing. - updateMaxPartCounter(bucket.getPartCounter()); - } + public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> withBucketer(Bucketer<IN, ID> bucketer) { + @SuppressWarnings("unchecked") + StreamingFileSink.BulkFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.BulkFormatBuilder<IN, ID>) this; + reInterpreted.bucketer = Preconditions.checkNotNull(bucketer); + return reInterpreted; + } - @Override - public void close() throws Exception { - if (activeBuckets != null) { - activeBuckets.values().forEach(Bucket::dispose); + @VisibleForTesting + StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) { + this.bucketFactory = Preconditions.checkNotNull(factory); + return this; } - } - private void initFileSystemWriter() throws IOException { - if (fileSystemWriter == null) { - fileSystemWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter(); - bucketStateSerializer = new BucketStateSerializer( - fileSystemWriter.getResumeRecoverableSerializer(), - fileSystemWriter.getCommitRecoverableSerializer() - ); + /** Creates the actual sink. 
*/ + public StreamingFileSink<IN> build() { + return new StreamingFileSink<>(this, bucketCheckInterval); } - } - private void updateMaxPartCounter(long candidate) { - maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate); + @Override + Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException { + return new Buckets<>( + basePath, + bucketer, + bucketFactory, + new BulkPartWriter.Factory<>(writerFactory), + new OnCheckpointRollingPolicy<>(), + subtaskIndex); + } } - private Path assembleBucketPath(String bucketId) { - return new Path(basePath, bucketId); + // --------------------------- Sink Methods ----------------------------- + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + this.buckets = bucketsBuilder.createBuckets(subtaskIndex); + + final OperatorStateStore stateStore = context.getOperatorStateStore(); + bucketStates = stateStore.getListState(BUCKET_STATE_DESC); + maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC); + + if (context.isRestored()) { + buckets.initializeState(bucketStates, maxPartCountersState); + } } - /** - * The {@link Bucketer.Context} exposed to the - * {@link Bucketer#getBucketId(Object, Bucketer.Context)} - * whenever a new incoming element arrives. 
- */ - private static class BucketerContext implements Bucketer.Context { + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + buckets.publishUpToCheckpoint(checkpointId); + } - @Nullable - private Long elementTimestamp; + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + Preconditions.checkState(bucketStates != null && maxPartCountersState != null, "sink has not been initialized"); - private long currentWatermark; + bucketStates.clear(); + maxPartCountersState.clear(); - private long currentProcessingTime; + buckets.snapshotState( + context.getCheckpointId(), + context.getCheckpointTimestamp(), + bucketStates, + maxPartCountersState); + } - void update(@Nullable Long elementTimestamp, long currentWatermark, long currentProcessingTime) { - this.elementTimestamp = elementTimestamp; - this.currentWatermark = currentWatermark; - this.currentProcessingTime = currentProcessingTime; - } + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(); + long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); + processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this); + } - @Override - public long currentProcessingTime() { - return currentProcessingTime; - } + @Override + public void onProcessingTime(long timestamp) throws Exception { + final long currentTime = processingTimeService.getCurrentProcessingTime(); + buckets.onProcessingTime(currentTime); + processingTimeService.registerTimer(currentTime + bucketCheckInterval, this); + } - @Override - public long currentWatermark() { - return currentWatermark; - } + @Override + public void invoke(IN value, SinkFunction.Context context) throws Exception { + buckets.onElement(value, context); + } - @Override - @Nullable - public Long timestamp() { - return 
elementTimestamp; - } + @Override + public void close() throws Exception { + buckets.close(); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java index 5ffe152..d7b2013 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java @@ -19,22 +19,29 @@ package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.core.io.SimpleVersionedSerializer; /** * A {@link Bucketer} that does not perform any * bucketing of files. All files are written to the base path. */ @PublicEvolving -public class BasePathBucketer<T> implements Bucketer<T> { +public class BasePathBucketer<T> implements Bucketer<T, String> { private static final long serialVersionUID = -6033643155550226022L; @Override - public String getBucketId(T element, Context context) { + public String getBucketId(T element, Bucketer.Context context) { return ""; } @Override + public SimpleVersionedSerializer<String> getSerializer() { + // in the future this could be optimized as it is the empty string. 
+ return SimpleVersionedStringSerializer.INSTANCE; + } + + @Override public String toString() { return "BasePathBucketer"; } http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java index 5c30927..503e361 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java @@ -19,6 +19,8 @@ package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import javax.annotation.Nullable; @@ -33,9 +35,16 @@ import java.io.Serializable; * <p>The {@code StreamingFileSink} can be writing to many buckets at a time, and it is responsible for managing * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket the * element should fall in. The {@code Bucketer} can, for example, determine buckets based on system time. + * + * @param <IN> The type of input elements. + * @param <BucketID> The type of the object returned by the {@link #getBucketId(Object, Bucketer.Context)}. This has to have + * a correct {@link #hashCode()} and {@link #equals(Object)} method. 
In addition, the {@link Path} + * to the created bucket will be the result of the {@link #toString()} of this bucket id, appended to + * the {@code basePath} specified in the + * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink} */ @PublicEvolving -public interface Bucketer<T> extends Serializable { +public interface Bucketer<IN, BucketID> extends Serializable { /** * Returns the identifier of the bucket the provided element should be put into. @@ -48,13 +57,20 @@ public interface Bucketer<T> extends Serializable { * and the {@code base path} provided during the initialization of the * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}. */ - String getBucketId(T element, Context context); + BucketID getBucketId(IN element, Bucketer.Context context); + + /** + * @return A {@link SimpleVersionedSerializer} capable of serializing/deserializing the elements + * of type {@code BucketID}. That is the type of the objects returned by the + * {@link #getBucketId(Object, Bucketer.Context)}. + */ + SimpleVersionedSerializer<BucketID> getSerializer(); /** * Context that the {@link Bucketer} can use for getting additional data about * an input record. * - * <p>The context is only valid for the duration of a {@link Bucketer#getBucketId(Object, Context)} call. + * <p>The context is only valid for the duration of a {@link Bucketer#getBucketId(Object, Bucketer.Context)} call. * Do not store the context and use afterwards!
*/ @PublicEvolving http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java index 515468c..eed0b79 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.core.io.SimpleVersionedSerializer; import java.text.SimpleDateFormat; import java.util.Date; @@ -50,9 +51,9 @@ import java.util.Date; * */ @PublicEvolving -public class DateTimeBucketer<T> implements Bucketer<T> { +public class DateTimeBucketer<IN> implements Bucketer<IN, String> { - private static final long serialVersionUID = 3284420879277893804L; + private static final long serialVersionUID = 1L; private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH"; @@ -78,7 +79,7 @@ public class DateTimeBucketer<T> implements Bucketer<T> { } @Override - public String getBucketId(T element, Context context) { + public String getBucketId(IN element, Bucketer.Context context) { if (dateFormatter == null) { dateFormatter = new SimpleDateFormat(formatString); } @@ -86,6 +87,11 @@ public class DateTimeBucketer<T> implements Bucketer<T> { } @Override + public SimpleVersionedSerializer<String> getSerializer() { + return SimpleVersionedStringSerializer.INSTANCE; + } + + @Override public String 
toString() { return "DateTimeBucketer{" + "formatString='" + formatString + '\'' + http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java new file mode 100644 index 0000000..d025af9 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +/** + * A {@link SimpleVersionedSerializer} implementation for Strings. + */ +public final class SimpleVersionedStringSerializer implements SimpleVersionedSerializer<String> { + + private static final Charset CHARSET = StandardCharsets.UTF_8; + + public static final SimpleVersionedStringSerializer INSTANCE = new SimpleVersionedStringSerializer(); + + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(String value) { + final byte[] serialized = value.getBytes(StandardCharsets.UTF_8); + final byte[] targetBytes = new byte[Integer.BYTES + serialized.length]; + + final ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN); + bb.putInt(serialized.length); + bb.put(serialized); + return targetBytes; + } + + @Override + public String deserialize(int version, byte[] serialized) throws IOException { + switch (version) { + case 1: + return deserializeV1(serialized); + default: + throw new IOException("Unrecognized version or corrupt state: " + version); + } + } + + private static String deserializeV1(byte[] serialized) { + final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN); + final byte[] targetStringBytes = new byte[bb.getInt()]; + bb.get(targetStringBytes); + return new String(targetStringBytes, CHARSET); + } + + /** + * Private constructor to prevent instantiation. + * Access the serializer through the {@link #INSTANCE}. 
+ */ + private SimpleVersionedStringSerializer() {} +} + http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java new file mode 100644 index 0000000..a9ff617 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo; +import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * The default implementation of the {@link RollingPolicy}. + * + * <p>This policy rolls a part file if: + * <ol> + * <li>there is no open part file,</li> + * <li>the current file has exceeded the maximum part size (by default 128MB),</li> + * <li>the current file is older than the roll over interval (by default 60 sec), or</li> + * <li>the current file has not been written to for more than the allowed inactivityTime (by default 60 sec).</li> + * </ol> + */ +@PublicEvolving +public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<BucketID> { + + private static final long serialVersionUID = 1L; + + private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L; + + private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L; + + private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L; + + private final long partSize; + + private final long rolloverInterval; + + private final long inactivityInterval; + + /** + * Private constructor to avoid direct instantiation. 
+ */ + private DefaultRollingPolicy(long partSize, long rolloverInterval, long inactivityInterval) { + Preconditions.checkArgument(partSize > 0L); + Preconditions.checkArgument(rolloverInterval > 0L); + Preconditions.checkArgument(inactivityInterval > 0L); + + this.partSize = partSize; + this.rolloverInterval = rolloverInterval; + this.inactivityInterval = inactivityInterval; + } + + @Override + public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) { + return false; + } + + @Override + public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) throws IOException { + if (partFileState == null) { + // this means that there is no currently open part file. + return true; + } + + return partFileState.getSize() > partSize; + } + + @Override + public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) { + if (partFileState == null) { + // this means that there is no currently open part file. + return true; + } + + if (currentTime - partFileState.getCreationTime() > rolloverInterval) { + return true; + } + + return currentTime - partFileState.getLastUpdateTime() > inactivityInterval; + } + + /** + * Initiates the instantiation of a {@code DefaultRollingPolicy}. + * To finalize it and have the actual policy, call {@code .build()}. + */ + public static DefaultRollingPolicy.PolicyBuilder create() { + return new DefaultRollingPolicy.PolicyBuilder(); + } + + /** + * A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}. + */ + @PublicEvolving + public static final class PolicyBuilder { + + private long partSize = DEFAULT_MAX_PART_SIZE; + + private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL; + + private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL; + + private PolicyBuilder() {} + + /** + * Sets the part size above which a part file will have to roll. + * @param size the allowed part size. 
+ */ + public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(long size) { + Preconditions.checkState(size > 0L); + this.partSize = size; + return this; + } + + /** + * Sets the interval of allowed inactivity after which a part file will have to roll. + * @param interval the allowed inactivity interval. + */ + public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(long interval) { + Preconditions.checkState(interval > 0L); + this.inactivityInterval = interval; + return this; + } + + /** + * Sets the max time a part file can stay open before having to roll. + * @param interval the desired rollover interval. + */ + public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(long interval) { + Preconditions.checkState(interval > 0L); + this.rolloverInterval = interval; + return this; + } + + /** + * Creates the actual policy. + */ + public <BucketID> DefaultRollingPolicy<BucketID> build() { + return new DefaultRollingPolicy<>(partSize, rolloverInterval, inactivityInterval); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java new file mode 100644 index 0000000..4361941 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies; + +import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo; +import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; + +/** + * A {@link RollingPolicy} which rolls on every checkpoint. 
+ */ +public class OnCheckpointRollingPolicy<BucketID> implements RollingPolicy<BucketID> { + + private static final long serialVersionUID = 1L; + + @Override + public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) { + return true; + } + + @Override + public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) { + return false; + } + + @Override + public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java index 353ac00..3d5be63 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer; import org.junit.Assert; import org.junit.ClassRule; @@ -60,20 +61,21 @@ public class BucketStateSerializerTest { final Path testBucket = new Path(testFolder.getPath(), "test"); - final BucketState bucketState = new BucketState( + final BucketState<String> bucketState = new BucketState<>( "test", testBucket, Long.MAX_VALUE, 
null, new HashMap<>()); - final SimpleVersionedSerializer<BucketState> serializer = - new BucketStateSerializer( + final SimpleVersionedSerializer<BucketState<String>> serializer = + new BucketStateSerializer<>( writer.getResumeRecoverableSerializer(), - writer.getCommitRecoverableSerializer() + writer.getCommitRecoverableSerializer(), + SimpleVersionedStringSerializer.INSTANCE ); byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState); - final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes); + final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes); Assert.assertEquals(testBucket, recoveredState.getBucketPath()); - Assert.assertNull(recoveredState.getCurrentInProgress()); + Assert.assertNull(recoveredState.getInProgress()); Assert.assertTrue(recoveredState.getPendingPerCheckpoint().isEmpty()); } @@ -90,13 +92,14 @@ public class BucketStateSerializerTest { final RecoverableWriter.ResumeRecoverable current = stream.persist(); - final BucketState bucketState = new BucketState( + final BucketState<String> bucketState = new BucketState<>( "test", testBucket, Long.MAX_VALUE, current, new HashMap<>()); - final SimpleVersionedSerializer<BucketState> serializer = - new BucketStateSerializer( + final SimpleVersionedSerializer<BucketState<String>> serializer = + new BucketStateSerializer<>( writer.getResumeRecoverableSerializer(), - writer.getCommitRecoverableSerializer() + writer.getCommitRecoverableSerializer(), + SimpleVersionedStringSerializer.INSTANCE ); final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState); @@ -104,7 +107,7 @@ public class BucketStateSerializerTest { // to simulate that everything is over for file. 
stream.close(); - final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes); + final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes); Assert.assertEquals(testBucket, recoveredState.getBucketPath()); @@ -147,18 +150,19 @@ public class BucketStateSerializerTest { final RecoverableWriter.ResumeRecoverable current = stream.persist(); - final BucketState bucketState = new BucketState( + final BucketState<String> bucketState = new BucketState<>( "test-2", bucketPath, Long.MAX_VALUE, current, commitRecoverables); - final SimpleVersionedSerializer<BucketState> serializer = - new BucketStateSerializer( + final SimpleVersionedSerializer<BucketState<String>> serializer = + new BucketStateSerializer<>( writer.getResumeRecoverableSerializer(), - writer.getCommitRecoverableSerializer() + writer.getCommitRecoverableSerializer(), + SimpleVersionedStringSerializer.INSTANCE ); stream.close(); byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState); - final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes); + final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes); Assert.assertEquals(bucketPath, recoveredState.getBucketPath()); @@ -220,20 +224,21 @@ public class BucketStateSerializerTest { final RecoverableWriter.ResumeRecoverable current = null; - final BucketState bucketState = new BucketState( + final BucketState<String> bucketState = new BucketState<>( "", bucketPath, Long.MAX_VALUE, current, commitRecoverables); - final SimpleVersionedSerializer<BucketState> serializer = new BucketStateSerializer( + final SimpleVersionedSerializer<BucketState<String>> serializer = new BucketStateSerializer<>( writer.getResumeRecoverableSerializer(), - writer.getCommitRecoverableSerializer() + writer.getCommitRecoverableSerializer(), 
+ SimpleVersionedStringSerializer.INSTANCE ); byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState); - final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes); + final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes); Assert.assertEquals(bucketPath, recoveredState.getBucketPath()); - Assert.assertNull(recoveredState.getCurrentInProgress()); + Assert.assertNull(recoveredState.getInProgress()); final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getPendingPerCheckpoint(); Assert.assertEquals(5L, recoveredRecoverables.size()); http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java new file mode 100644 index 0000000..042ba4e --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer; + +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +/** + * Tests for {@link Buckets}. 
+ */ +public class BucketsTest { + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + + @Test + public void testContextPassingNormalExecution() throws Exception { + testCorrectPassingOfContext(1L, 2L, 3L); + } + + @Test + public void testContextPassingNullTimestamp() throws Exception { + testCorrectPassingOfContext(null, 2L, 3L); + } + + private void testCorrectPassingOfContext(Long timestamp, long watermark, long processingTime) throws Exception { + final File outDir = TEMP_FOLDER.newFolder(); + + final Long expectedTimestamp = timestamp; + final long expectedWatermark = watermark; + final long expectedProcessingTime = processingTime; + + final Buckets<String, String> buckets = StreamingFileSink + .<String>forRowFormat(new Path(outDir.toURI()), new SimpleStringEncoder<>()) + .withBucketer(new VarifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime)) + .createBuckets(2); + + buckets.onElement("TEST", new SinkFunction.Context() { + @Override + public long currentProcessingTime() { + return expectedProcessingTime; + } + + @Override + public long currentWatermark() { + return expectedWatermark; + } + + @Override + public Long timestamp() { + return expectedTimestamp; + } + }); + } + + private static class VarifyingBucketer implements Bucketer<String, String> { + + private static final long serialVersionUID = 7729086510972377578L; + + private final Long expectedTimestamp; + private final long expectedWatermark; + private final long expectedProcessingTime; + + VarifyingBucketer( + final Long expectedTimestamp, + final long expectedWatermark, + final long expectedProcessingTime + ) { + this.expectedTimestamp = expectedTimestamp; + this.expectedWatermark = expectedWatermark; + this.expectedProcessingTime = expectedProcessingTime; + } + + @Override + public String getBucketId(String element, Context context) { + final Long elementTimestamp = context.timestamp(); + final long watermark = 
context.currentWatermark(); + final long processingTime = context.currentProcessingTime(); + + Assert.assertEquals(expectedTimestamp, elementTimestamp); + Assert.assertEquals(expectedProcessingTime, processingTime); + Assert.assertEquals(expectedWatermark, watermark); + + return element; + } + + @Override + public SimpleVersionedSerializer<String> getSerializer() { + return SimpleVersionedStringSerializer.INSTANCE; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java new file mode 100644 index 0000000..7b6b82c --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +/** + * Tests for the {@link StreamingFileSink} with {@link BulkWriter}. + */ +public class BulkWriterTest extends TestLogger { + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + + @Test + public void testCustomBulkWriter() throws Exception { + final File outDir = TEMP_FOLDER.newFolder(); + + // we set the max bucket size to small so that we can know when it rolls + try ( + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = + TestUtils.createTestSinkWithBulkEncoder( + outDir, + 1, + 0, + 10L, + new TestUtils.TupleToStringBucketer(), + new TestBulkWriterFactory(), + new DefaultBucketFactory<>()) + ) { + + testHarness.setup(); + testHarness.open(); + + // this creates a new bucket "test1" and part-0-0 + testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L)); + TestUtils.checkLocalFs(outDir, 1, 0); + + // we take a checkpoint so we roll. + testHarness.snapshot(1L, 1L); + + // these will close part-0-0 and open part-0-1 + testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L)); + testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L)); + + // we take a checkpoint so we roll again. 
+ testHarness.snapshot(2L, 2L); + + TestUtils.checkLocalFs(outDir, 2, 0); + + Map<File, String> contents = TestUtils.getFileContentByPath(outDir); + int fileCounter = 0; + for (Map.Entry<File, String> fileContents : contents.entrySet()) { + if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) { + fileCounter++; + Assert.assertEquals("test1@1\n", fileContents.getValue()); + } else if (fileContents.getKey().getName().contains(".part-0-1.inprogress")) { + fileCounter++; + Assert.assertEquals("test1@2\ntest1@3\n", fileContents.getValue()); + } + } + Assert.assertEquals(2L, fileCounter); + + // we acknowledge the latest checkpoint, so everything should be published. + testHarness.notifyOfCompletedCheckpoint(2L); + TestUtils.checkLocalFs(outDir, 0, 2); + } + } + + /** + * A {@link BulkWriter} used for the tests. + */ + private static class TestBulkWriter implements BulkWriter<Tuple2<String, Integer>> { + + private static final Charset CHARSET = StandardCharsets.UTF_8; + + private final FSDataOutputStream stream; + + TestBulkWriter(final FSDataOutputStream stream) { + this.stream = Preconditions.checkNotNull(stream); + } + + @Override + public void addElement(Tuple2<String, Integer> element) throws IOException { + stream.write((element.f0 + '@' + element.f1 + '\n').getBytes(CHARSET)); + } + + @Override + public void flush() throws IOException { + stream.flush(); + } + + @Override + public void finish() throws IOException { + flush(); + } + } + + /** + * A {@link BulkWriter.Factory} used for the tests. 
+ */ + private static class TestBulkWriterFactory implements BulkWriter.Factory<Tuple2<String, Integer>> { + + private static final long serialVersionUID = 1L; + + @Override + public BulkWriter<Tuple2<String, Integer>> create(FSDataOutputStream out) { + return new TestBulkWriter(out); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java index b6f73ac..6e942e9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java @@ -18,20 +18,17 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; -import org.apache.flink.api.common.serialization.Encoder; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; -import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import 
org.apache.flink.util.TestLogger; -import org.apache.commons.io.FileUtils; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; @@ -39,9 +36,6 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Collection; -import java.util.HashMap; import java.util.Map; /** @@ -56,12 +50,13 @@ public class LocalStreamingFileSinkTest extends TestLogger { public void testClosingWithoutInput() throws Exception { final File outDir = TEMP_FOLDER.newFolder(); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = - createRescalingTestSink(outDir, 1, 0, 100L, 124L); - testHarness.setup(); - testHarness.open(); - - testHarness.close(); + try ( + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = + TestUtils.createRescalingTestSink(outDir, 1, 0, 100L, 124L); + ) { + testHarness.setup(); + testHarness.open(); + } } @Test @@ -70,7 +65,7 @@ public class LocalStreamingFileSinkTest extends TestLogger { OperatorSubtaskState snapshot; // we set the max bucket size to small so that we can know when it rolls - try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink( + try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink( outDir, 1, 0, 100L, 10L)) { testHarness.setup(); @@ -78,7 +73,7 @@ public class LocalStreamingFileSinkTest extends TestLogger { // this creates a new bucket "test1" and part-0-0 testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L)); - checkLocalFs(outDir, 1, 0); + TestUtils.checkLocalFs(outDir, 1, 0); // we take a checkpoint so that we keep the in-progress file offset. 
snapshot = testHarness.snapshot(1L, 1L); @@ -87,9 +82,9 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L)); testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L)); - checkLocalFs(outDir, 2, 0); + TestUtils.checkLocalFs(outDir, 2, 0); - Map<File, String> contents = getFileContentByPath(outDir); + Map<File, String> contents = TestUtils.getFileContentByPath(outDir); int fileCounter = 0; for (Map.Entry<File, String> fileContents : contents.entrySet()) { if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) { @@ -103,7 +98,7 @@ public class LocalStreamingFileSinkTest extends TestLogger { Assert.assertEquals(2L, fileCounter); } - try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink( + try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink( outDir, 1, 0, 100L, 10L)) { testHarness.setup(); @@ -111,11 +106,11 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness.open(); // the in-progress is the not cleaned up one and the pending is truncated and finalized - checkLocalFs(outDir, 2, 0); + TestUtils.checkLocalFs(outDir, 2, 0); // now we go back to the first checkpoint so it should truncate part-0-0 and restart part-0-1 int fileCounter = 0; - for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) { + for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) { if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) { // truncated fileCounter++; @@ -132,7 +127,7 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L)); fileCounter = 0; - for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) { + for (Map.Entry<File, String> 
fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) { if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) { fileCounter++; Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue()); @@ -145,16 +140,16 @@ public class LocalStreamingFileSinkTest extends TestLogger { Assert.assertEquals(2L, fileCounter); testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L)); - checkLocalFs(outDir, 3, 0); // the previous part-0-1 in progress is simply ignored (random extension) + TestUtils.checkLocalFs(outDir, 3, 0); // the previous part-0-1 in progress is simply ignored (random extension) testHarness.snapshot(2L, 2L); // this will close the new part-0-1 testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L)); - checkLocalFs(outDir, 3, 0); + TestUtils.checkLocalFs(outDir, 3, 0); fileCounter = 0; - for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) { + for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) { if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) { fileCounter++; Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue()); @@ -169,10 +164,10 @@ public class LocalStreamingFileSinkTest extends TestLogger { // this will publish part-0-0 testHarness.notifyOfCompletedCheckpoint(2L); - checkLocalFs(outDir, 2, 1); + TestUtils.checkLocalFs(outDir, 2, 1); fileCounter = 0; - for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) { + for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) { if (fileContents.getKey().getName().equals("part-0-0")) { fileCounter++; Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue()); @@ -192,7 +187,7 @@ public class LocalStreamingFileSinkTest extends TestLogger { final File outDir = TEMP_FOLDER.newFolder(); // we set the max bucket size to small so that we can know when it rolls - try 
(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink( + try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink( outDir, 1, 0, 100L, 10L)) { testHarness.setup(); @@ -203,11 +198,11 @@ public class LocalStreamingFileSinkTest extends TestLogger { // these 2 create a new bucket "test1", with a .part-0-0.inprogress and also fill it testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L)); testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L)); - checkLocalFs(outDir, 1, 0); + TestUtils.checkLocalFs(outDir, 1, 0); // this will open .part-0-1.inprogress testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L)); - checkLocalFs(outDir, 2, 0); + TestUtils.checkLocalFs(outDir, 2, 0); // we take a checkpoint so that we keep the in-progress file offset. testHarness.snapshot(1L, 1L); @@ -218,13 +213,13 @@ public class LocalStreamingFileSinkTest extends TestLogger { // and open and fill .part-0-2.inprogress testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L)); testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L)); - checkLocalFs(outDir, 3, 0); // nothing committed yet + TestUtils.checkLocalFs(outDir, 3, 0); // nothing committed yet testHarness.snapshot(2L, 2L); // open .part-0-3.inprogress testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 7), 7L)); - checkLocalFs(outDir, 4, 0); + TestUtils.checkLocalFs(outDir, 4, 0); // this will close the part file (time) testHarness.setProcessingTime(101L); @@ -232,10 +227,10 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness.snapshot(3L, 3L); testHarness.notifyOfCompletedCheckpoint(1L); // the pending for checkpoint 1 are committed - checkLocalFs(outDir, 3, 1); + TestUtils.checkLocalFs(outDir, 3, 1); int fileCounter = 0; - for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) { + for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) { if (fileContents.getKey().getName().equals("part-0-0")) { fileCounter++; Assert.assertEquals("test1@1\ntest1@2\n", fileContents.getValue()); @@ -253,10 +248,10 @@ public class LocalStreamingFileSinkTest extends TestLogger { Assert.assertEquals(4L, fileCounter); testHarness.notifyOfCompletedCheckpoint(3L); // all the pending for checkpoint 2 and 3 are committed - checkLocalFs(outDir, 0, 4); + TestUtils.checkLocalFs(outDir, 0, 4); fileCounter = 0; - for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) { + for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) { if (fileContents.getKey().getName().equals("part-0-0")) { fileCounter++; Assert.assertEquals("test1@1\ntest1@2\n", fileContents.getValue()); @@ -280,7 +275,7 @@ public class LocalStreamingFileSinkTest extends TestLogger { final File outDir = TEMP_FOLDER.newFolder(); // we set a big bucket size so that it does not close by size, but by timers. 
- try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink( + try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink( outDir, 1, 0, 100L, 124L)) { testHarness.setup(); @@ -290,10 +285,10 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L)); testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L)); - checkLocalFs(outDir, 2, 0); + TestUtils.checkLocalFs(outDir, 2, 0); int bucketCounter = 0; - for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) { + for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) { if (fileContents.getKey().getParentFile().getName().equals("test1")) { bucketCounter++; } else if (fileContents.getKey().getParentFile().getName().equals("test2")) { @@ -303,10 +298,10 @@ public class LocalStreamingFileSinkTest extends TestLogger { Assert.assertEquals(2L, bucketCounter); // verifies that we have 2 buckets, "test1" and "test2" testHarness.setProcessingTime(101L); // put them in pending - checkLocalFs(outDir, 2, 0); + TestUtils.checkLocalFs(outDir, 2, 0); testHarness.snapshot(0L, 0L); // put them in pending for 0 - checkLocalFs(outDir, 2, 0); + TestUtils.checkLocalFs(outDir, 2, 0); // create another 2 buckets with 1 inprogress file each testHarness.processElement(new StreamRecord<>(Tuple2.of("test3", 1), 1L)); @@ -315,13 +310,13 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness.setProcessingTime(202L); // put them in pending testHarness.snapshot(1L, 0L); // put them in pending for 1 - checkLocalFs(outDir, 4, 0); + TestUtils.checkLocalFs(outDir, 4, 0); testHarness.notifyOfCompletedCheckpoint(0L); // put the pending for 0 to the "committed" state - checkLocalFs(outDir, 2, 2); + TestUtils.checkLocalFs(outDir, 2, 2); bucketCounter = 0; - for 
(Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) { + for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) { if (fileContents.getKey().getParentFile().getName().equals("test1")) { bucketCounter++; Assert.assertEquals("part-0-0", fileContents.getKey().getName()); @@ -339,10 +334,10 @@ public class LocalStreamingFileSinkTest extends TestLogger { Assert.assertEquals(4L, bucketCounter); testHarness.notifyOfCompletedCheckpoint(1L); // put the pending for 1 to the "committed" state - checkLocalFs(outDir, 0, 4); + TestUtils.checkLocalFs(outDir, 0, 4); bucketCounter = 0; - for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) { + for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) { if (fileContents.getKey().getParentFile().getName().equals("test1")) { bucketCounter++; Assert.assertEquals("test1@1\n", fileContents.getValue()); @@ -367,9 +362,10 @@ public class LocalStreamingFileSinkTest extends TestLogger { public void testClosingOnSnapshot() throws Exception { final File outDir = TEMP_FOLDER.newFolder(); - try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = - createRescalingTestSink(outDir, 1, 0, 100L, 2L)) { - + try ( + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = + TestUtils.createRescalingTestSink(outDir, 1, 0, 100L, 2L) + ) { testHarness.setup(); testHarness.open(); @@ -377,29 +373,29 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L)); testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L)); - checkLocalFs(outDir, 2, 0); + TestUtils.checkLocalFs(outDir, 2, 0); // this is to check the inactivity threshold testHarness.setProcessingTime(101L); - checkLocalFs(outDir, 2, 0); + TestUtils.checkLocalFs(outDir, 2, 0); testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test3", 1), 1L)); - checkLocalFs(outDir, 3, 0); + TestUtils.checkLocalFs(outDir, 3, 0); testHarness.snapshot(0L, 1L); - checkLocalFs(outDir, 3, 0); + TestUtils.checkLocalFs(outDir, 3, 0); testHarness.notifyOfCompletedCheckpoint(0L); - checkLocalFs(outDir, 0, 3); + TestUtils.checkLocalFs(outDir, 0, 3); testHarness.snapshot(1L, 0L); testHarness.processElement(new StreamRecord<>(Tuple2.of("test4", 10), 10L)); - checkLocalFs(outDir, 1, 3); + TestUtils.checkLocalFs(outDir, 1, 3); } // at close it is not moved to final. - checkLocalFs(outDir, 1, 3); + TestUtils.checkLocalFs(outDir, 1, 3); } @Test @@ -408,11 +404,11 @@ public class LocalStreamingFileSinkTest extends TestLogger { OperatorSubtaskState mergedSnapshot; - // we set small file size so that the part file rolls. + // we set small file size so that the part file rolls on every element. try ( - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createRescalingTestSink( + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = TestUtils.createRescalingTestSink( outDir, 2, 0, 100L, 10L); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = createRescalingTestSink( + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = TestUtils.createRescalingTestSink( outDir, 2, 1, 100L, 10L) ) { testHarness1.setup(); @@ -422,16 +418,16 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness2.open(); testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L)); - checkLocalFs(outDir, 1, 0); + TestUtils.checkLocalFs(outDir, 1, 0); testHarness2.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L)); testHarness2.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L)); // all the files are in-progress - checkLocalFs(outDir, 3, 0); + TestUtils.checkLocalFs(outDir, 3, 0); int counter = 0; - for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) { + for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) { final String parentFilename = fileContents.getKey().getParentFile().getName(); final String inProgressFilename = fileContents.getKey().getName(); @@ -456,7 +452,7 @@ public class LocalStreamingFileSinkTest extends TestLogger { } try ( - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink( + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink( outDir, 1, 0, 100L, 10L) ) { testHarness.setup(); @@ -464,13 +460,13 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness.open(); // still everything in-progress but the in-progress for prev task 1 should be put in pending now - checkLocalFs(outDir, 3, 0); + TestUtils.checkLocalFs(outDir, 3, 0); testHarness.snapshot(2L, 2L); testHarness.notifyOfCompletedCheckpoint(2L); int counter = 0; - for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) { + for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) { final String parentFilename = fileContents.getKey().getParentFile().getName(); final String filename = fileContents.getKey().getName(); @@ -498,11 +494,18 @@ public class LocalStreamingFileSinkTest extends TestLogger { final TestBucketFactory first = new TestBucketFactory(); final TestBucketFactory second = new TestBucketFactory(); + final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy + .create() + .withMaxPartSize(2L) + .withRolloverInterval(100L) + .withInactivityInterval(100L) + .build(); + try ( - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createCustomRescalingTestSink( - outDir, 2, 0, 100L, 2L, first, new SimpleStringEncoder<>()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = 
createCustomRescalingTestSink( - outDir, 2, 1, 100L, 2L, second, new SimpleStringEncoder<>()) + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = TestUtils.createCustomRescalingTestSink( + outDir, 2, 0, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, first); + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = TestUtils.createCustomRescalingTestSink( + outDir, 2, 1, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, second) ) { testHarness1.setup(); testHarness1.open(); @@ -514,7 +517,7 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L)); testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L)); testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L)); - checkLocalFs(outDir, 3, 0); + TestUtils.checkLocalFs(outDir, 3, 0); // intentionally we snapshot them in the reverse order so that the states are shuffled mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( @@ -527,10 +530,10 @@ public class LocalStreamingFileSinkTest extends TestLogger { final TestBucketFactory secondRecovered = new TestBucketFactory(); try ( - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createCustomRescalingTestSink( - outDir, 2, 0, 100L, 2L, firstRecovered, new SimpleStringEncoder<>()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = createCustomRescalingTestSink( - outDir, 2, 1, 100L, 2L, secondRecovered, new SimpleStringEncoder<>()) + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = TestUtils.createCustomRescalingTestSink( + outDir, 2, 0, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, firstRecovered); + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> 
testHarness2 = TestUtils.createCustomRescalingTestSink( + outDir, 2, 1, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, secondRecovered) ) { testHarness1.setup(); testHarness1.initializeState(mergedSnapshot); @@ -540,7 +543,7 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness1.processElement(new StreamRecord<>(Tuple2.of("test4", 0), 0L)); Assert.assertEquals(3L, firstRecovered.getInitialCounter()); - checkLocalFs(outDir, 1, 3); + TestUtils.checkLocalFs(outDir, 1, 3); testHarness2.setup(); testHarness2.initializeState(mergedSnapshot); @@ -550,78 +553,26 @@ public class LocalStreamingFileSinkTest extends TestLogger { testHarness2.processElement(new StreamRecord<>(Tuple2.of("test2", 0), 0L)); Assert.assertEquals(3L, secondRecovered.getInitialCounter()); - checkLocalFs(outDir, 2, 3); + TestUtils.checkLocalFs(outDir, 2, 3); } } ////////////////////// Helper Methods ////////////////////// - private OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createRescalingTestSink( - File outDir, - int totalParallelism, - int taskIdx, - long inactivityInterval, - long partMaxSize) throws Exception { - - return createCustomRescalingTestSink( - outDir, - totalParallelism, - taskIdx, - inactivityInterval, - partMaxSize, - new DefaultBucketFactory<>(), - (Encoder<Tuple2<String, Integer>>) (element, stream) -> { - stream.write((element.f0 + '@' + element.f1).getBytes(StandardCharsets.UTF_8)); - stream.write('\n'); - }); - } - - private OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createCustomRescalingTestSink( - File outDir, - int totalParallelism, - int taskIdx, - long inactivityInterval, - long partMaxSize, - BucketFactory<Tuple2<String, Integer>> factory, - Encoder<Tuple2<String, Integer>> writer) throws Exception { - - StreamingFileSink<Tuple2<String, Integer>> sink = new StreamingFileSink<>(new Path(outDir.toURI()), factory) - .setBucketer(new Bucketer<Tuple2<String, Integer>>() 
{ - - private static final long serialVersionUID = -3086487303018372007L; - - @Override - public String getBucketId(Tuple2<String, Integer> element, Context context) { - return element.f0; - } - }) - .setEncoder(writer) - .setRollingPolicy( - DefaultRollingPolicy - .create() - .withMaxPartSize(partMaxSize) - .withRolloverInterval(inactivityInterval) - .withInactivityInterval(inactivityInterval) - .build()) - .setBucketCheckInterval(10L); - - return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx); - } - - static class TestBucketFactory extends DefaultBucketFactory<Tuple2<String, Integer>> { + static class TestBucketFactory extends DefaultBucketFactory<Tuple2<String, Integer>, String> { private static final long serialVersionUID = 2794824980604027930L; private long initialCounter = -1L; @Override - public Bucket<Tuple2<String, Integer>> getNewBucket( - RecoverableWriter fsWriter, - int subtaskIndex, - String bucketId, - Path bucketPath, - long initialPartCounter, - Encoder<Tuple2<String, Integer>> writer) throws IOException { + public Bucket<Tuple2<String, Integer>, String> getNewBucket( + final RecoverableWriter fsWriter, + final int subtaskIndex, + final String bucketId, + final Path bucketPath, + final long initialPartCounter, + final PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> partFileWriterFactory) { this.initialCounter = initialPartCounter; @@ -631,16 +582,16 @@ public class LocalStreamingFileSinkTest extends TestLogger { bucketId, bucketPath, initialPartCounter, - writer); + partFileWriterFactory); } @Override - public Bucket<Tuple2<String, Integer>> restoreBucket( - RecoverableWriter fsWriter, - int subtaskIndex, - long initialPartCounter, - Encoder<Tuple2<String, Integer>> writer, - BucketState bucketState) throws IOException { + public Bucket<Tuple2<String, Integer>, String> restoreBucket( + final RecoverableWriter fsWriter, + final int subtaskIndex, + final long initialPartCounter, + final 
PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> partFileWriterFactory, + final BucketState<String> bucketState) throws IOException { this.initialCounter = initialPartCounter; @@ -648,7 +599,7 @@ public class LocalStreamingFileSinkTest extends TestLogger { fsWriter, subtaskIndex, initialPartCounter, - writer, + partFileWriterFactory, bucketState); } @@ -656,34 +607,4 @@ public class LocalStreamingFileSinkTest extends TestLogger { return initialCounter; } } - - private static void checkLocalFs(File outDir, int expectedInProgress, int expectedCompleted) { - int inProgress = 0; - int finished = 0; - - for (File file: FileUtils.listFiles(outDir, null, true)) { - if (file.getAbsolutePath().endsWith("crc")) { - continue; - } - - if (file.toPath().getFileName().toString().startsWith(".")) { - inProgress++; - } else { - finished++; - } - } - - Assert.assertEquals(expectedInProgress, inProgress); - Assert.assertEquals(expectedCompleted, finished); - } - - private static Map<File, String> getFileContentByPath(File directory) throws IOException { - Map<File, String> contents = new HashMap<>(4); - - final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true); - for (File file : filesInBucket) { - contents.put(file, FileUtils.readFileToString(file)); - } - return contents; - } }
