[FLINK-9903] [DataStream API] Refactor StreamingFileSink / add bulk encoders
* Add support for bulk encoders. * Expose more options in the rolling policy. * Allow returning any object as the bucket id from the bucketer. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b56c75ca Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b56c75ca Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b56c75ca Branch: refs/heads/master Commit: b56c75ca375049b1d2c80d2d0945ae1ae04eb39e Parents: d309e61 Author: kkloudas <[email protected]> Authored: Tue Jul 17 11:52:02 2018 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Jul 20 16:12:29 2018 +0200 ---------------------------------------------------------------------- .../api/common/serialization/BulkWriter.java | 95 ++++ .../core/fs/SafetyNetWrapperFileSystem.java | 5 + .../api/functions/sink/filesystem/Bucket.java | 48 +- .../sink/filesystem/BucketFactory.java | 29 +- .../functions/sink/filesystem/BucketState.java | 12 +- .../sink/filesystem/BucketStateSerializer.java | 39 +- .../api/functions/sink/filesystem/Buckets.java | 339 ++++++++++++++ .../sink/filesystem/BulkPartWriter.java | 110 +++++ .../sink/filesystem/DefaultBucketFactory.java | 35 +- .../sink/filesystem/DefaultRollingPolicy.java | 142 ------ .../sink/filesystem/PartFileHandler.java | 122 ----- .../functions/sink/filesystem/PartFileInfo.java | 4 +- .../sink/filesystem/PartFileWriter.java | 141 ++++++ .../sink/filesystem/RollingPolicy.java | 20 +- .../sink/filesystem/RowWisePartWriter.java | 96 ++++ .../sink/filesystem/StreamingFileSink.java | 457 ++++++++----------- .../filesystem/bucketers/BasePathBucketer.java | 11 +- .../sink/filesystem/bucketers/Bucketer.java | 22 +- .../filesystem/bucketers/DateTimeBucketer.java | 12 +- .../SimpleVersionedStringSerializer.java | 77 ++++ .../rolling/policies/DefaultRollingPolicy.java | 157 +++++++ .../policies/OnCheckpointRollingPolicy.java | 45 ++ .../filesystem/BucketStateSerializerTest.java | 47 +- 
.../functions/sink/filesystem/BucketsTest.java | 119 +++++ .../sink/filesystem/BulkWriterTest.java | 144 ++++++ .../filesystem/LocalStreamingFileSinkTest.java | 267 ++++------- .../sink/filesystem/RollingPolicyTest.java | 225 +++++++++ .../functions/sink/filesystem/TestUtils.java | 164 +++++++ 28 files changed, 2177 insertions(+), 807 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java new file mode 100644 index 0000000..44f5fbe --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.core.fs.FSDataOutputStream; + +import java.io.IOException; +import java.io.Serializable; + +/** + * An encoder that encodes data in a bulk fashion, encoding many records together at a time. + * + * <p>Examples for bulk encoding are most compressed formats, including formats like + * Parquet and ORC which encode batches of records into blocks of column vectors. + * + * <p>The bulk encoder may be stateful and is bound to a single stream during its + * lifetime. + * + * @param <T> The type of the elements encoded through this encoder. + */ +@PublicEvolving +public interface BulkWriter<T> { + + /** + * Adds an element to the encoder. The encoder may temporarily buffer the element, + * or immediately write it to the stream. + * + * <p>It may be that adding this element fills up an internal buffer and causes the + * encoding and flushing of a batch of internally buffered elements. + * + * @param element The element to add. + * @throws IOException Thrown, if the element cannot be added to the encoder, + * or if the output stream throws an exception. + */ + void addElement(T element) throws IOException; + + /** + * Flushes all intermediate buffered data to the output stream. + * It is expected that flushing often may reduce the efficiency of the encoding. + * + * @throws IOException Thrown if the encoder cannot be flushed, or if the output + * stream throws an exception. + */ + void flush() throws IOException; + + /** + * Finishes the writing. This must flush all internal buffers, finish encoding, and write + * footers. + * + * <p>The writer is not expected to handle any more records via {@link #addElement(Object)} after + * this method is called. + * + * <p><b>Important:</b> This method MUST NOT close the stream that the writer writes to. + * Closing the stream is expected to happen through the invoker of this method afterwards. 
+ * + * @throws IOException Thrown if the finalization fails. + */ + void finish() throws IOException; + + // ------------------------------------------------------------------------ + + /** + * A factory that creates a {@link BulkWriter}. + * @param <T> The type of record to write. + */ + @FunctionalInterface + interface Factory<T> extends Serializable { + + /** + * Creates a writer that writes to the given stream. + * + * @param out The output stream to write the encoded data to. + * @throws IOException Thrown if the writer cannot be opened, or if the output + * stream throws an exception. + */ + BulkWriter<T> create(FSDataOutputStream out) throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java index 92b3a74c..04e6315 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java @@ -65,6 +65,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr } @Override + public RecoverableWriter createRecoverableWriter() throws IOException { + return unsafeFileSystem.createRecoverableWriter(); + } + + @Override public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { return unsafeFileSystem.getFileBlockLocations(file, start, len); } http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java index d9a6d75..3e2d22c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.serialization.Encoder; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.util.Preconditions; @@ -30,6 +29,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; /** * A bucket is the directory organization of the output of the {@link StreamingFileSink}. @@ -39,17 +39,17 @@ import java.util.Map; * queried to see in which bucket this element should be written to. 
*/ @PublicEvolving -public class Bucket<IN> { +public class Bucket<IN, BucketID> { private static final String PART_PREFIX = "part"; - private final String bucketId; + private final BucketID bucketId; private final Path bucketPath; private final int subtaskIndex; - private final Encoder<IN> encoder; + private final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory; private final RecoverableWriter fsWriter; @@ -57,7 +57,7 @@ public class Bucket<IN> { private long partCounter; - private PartFileHandler<IN> currentPart; + private PartFileWriter<IN, BucketID> currentPart; private List<RecoverableWriter.CommitRecoverable> pending; @@ -68,10 +68,10 @@ public class Bucket<IN> { RecoverableWriter fsWriter, int subtaskIndex, long initialPartCounter, - Encoder<IN> writer, - BucketState bucketstate) throws IOException { + PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory, + BucketState<BucketID> bucketState) throws IOException { - this(fsWriter, subtaskIndex, bucketstate.getBucketId(), bucketstate.getBucketPath(), initialPartCounter, writer); + this(fsWriter, subtaskIndex, bucketState.getBucketId(), bucketState.getBucketPath(), initialPartCounter, partFileFactory); // the constructor must have already initialized the filesystem writer Preconditions.checkState(fsWriter != null); @@ -79,15 +79,15 @@ public class Bucket<IN> { // we try to resume the previous in-progress file, if the filesystem // supports such operation. If not, we just commit the file and start fresh. 
- final RecoverableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress(); + final RecoverableWriter.ResumeRecoverable resumable = bucketState.getInProgress(); if (resumable != null) { - currentPart = PartFileHandler.resumeFrom( - bucketId, fsWriter, resumable, bucketstate.getCreationTime()); + currentPart = partFileFactory.resumeFrom( + bucketId, fsWriter, resumable, bucketState.getCreationTime()); } // we commit pending files for previous checkpoints to the last successful one // (from which we are recovering from) - for (List<RecoverableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) { + for (List<RecoverableWriter.CommitRecoverable> commitables: bucketState.getPendingPerCheckpoint().values()) { for (RecoverableWriter.CommitRecoverable commitable: commitables) { fsWriter.recoverForCommit(commitable).commitAfterRecovery(); } @@ -100,26 +100,26 @@ public class Bucket<IN> { public Bucket( RecoverableWriter fsWriter, int subtaskIndex, - String bucketId, + BucketID bucketId, Path bucketPath, long initialPartCounter, - Encoder<IN> writer) { + PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory) { this.fsWriter = Preconditions.checkNotNull(fsWriter); this.subtaskIndex = subtaskIndex; this.bucketId = Preconditions.checkNotNull(bucketId); this.bucketPath = Preconditions.checkNotNull(bucketPath); this.partCounter = initialPartCounter; - this.encoder = Preconditions.checkNotNull(writer); + this.partFileFactory = Preconditions.checkNotNull(partFileFactory); this.pending = new ArrayList<>(); } - public PartFileInfo getInProgressPartInfo() { + public PartFileInfo<BucketID> getInProgressPartInfo() { return currentPart; } - public String getBucketId() { + public BucketID getBucketId() { return bucketId; } @@ -137,18 +137,18 @@ public class Bucket<IN> { void write(IN element, long currentTime) throws IOException { Preconditions.checkState(currentPart != null, "bucket has been closed"); - currentPart.write(element, 
encoder, currentTime); + currentPart.write(element, currentTime); } void rollPartFile(final long currentTime) throws IOException { closePartFile(); - currentPart = PartFileHandler.openNew(bucketId, fsWriter, getNewPartPath(), currentTime); + currentPart = partFileFactory.openNew(bucketId, fsWriter, getNewPartPath(), currentTime); partCounter++; } - void merge(final Bucket<IN> bucket) throws IOException { + void merge(final Bucket<IN, BucketID> bucket) throws IOException { Preconditions.checkNotNull(bucket); - Preconditions.checkState(bucket.getBucketPath().equals(getBucketPath())); + Preconditions.checkState(Objects.equals(bucket.getBucketPath(), bucketPath)); // there should be no pending files in the "to-merge" states. Preconditions.checkState(bucket.pending.isEmpty()); @@ -176,7 +176,7 @@ public class Bucket<IN> { } } - public void commitUpToCheckpoint(long checkpointId) throws IOException { + public void onCheckpointAcknowledgment(long checkpointId) throws IOException { Preconditions.checkNotNull(fsWriter); Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it = @@ -193,7 +193,7 @@ public class Bucket<IN> { } } - public BucketState snapshot(long checkpointId) throws IOException { + public BucketState<BucketID> onCheckpoint(long checkpointId) throws IOException { RecoverableWriter.ResumeRecoverable resumable = null; long creationTime = Long.MAX_VALUE; @@ -206,7 +206,7 @@ public class Bucket<IN> { pendingPerCheckpoint.put(checkpointId, pending); pending = new ArrayList<>(); } - return new BucketState(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint); + return new BucketState<>(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint); } private Path getNewPartPath() { http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java index 88f3c1a..0c6b587 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.serialization.Encoder; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.RecoverableWriter; @@ -30,20 +29,20 @@ import java.io.Serializable; * A factory able to create {@link Bucket buckets} for the {@link StreamingFileSink}. */ @Internal -public interface BucketFactory<IN> extends Serializable { +interface BucketFactory<IN, BucketID> extends Serializable { - Bucket<IN> getNewBucket( - RecoverableWriter fsWriter, - int subtaskIndex, - String bucketId, - Path bucketPath, - long initialPartCounter, - Encoder<IN> writer) throws IOException; + Bucket<IN, BucketID> getNewBucket( + final RecoverableWriter fsWriter, + final int subtaskIndex, + final BucketID bucketId, + final Path bucketPath, + final long initialPartCounter, + final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory) throws IOException; - Bucket<IN> restoreBucket( - RecoverableWriter fsWriter, - int subtaskIndex, - long initialPartCounter, - Encoder<IN> writer, - BucketState bucketstate) throws IOException; + Bucket<IN, BucketID> restoreBucket( + final RecoverableWriter fsWriter, + final int subtaskIndex, + final long initialPartCounter, + final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory, + final BucketState<BucketID> bucketState) throws IOException; } 
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java index 5ebc46c..bb49e3a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java @@ -32,9 +32,9 @@ import java.util.Map; * The state of the {@link Bucket} that is to be checkpointed. */ @Internal -public class BucketState { +public class BucketState<BucketID> { - private final String bucketId; + private final BucketID bucketId; /** * The base path for the bucket, i.e. the directory where all the part files are stored. 
@@ -59,10 +59,10 @@ public class BucketState { private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint; public BucketState( - final String bucketId, + final BucketID bucketId, final Path bucketPath, final long creationTime, - final @Nullable RecoverableWriter.ResumeRecoverable inProgress, + @Nullable final RecoverableWriter.ResumeRecoverable inProgress, final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint ) { this.bucketId = Preconditions.checkNotNull(bucketId); @@ -72,7 +72,7 @@ public class BucketState { this.pendingPerCheckpoint = Preconditions.checkNotNull(pendingPerCheckpoint); } - public String getBucketId() { + public BucketID getBucketId() { return bucketId; } @@ -85,7 +85,7 @@ public class BucketState { } @Nullable - public RecoverableWriter.ResumeRecoverable getCurrentInProgress() { + public RecoverableWriter.ResumeRecoverable getInProgress() { return inProgress; } http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java index a167ec9..cf9b805 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.RecoverableWriter; -import 
org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; @@ -42,7 +41,7 @@ import java.util.Map.Entry; * A {@code SimpleVersionedSerializer} used to serialize the {@link BucketState BucketState}. */ @Internal -class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> { +class BucketStateSerializer<BucketID> implements SimpleVersionedSerializer<BucketState<BucketID>> { private static final int MAGIC_NUMBER = 0x1e764b79; @@ -50,12 +49,16 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> { private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer; - public BucketStateSerializer( - final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer, - final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer) { + private final SimpleVersionedSerializer<BucketID> bucketIdSerializer; + BucketStateSerializer( + final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer, + final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer, + final SimpleVersionedSerializer<BucketID> bucketIdSerializer + ) { this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer); this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer); + this.bucketIdSerializer = Preconditions.checkNotNull(bucketIdSerializer); } @Override @@ -64,7 +67,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> { } @Override - public byte[] serialize(BucketState state) throws IOException { + public byte[] serialize(BucketState<BucketID> state) throws IOException { DataOutputSerializer out = new DataOutputSerializer(256); out.writeInt(MAGIC_NUMBER); serializeV1(state, 
out); @@ -72,7 +75,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> { } @Override - public BucketState deserialize(int version, byte[] serialized) throws IOException { + public BucketState<BucketID> deserialize(int version, byte[] serialized) throws IOException { switch (version) { case 1: DataInputDeserializer in = new DataInputDeserializer(serialized); @@ -84,13 +87,13 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> { } @VisibleForTesting - void serializeV1(BucketState state, DataOutputView out) throws IOException { - out.writeUTF(state.getBucketId()); + void serializeV1(BucketState<BucketID> state, DataOutputView out) throws IOException { + SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), out); out.writeUTF(state.getBucketPath().toString()); out.writeLong(state.getCreationTime()); // put the current open part file - final RecoverableWriter.ResumeRecoverable currentPart = state.getCurrentInProgress(); + final RecoverableWriter.ResumeRecoverable currentPart = state.getInProgress(); if (currentPart != null) { out.writeBoolean(true); SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, currentPart, out); @@ -100,19 +103,19 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> { } // put the map of pending files per checkpoint - final Map<Long, List<CommitRecoverable>> pendingCommitters = state.getPendingPerCheckpoint(); + final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommitters = state.getPendingPerCheckpoint(); // manually keep the version here to safe some bytes out.writeInt(commitableSerializer.getVersion()); out.writeInt(pendingCommitters.size()); - for (Entry<Long, List<CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) { - List<CommitRecoverable> resumables = resumablesForCheckpoint.getValue(); + for (Entry<Long, List<RecoverableWriter.CommitRecoverable>> 
resumablesForCheckpoint : pendingCommitters.entrySet()) { + List<RecoverableWriter.CommitRecoverable> resumables = resumablesForCheckpoint.getValue(); out.writeLong(resumablesForCheckpoint.getKey()); out.writeInt(resumables.size()); - for (CommitRecoverable resumable : resumables) { + for (RecoverableWriter.CommitRecoverable resumable : resumables) { byte[] serialized = commitableSerializer.serialize(resumable); out.writeInt(serialized.length); out.write(serialized); @@ -121,8 +124,8 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> { } @VisibleForTesting - BucketState deserializeV1(DataInputView in) throws IOException { - final String bucketId = in.readUTF(); + BucketState<BucketID> deserializeV1(DataInputView in) throws IOException { + final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, in); final String bucketPathStr = in.readUTF(); final long creationTime = in.readLong(); @@ -140,7 +143,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> { final long checkpointId = in.readLong(); final int noOfResumables = in.readInt(); - final ArrayList<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables); + final List<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables); for (int j = 0; j < noOfResumables; j++) { final byte[] bytes = new byte[in.readInt()]; in.readFully(bytes); @@ -149,7 +152,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> { resumablesPerCheckpoint.put(checkpointId, resumables); } - return new BucketState( + return new BucketState<>( bucketId, new Path(bucketPathStr), creationTime, http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java new file mode 100644 index 0000000..6afba17 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * The manager of the different active buckets in the {@link StreamingFileSink}. + * + * <p>This class is responsible for all bucket-related operations and the actual + * {@link StreamingFileSink} is just plugging in the functionality offered by + * this class to the lifecycle of the operator. + * + * @param <IN> The type of input elements. + * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}. 
+ */ +public class Buckets<IN, BucketID> { + + private static final Logger LOG = LoggerFactory.getLogger(Buckets.class); + + // ------------------------ configuration fields -------------------------- + + private final Path basePath; + + private final BucketFactory<IN, BucketID> bucketFactory; + + private final Bucketer<IN, BucketID> bucketer; + + private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory; + + private final RollingPolicy<BucketID> rollingPolicy; + + // --------------------------- runtime fields ----------------------------- + + private final int subtaskIndex; + + private final Buckets.BucketerContext bucketerContext; + + private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets; + + private long initMaxPartCounter; + + private long maxPartCounterUsed; + + private final RecoverableWriter fileSystemWriter; + + // --------------------------- State Related Fields ----------------------------- + + private final BucketStateSerializer<BucketID> bucketStateSerializer; + + /** + * A package-private constructor creating a new empty bucket manager. + * + * @param basePath The base path for our buckets. + * @param bucketer The {@link Bucketer} provided by the user. + * @param bucketFactory The {@link BucketFactory} to be used to create buckets. + * @param partFileWriterFactory The {@link PartFileWriter.PartFileFactory} to be used when writing data. + * @param rollingPolicy The {@link RollingPolicy} as specified by the user. 
+ */ + Buckets( + final Path basePath, + final Bucketer<IN, BucketID> bucketer, + final BucketFactory<IN, BucketID> bucketFactory, + final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory, + final RollingPolicy<BucketID> rollingPolicy, + final int subtaskIndex) throws IOException { + + this.basePath = Preconditions.checkNotNull(basePath); + this.bucketer = Preconditions.checkNotNull(bucketer); + this.bucketFactory = Preconditions.checkNotNull(bucketFactory); + this.partFileWriterFactory = Preconditions.checkNotNull(partFileWriterFactory); + this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy); + this.subtaskIndex = subtaskIndex; + + this.activeBuckets = new HashMap<>(); + this.bucketerContext = new Buckets.BucketerContext(); + + this.fileSystemWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter(); + this.bucketStateSerializer = new BucketStateSerializer<>( + fileSystemWriter.getResumeRecoverableSerializer(), + fileSystemWriter.getCommitRecoverableSerializer(), + bucketer.getSerializer() + ); + + this.initMaxPartCounter = 0L; + this.maxPartCounterUsed = 0L; + } + + /** + * Initializes the state after recovery from a failure. + * @param bucketStates the state holding recovered state about active buckets. + * @param partCounterState the state holding the max previously used part counters. + * @throws Exception + */ + void initializeState(final ListState<byte[]> bucketStates, final ListState<Long> partCounterState) throws Exception { + + // When resuming after a failure: + // 1) we get the max part counter used before in order to make sure that we do not overwrite valid data + // 2) we commit any pending files for previous checkpoints (previous to the last successful one) + // 3) we resume writing to the previous in-progress file of each bucket, and + // 4) if we receive multiple states for the same bucket, we merge them. 
+ + // get the max counter + long maxCounter = 0L; + for (long partCounter: partCounterState.get()) { + maxCounter = Math.max(partCounter, maxCounter); + } + initMaxPartCounter = maxCounter; + + // get the restored buckets + for (byte[] recoveredState : bucketStates.get()) { + final BucketState<BucketID> bucketState = SimpleVersionedSerialization.readVersionAndDeSerialize( + bucketStateSerializer, recoveredState); + + final BucketID bucketId = bucketState.getBucketId(); + + LOG.info("Recovered bucket for {}", bucketId); + + final Bucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket( + fileSystemWriter, + subtaskIndex, + initMaxPartCounter, + partFileWriterFactory, + bucketState + ); + + final Bucket<IN, BucketID> existingBucket = activeBuckets.get(bucketId); + if (existingBucket == null) { + activeBuckets.put(bucketId, restoredBucket); + } else { + existingBucket.merge(restoredBucket); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("{} idx {} restored state for bucket {}", getClass().getSimpleName(), + subtaskIndex, assembleBucketPath(bucketId)); + } + } + } + + void publishUpToCheckpoint(long checkpointId) throws IOException { + final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt = + activeBuckets.entrySet().iterator(); + + while (activeBucketIt.hasNext()) { + Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue(); + bucket.onCheckpointAcknowledgment(checkpointId); + + if (!bucket.isActive()) { + // We've dealt with all the pending files and the writer for this bucket is not currently open. + // Therefore this bucket is currently inactive and we can remove it from our state. 
+ activeBucketIt.remove(); + } + } + } + + void snapshotState( + final long checkpointId, + final long checkpointTimestamp, + final ListState<byte[]> bucketStates, + final ListState<Long> partCounterState) throws Exception { + + Preconditions.checkState( + fileSystemWriter != null && bucketStateSerializer != null, + "sink has not been initialized" + ); + + for (Bucket<IN, BucketID> bucket : activeBuckets.values()) { + final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo(); + + if (info != null && + (rollingPolicy.shouldRollOnCheckpoint(info) || + rollingPolicy.shouldRollOnEvent(info) || + rollingPolicy.shouldRollOnProcessingTime(info, checkpointTimestamp)) + ) { + // we also check here so that we do not have to always + // wait for the "next" element to arrive. + bucket.closePartFile(); + } + + final BucketState<BucketID> bucketState = bucket.onCheckpoint(checkpointId); + bucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState)); + } + + partCounterState.add(maxPartCounterUsed); + } + + /** + * Called on every incoming element to write it to its final location. + * @param value the element itself. + * @param context the {@link SinkFunction.Context context} available to the sink function. 
+ * @throws Exception + */ + void onElement(IN value, SinkFunction.Context context) throws Exception { + final long currentProcessingTime = context.currentProcessingTime(); + + // setting the values in the bucketer context + bucketerContext.update( + context.timestamp(), + context.currentWatermark(), + currentProcessingTime); + + final BucketID bucketId = bucketer.getBucketId(value, bucketerContext); + + Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId); + if (bucket == null) { + final Path bucketPath = assembleBucketPath(bucketId); + bucket = bucketFactory.getNewBucket( + fileSystemWriter, + subtaskIndex, + bucketId, + bucketPath, + initMaxPartCounter, + partFileWriterFactory); + activeBuckets.put(bucketId, bucket); + } + + final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo(); + if (info == null || rollingPolicy.shouldRollOnEvent(info)) { + bucket.rollPartFile(currentProcessingTime); + } + bucket.write(value, currentProcessingTime); + + // we update the counter here because as buckets become inactive and + // get removed in publishUpToCheckpoint(), at the time we snapshot they + // may not be there to take them into account during checkpointing. + updateMaxPartCounter(bucket.getPartCounter()); + } + + void onProcessingTime(long timestamp) throws Exception { + for (Bucket<IN, BucketID> bucket : activeBuckets.values()) { + final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo(); + if (info != null && rollingPolicy.shouldRollOnProcessingTime(info, timestamp)) { + bucket.closePartFile(); + } + } + } + + void close() { + if (activeBuckets != null) { + activeBuckets.values().forEach(Bucket::dispose); + } + } + + /** + * Assembles the final bucket {@link Path} that will be used for the provided bucket in the + * underlying filesystem. + * @param bucketId the id of the bucket as returned by the {@link Bucketer}. + * @return The resulting path. 
+ */ + private Path assembleBucketPath(BucketID bucketId) { + return new Path(basePath, bucketId.toString()); + } + + /** + * Updates the state keeping track of the maximum used part + * counter across all local active buckets. + * @param candidate the part counter that will potentially replace the current {@link #maxPartCounterUsed}. + */ + private void updateMaxPartCounter(long candidate) { + maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate); + } + + /** + * The {@link Bucketer.Context} exposed to the + * {@link Bucketer#getBucketId(Object, Bucketer.Context)} + * whenever a new incoming element arrives. + */ + private static final class BucketerContext implements Bucketer.Context { + + @Nullable + private Long elementTimestamp; + + private long currentWatermark; + + private long currentProcessingTime; + + private BucketerContext() { + this.elementTimestamp = null; + this.currentWatermark = Long.MIN_VALUE; + this.currentProcessingTime = Long.MIN_VALUE; + } + + void update(@Nullable Long element, long watermark, long processingTime) { + this.elementTimestamp = element; + this.currentWatermark = watermark; + this.currentProcessingTime = processingTime; + } + + @Override + public long currentProcessingTime() { + return currentProcessingTime; + } + + @Override + public long currentWatermark() { + return currentWatermark; + } + + @Override + @Nullable + public Long timestamp() { + return elementTimestamp; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java new file mode 100644 index 0000000..558b1bf --- /dev/null +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A {@link PartFileWriter} for bulk-encoding formats that use a {@link BulkWriter}. + * This also implements the {@link PartFileInfo}. 
+ */ +@Internal +final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> { + + private final BulkWriter<IN> writer; + + private BulkPartWriter( + final BucketID bucketId, + final RecoverableFsDataOutputStream currentPartStream, + final BulkWriter<IN> writer, + final long creationTime) { + super(bucketId, currentPartStream, creationTime); + this.writer = Preconditions.checkNotNull(writer); + } + + @Override + void write(IN element, long currentTime) throws IOException { + writer.addElement(element); + markWrite(currentTime); + } + + @Override + RecoverableWriter.ResumeRecoverable persist() { + throw new UnsupportedOperationException("Bulk Part Writers do not support \"pause and resume\" operations."); + } + + @Override + RecoverableWriter.CommitRecoverable closeForCommit() throws IOException { + writer.flush(); + writer.finish(); + return super.closeForCommit(); + } + + /** + * A factory that creates {@link BulkPartWriter BulkPartWriters}. + * @param <IN> The type of input elements. + * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}. 
+ */ + static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> { + + private final BulkWriter.Factory<IN> writerFactory; + + Factory(BulkWriter.Factory<IN> writerFactory) { + this.writerFactory = writerFactory; + } + + @Override + public PartFileWriter<IN, BucketID> resumeFrom( + final BucketID bucketId, + final RecoverableWriter fileSystemWriter, + final RecoverableWriter.ResumeRecoverable resumable, + final long creationTime) throws IOException { + + Preconditions.checkNotNull(fileSystemWriter); + Preconditions.checkNotNull(resumable); + + final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable); + final BulkWriter<IN> writer = writerFactory.create(stream); + return new BulkPartWriter<>(bucketId, stream, writer, creationTime); + } + + @Override + public PartFileWriter<IN, BucketID> openNew( + final BucketID bucketId, + final RecoverableWriter fileSystemWriter, + final Path path, + final long creationTime) throws IOException { + + Preconditions.checkNotNull(fileSystemWriter); + Preconditions.checkNotNull(path); + + final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path); + final BulkWriter<IN> writer = writerFactory.create(stream); + return new BulkPartWriter<>(bucketId, stream, writer, creationTime); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java index 795ba74..532138f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.serialization.Encoder; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.RecoverableWriter; @@ -29,18 +28,18 @@ import java.io.IOException; * A factory returning {@link Bucket buckets}. */ @Internal -public class DefaultBucketFactory<IN> implements BucketFactory<IN> { +class DefaultBucketFactory<IN, BucketID> implements BucketFactory<IN, BucketID> { - private static final long serialVersionUID = 3372881359208513357L; + private static final long serialVersionUID = 1L; @Override - public Bucket<IN> getNewBucket( - RecoverableWriter fsWriter, - int subtaskIndex, - String bucketId, - Path bucketPath, - long initialPartCounter, - Encoder<IN> writer) throws IOException { + public Bucket<IN, BucketID> getNewBucket( + final RecoverableWriter fsWriter, + final int subtaskIndex, + final BucketID bucketId, + final Path bucketPath, + final long initialPartCounter, + final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory) { return new Bucket<>( fsWriter, @@ -48,22 +47,22 @@ public class DefaultBucketFactory<IN> implements BucketFactory<IN> { bucketId, bucketPath, initialPartCounter, - writer); + partFileWriterFactory); } @Override - public Bucket<IN> restoreBucket( - RecoverableWriter fsWriter, - int subtaskIndex, - long initialPartCounter, - Encoder<IN> writer, - BucketState bucketState) throws IOException { + public Bucket<IN, BucketID> restoreBucket( + final RecoverableWriter fsWriter, + final int subtaskIndex, + final long initialPartCounter, + final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory, + final BucketState<BucketID> bucketState) throws IOException { return new Bucket<>( fsWriter, subtaskIndex, initialPartCounter, - writer, + 
partFileWriterFactory, bucketState); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java deleted file mode 100644 index 026ac70..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.functions.sink.filesystem; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; - -/** - * The default implementation of the {@link RollingPolicy}. 
- * - * <p>This policy rolls a part file if: - * <ol> - * <li>there is no open part file,</li> - * <li>the current file has reached the maximum bucket size (by default 128MB),</li> - * <li>the current file is older than the roll over interval (by default 60 sec), or</li> - * <li>the current file has not been written to for more than the allowed inactivityTime (by default 60 sec).</li> - * </ol> - */ -@PublicEvolving -public final class DefaultRollingPolicy implements RollingPolicy { - - private static final long serialVersionUID = 1318929857047767030L; - - private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L; - - private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L; - - private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L; - - private final long partSize; - - private final long rolloverInterval; - - private final long inactivityInterval; - - /** - * Private constructor to avoid direct instantiation. - */ - private DefaultRollingPolicy(long partSize, long rolloverInterval, long inactivityInterval) { - Preconditions.checkArgument(partSize > 0L); - Preconditions.checkArgument(rolloverInterval > 0L); - Preconditions.checkArgument(inactivityInterval > 0L); - - this.partSize = partSize; - this.rolloverInterval = rolloverInterval; - this.inactivityInterval = inactivityInterval; - } - - @Override - public boolean shouldRoll(final PartFileInfo state, final long currentTime) throws IOException { - if (state == null) { - // this means that there is no currently open part file. - return true; - } - - if (state.getSize() > partSize) { - return true; - } - - if (currentTime - state.getCreationTime() > rolloverInterval) { - return true; - } - - return currentTime - state.getLastUpdateTime() > inactivityInterval; - } - - /** - * Initiates the instantiation of a {@link DefaultRollingPolicy}. - * To finalize it and have the actual policy, call {@code .create()}. 
- */ - public static PolicyBuilder create() { - return new PolicyBuilder(); - } - - /** - * A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}. - */ - @PublicEvolving - public static class PolicyBuilder { - - private long partSize = DEFAULT_MAX_PART_SIZE; - - private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL; - - private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL; - - /** - * Sets the part size above which a part file will have to roll. - * @param size the allowed part size. - */ - public PolicyBuilder withMaxPartSize(long size) { - Preconditions.checkState(size > 0L); - this.partSize = size; - return this; - } - - /** - * Sets the interval of allowed inactivity after which a part file will have to roll. - * @param interval the allowed inactivity interval. - */ - public PolicyBuilder withInactivityInterval(long interval) { - Preconditions.checkState(interval > 0L); - this.inactivityInterval = interval; - return this; - } - - /** - * Sets the max time a part file can stay open before having to roll. - * @param interval the desired rollover interval. - */ - public PolicyBuilder withRolloverInterval(long interval) { - Preconditions.checkState(interval > 0L); - this.rolloverInterval = interval; - return this; - } - - /** - * Creates the actual policy. 
- */ - public DefaultRollingPolicy build() { - return new DefaultRollingPolicy(partSize, rolloverInterval, inactivityInterval); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java deleted file mode 100644 index 10fd12b..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.functions.sink.filesystem; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.serialization.Encoder; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.fs.RecoverableFsDataOutputStream; -import org.apache.flink.core.fs.RecoverableWriter; -import org.apache.flink.util.IOUtils; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; - -/** - * A handler for the currently open part file in a specific {@link Bucket}. - * This also implements the {@link PartFileInfo}. - */ -@Internal -class PartFileHandler<IN> implements PartFileInfo { - - private final String bucketId; - - private final long creationTime; - - private final RecoverableFsDataOutputStream currentPartStream; - - private long lastUpdateTime; - - private PartFileHandler( - final String bucketId, - final RecoverableFsDataOutputStream currentPartStream, - final long creationTime) { - - Preconditions.checkArgument(creationTime >= 0L); - this.bucketId = Preconditions.checkNotNull(bucketId); - this.currentPartStream = Preconditions.checkNotNull(currentPartStream); - this.creationTime = creationTime; - this.lastUpdateTime = creationTime; - } - - public static <IN> PartFileHandler<IN> resumeFrom( - final String bucketId, - final RecoverableWriter fileSystemWriter, - final RecoverableWriter.ResumeRecoverable resumable, - final long creationTime) throws IOException { - Preconditions.checkNotNull(bucketId); - Preconditions.checkNotNull(fileSystemWriter); - Preconditions.checkNotNull(resumable); - - final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable); - return new PartFileHandler<>(bucketId, stream, creationTime); - } - - public static <IN> PartFileHandler<IN> openNew( - final String bucketId, - final RecoverableWriter fileSystemWriter, - final Path path, - final long creationTime) throws IOException { - Preconditions.checkNotNull(bucketId); - 
Preconditions.checkNotNull(fileSystemWriter); - Preconditions.checkNotNull(path); - - final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path); - return new PartFileHandler<>(bucketId, stream, creationTime); - } - - void write(IN element, Encoder<IN> encoder, long currentTime) throws IOException { - encoder.encode(element, currentPartStream); - this.lastUpdateTime = currentTime; - } - - RecoverableWriter.ResumeRecoverable persist() throws IOException { - return currentPartStream.persist(); - } - - RecoverableWriter.CommitRecoverable closeForCommit() throws IOException { - return currentPartStream.closeForCommit().getRecoverable(); - } - - void dispose() { - // we can suppress exceptions here, because we do not rely on close() to - // flush or persist any data - IOUtils.closeQuietly(currentPartStream); - } - - @Override - public String getBucketId() { - return bucketId; - } - - @Override - public long getCreationTime() { - return creationTime; - } - - @Override - public long getSize() throws IOException { - return currentPartStream.getPos(); - } - - @Override - public long getLastUpdateTime() { - return lastUpdateTime; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java index 9c3d047..5e72ea0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java @@ -29,13 +29,13 @@ import java.io.IOException; * should roll the part file or not. 
*/ @PublicEvolving -public interface PartFileInfo { +public interface PartFileInfo<BucketID> { /** * @return The bucket identifier of the current buffer, as returned by the * {@link Bucketer#getBucketId(Object, Bucketer.Context)}. */ - String getBucketId(); + BucketID getBucketId(); /** * @return The creation time (in ms) of the currently open part file. http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java new file mode 100644 index 0000000..662454b --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * An abstract writer for the currently open part file in a specific {@link Bucket}. + * + * <p>Currently, there are two subclasses of this class: + * <ol> + * <li>One for row-wise formats: the {@link RowWisePartWriter}.</li> + * <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li> + * </ol> + * + * <p>This also implements the {@link PartFileInfo}. + */ +@Internal +abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> { + + private final BucketID bucketId; + + private final long creationTime; + + protected final RecoverableFsDataOutputStream currentPartStream; + + private long lastUpdateTime; + + protected PartFileWriter( + final BucketID bucketId, + final RecoverableFsDataOutputStream currentPartStream, + final long creationTime) { + + Preconditions.checkArgument(creationTime >= 0L); + this.bucketId = Preconditions.checkNotNull(bucketId); + this.currentPartStream = Preconditions.checkNotNull(currentPartStream); + this.creationTime = creationTime; + this.lastUpdateTime = creationTime; + } + + abstract void write(IN element, long currentTime) throws IOException; + + RecoverableWriter.ResumeRecoverable persist() throws IOException { + return currentPartStream.persist(); + } + + RecoverableWriter.CommitRecoverable closeForCommit() throws IOException { + return currentPartStream.closeForCommit().getRecoverable(); + } + + void dispose() { + // we can suppress exceptions here, because we do not rely on close() to + // flush or persist any data + IOUtils.closeQuietly(currentPartStream); + } + + void markWrite(long now) { + 
this.lastUpdateTime = now; + } + + @Override + public BucketID getBucketId() { + return bucketId; + } + + @Override + public long getCreationTime() { + return creationTime; + } + + @Override + public long getSize() throws IOException { + return currentPartStream.getPos(); + } + + @Override + public long getLastUpdateTime() { + return lastUpdateTime; + } + + // ------------------------------------------------------------------------ + + /** + * An interface for factories that create the different {@link PartFileWriter writers}. + */ + interface PartFileFactory<IN, BucketID> { + + /** + * Used upon recovery from a failure to recover a {@link PartFileWriter writer}. + * @param bucketId the id of the bucket this writer is writing to. + * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem. + * @param resumable the state of the stream we are resurrecting. + * @param creationTime the creation time of the stream. + * @return the recovered {@link PartFileWriter writer}. + * @throws IOException + */ + PartFileWriter<IN, BucketID> resumeFrom( + final BucketID bucketId, + final RecoverableWriter fileSystemWriter, + final RecoverableWriter.ResumeRecoverable resumable, + final long creationTime) throws IOException; + + /** + * Used to create a new {@link PartFileWriter writer}. + * @param bucketId the id of the bucket this writer is writing to. + * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem. + * @param path the part this writer will write to. + * @param creationTime the creation time of the stream. + * @return the new {@link PartFileWriter writer}. 
+ * @throws IOException + */ + PartFileWriter<IN, BucketID> openNew( + final BucketID bucketId, + final RecoverableWriter fileSystemWriter, + final Path path, + final long creationTime) throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java index 936377e..24c38aa 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java @@ -28,13 +28,27 @@ import java.io.Serializable; * rolls its currently open part file and opens a new one. */ @PublicEvolving -public interface RollingPolicy extends Serializable { +public interface RollingPolicy<BucketID> extends Serializable { /** - * Determines if the in-progress part file for a bucket should roll. + * Determines if the in-progress part file for a bucket should roll on every checkpoint. + * @param partFileState the state of the currently open part file of the bucket. + * @return {@code True} if the part file should roll, {@code false} otherwise. + */ + boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) throws IOException; + + /** + * Determines if the in-progress part file for a bucket should roll based on its current state, e.g. its size. + * @param partFileState the state of the currently open part file of the bucket. + * @return {@code True} if the part file should roll, {@code false} otherwise. 
+ */ + boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState) throws IOException; + + /** + * Determines if the in-progress part file for a bucket should roll based on a time condition. * @param partFileState the state of the currently open part file of the bucket. * @param currentTime the current processing time. * @return {@code True} if the part file should roll, {@code false} otherwise. */ - boolean shouldRoll(final PartFileInfo partFileState, final long currentTime) throws IOException; + boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java new file mode 100644 index 0000000..0b00b43 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A {@link PartFileWriter} for row-wise formats that use an {@link Encoder}. + * This also implements the {@link PartFileInfo}. + */ +@Internal +final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> { + + private final Encoder<IN> encoder; + + private RowWisePartWriter( + final BucketID bucketId, + final RecoverableFsDataOutputStream currentPartStream, + final Encoder<IN> encoder, + final long creationTime) { + super(bucketId, currentPartStream, creationTime); + this.encoder = Preconditions.checkNotNull(encoder); + } + + @Override + void write(IN element, long currentTime) throws IOException { + encoder.encode(element, currentPartStream); + markWrite(currentTime); + } + + /** + * A factory that creates {@link RowWisePartWriter RowWisePartWriters}. + * @param <IN> The type of input elements. + * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}. 
+ */ + static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> { + + private final Encoder<IN> encoder; + + Factory(Encoder<IN> encoder) { + this.encoder = encoder; + } + + @Override + public PartFileWriter<IN, BucketID> resumeFrom( + final BucketID bucketId, + final RecoverableWriter fileSystemWriter, + final RecoverableWriter.ResumeRecoverable resumable, + final long creationTime) throws IOException { + + Preconditions.checkNotNull(fileSystemWriter); + Preconditions.checkNotNull(resumable); + + final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable); + return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime); + } + + @Override + public PartFileWriter<IN, BucketID> openNew( + final BucketID bucketId, + final RecoverableWriter fileSystemWriter, + final Path path, + final long creationTime) throws IOException { + + Preconditions.checkNotNull(fileSystemWriter); + Preconditions.checkNotNull(path); + + final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path); + return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime); + } + } +}
