asfgit closed pull request #6477: [FLINK-10027] Add logging to StreamingFileSink
URL: https://github.com/apache/flink/pull/6477
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index a350096e38b..6187e6853dd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -18,11 +18,14 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -34,13 +37,14 @@
/**
 * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
 *
- * <p>For each incoming element in the {@code BucketingSink}, the user-specified
- * {@link Bucketer Bucketer} is
- * queried to see in which bucket this element should be written to.
+ * <p>For each incoming element in the {@code StreamingFileSink}, the user-specified
+ * {@link BucketAssigner} is queried to see in which bucket this element should be written to.
*/
-@PublicEvolving
+@Internal
public class Bucket<IN, BucketID> {
+ private static final Logger LOG = LoggerFactory.getLogger(Bucket.class);
+
private static final String PART_PREFIX = "part";
private final BucketID bucketId;
@@ -53,57 +57,27 @@
private final RecoverableWriter fsWriter;
- private final Map<Long, List<RecoverableWriter.CommitRecoverable>>
pendingPerCheckpoint = new HashMap<>();
-
- private long partCounter;
-
- private PartFileWriter<IN, BucketID> currentPart;
+ private final RollingPolicy<IN, BucketID> rollingPolicy;
- private List<RecoverableWriter.CommitRecoverable> pending;
-
- /**
- * Constructor to restore a bucket from checkpointed state.
- */
- public Bucket(
- RecoverableWriter fsWriter,
- int subtaskIndex,
- long initialPartCounter,
- PartFileWriter.PartFileFactory<IN, BucketID>
partFileFactory,
- BucketState<BucketID> bucketState) throws IOException {
+ private final Map<Long, List<RecoverableWriter.CommitRecoverable>>
pendingPartsPerCheckpoint = new HashMap<>();
- this(fsWriter, subtaskIndex, bucketState.getBucketId(),
bucketState.getBucketPath(), initialPartCounter, partFileFactory);
-
- // the constructor must have already initialized the filesystem
writer
- Preconditions.checkState(fsWriter != null);
-
- // we try to resume the previous in-progress file, if the
filesystem
- // supports such operation. If not, we just commit the file and
start fresh.
+ private long partCounter;
- final RecoverableWriter.ResumeRecoverable resumable =
bucketState.getInProgress();
- if (resumable != null) {
- currentPart = partFileFactory.resumeFrom(
- bucketId, fsWriter, resumable,
bucketState.getCreationTime());
- }
+ private PartFileWriter<IN, BucketID> inProgressPart;
- // we commit pending files for previous checkpoints to the last
successful one
- // (from which we are recovering from)
- for (List<RecoverableWriter.CommitRecoverable> commitables:
bucketState.getPendingPerCheckpoint().values()) {
- for (RecoverableWriter.CommitRecoverable commitable:
commitables) {
-
fsWriter.recoverForCommit(commitable).commitAfterRecovery();
- }
- }
- }
+ private List<RecoverableWriter.CommitRecoverable>
pendingPartsForCurrentCheckpoint;
/**
* Constructor to create a new empty bucket.
*/
- public Bucket(
- RecoverableWriter fsWriter,
- int subtaskIndex,
- BucketID bucketId,
- Path bucketPath,
- long initialPartCounter,
- PartFileWriter.PartFileFactory<IN, BucketID>
partFileFactory) {
+ private Bucket(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final BucketID bucketId,
+ final Path bucketPath,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID>
partFileFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy) {
this.fsWriter = Preconditions.checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
@@ -111,117 +85,241 @@ public Bucket(
this.bucketPath = Preconditions.checkNotNull(bucketPath);
this.partCounter = initialPartCounter;
this.partFileFactory =
Preconditions.checkNotNull(partFileFactory);
+ this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
- this.pending = new ArrayList<>();
+ this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
}
/**
- * Gets the information available for the currently
- * open part file, i.e. the one we are currently writing to.
- *
- * <p>This will be null if there is no currently open part file. This
- * is the case when we have a new, just created bucket or a bucket
- * that has not received any data after the closing of its previously
- * open in-progress file due to the specified rolling policy.
- *
- * @return The information about the currently in-progress part file
- * or {@code null} if there is no open part file.
+ * Constructor to restore a bucket from checkpointed state.
*/
- public PartFileInfo<BucketID> getInProgressPartInfo() {
- return currentPart;
+ private Bucket(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID>
partFileFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy,
+ final BucketState<BucketID> bucketState) throws
IOException {
+
+ this(
+ fsWriter,
+ subtaskIndex,
+ bucketState.getBucketId(),
+ bucketState.getBucketPath(),
+ initialPartCounter,
+ partFileFactory,
+ rollingPolicy);
+
+ restoreInProgressFile(bucketState);
+ commitRecoveredPendingFiles(bucketState);
}
- public BucketID getBucketId() {
+ private void restoreInProgressFile(final BucketState<BucketID> state)
throws IOException {
+
+ // we try to resume the previous in-progress file
+ if (state.hasInProgressResumableFile()) {
+ final RecoverableWriter.ResumeRecoverable resumable =
state.getInProgressResumableFile();
+ inProgressPart = partFileFactory.resumeFrom(
+ bucketId, fsWriter, resumable,
state.getInProgressFileCreationTime());
+ }
+ }
+
+ private void commitRecoveredPendingFiles(final BucketState<BucketID>
state) throws IOException {
+
+		// we commit pending files for checkpoints that precede the last successful one, from which we are recovering
+ for (List<RecoverableWriter.CommitRecoverable> committables:
state.getCommittableFilesPerCheckpoint().values()) {
+ for (RecoverableWriter.CommitRecoverable committable:
committables) {
+
fsWriter.recoverForCommit(committable).commitAfterRecovery();
+ }
+ }
+ }
+
+ BucketID getBucketId() {
return bucketId;
}
- public Path getBucketPath() {
+ Path getBucketPath() {
return bucketPath;
}
- public long getPartCounter() {
+ long getPartCounter() {
return partCounter;
}
- public boolean isActive() {
- return currentPart != null || !pending.isEmpty() ||
!pendingPerCheckpoint.isEmpty();
+ boolean isActive() {
+ return inProgressPart != null ||
!pendingPartsForCurrentCheckpoint.isEmpty() ||
!pendingPartsPerCheckpoint.isEmpty();
+ }
+
+ void merge(final Bucket<IN, BucketID> bucket) throws IOException {
+ Preconditions.checkNotNull(bucket);
+ Preconditions.checkState(Objects.equals(bucket.bucketPath,
bucketPath));
+
+ // There should be no pending files in the "to-merge" states.
+ // The reason is that:
+ // 1) the pendingPartsForCurrentCheckpoint is emptied whenever
we take a snapshot (see prepareBucketForCheckpointing()).
+ // So a snapshot, including the one we are recovering from,
will never contain such files.
+ // 2) the files in pendingPartsPerCheckpoint are committed upon
recovery (see commitRecoveredPendingFiles()).
+
+
Preconditions.checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
+
Preconditions.checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
+
+ RecoverableWriter.CommitRecoverable committable =
bucket.closePartFile();
+ if (committable != null) {
+ pendingPartsForCurrentCheckpoint.add(committable);
+ }
+
+ if (LOG.isDebugEnabled()) {
+			LOG.debug("Subtask {} merging buckets for bucket id={}", subtaskIndex, bucketId);
+ }
}
void write(IN element, long currentTime) throws IOException {
- Preconditions.checkState(currentPart != null, "bucket has been
closed");
- currentPart.write(element, currentTime);
+ if (inProgressPart == null ||
rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
+
+ if (LOG.isDebugEnabled()) {
+			LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
+					subtaskIndex, bucketId, element);
+ }
+
+ rollPartFile(currentTime);
+ }
+ inProgressPart.write(element, currentTime);
}
- void rollPartFile(final long currentTime) throws IOException {
+ private void rollPartFile(final long currentTime) throws IOException {
closePartFile();
- currentPart = partFileFactory.openNew(bucketId, fsWriter,
getNewPartPath(), currentTime);
+
+ final Path partFilePath = assembleNewPartPath();
+ inProgressPart = partFileFactory.openNew(bucketId, fsWriter,
partFilePath, currentTime);
+
+ if (LOG.isDebugEnabled()) {
+			LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
+					subtaskIndex, partFilePath.getName(), bucketId);
+ }
+
partCounter++;
}
- void merge(final Bucket<IN, BucketID> bucket) throws IOException {
- Preconditions.checkNotNull(bucket);
- Preconditions.checkState(Objects.equals(bucket.getBucketPath(),
bucketPath));
+ private Path assembleNewPartPath() {
+ return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex +
'-' + partCounter);
+ }
- // there should be no pending files in the "to-merge" states.
- Preconditions.checkState(bucket.pending.isEmpty());
- Preconditions.checkState(bucket.pendingPerCheckpoint.isEmpty());
+ private RecoverableWriter.CommitRecoverable closePartFile() throws
IOException {
+ RecoverableWriter.CommitRecoverable committable = null;
+ if (inProgressPart != null) {
+ committable = inProgressPart.closeForCommit();
+ pendingPartsForCurrentCheckpoint.add(committable);
+ inProgressPart = null;
+ }
+ return committable;
+ }
- RecoverableWriter.CommitRecoverable commitable =
bucket.closePartFile();
- if (commitable != null) {
- pending.add(commitable);
+ void disposePartFile() {
+ if (inProgressPart != null) {
+ inProgressPart.dispose();
}
}
- RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
- RecoverableWriter.CommitRecoverable commitable = null;
- if (currentPart != null) {
- commitable = currentPart.closeForCommit();
- pending.add(commitable);
- currentPart = null;
+ BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws
IOException {
+ prepareBucketForCheckpointing(checkpointId);
+
+ RecoverableWriter.ResumeRecoverable inProgressResumable = null;
+ long inProgressFileCreationTime = Long.MAX_VALUE;
+
+ if (inProgressPart != null) {
+ inProgressResumable = inProgressPart.persist();
+ inProgressFileCreationTime =
inProgressPart.getCreationTime();
}
- return commitable;
+
+ return new BucketState<>(bucketId, bucketPath,
inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
}
- public void dispose() {
- if (currentPart != null) {
- currentPart.dispose();
+ private void prepareBucketForCheckpointing(long checkpointId) throws
IOException {
+ if (inProgressPart != null &&
rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
+ if (LOG.isDebugEnabled()) {
+			LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", subtaskIndex, bucketId);
+ }
+ closePartFile();
+ }
+
+ if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
+ pendingPartsPerCheckpoint.put(checkpointId,
pendingPartsForCurrentCheckpoint);
+ pendingPartsForCurrentCheckpoint = new ArrayList<>();
}
}
- public void onCheckpointAcknowledgment(long checkpointId) throws
IOException {
+ void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws
IOException {
Preconditions.checkNotNull(fsWriter);
Iterator<Map.Entry<Long,
List<RecoverableWriter.CommitRecoverable>>> it =
- pendingPerCheckpoint.entrySet().iterator();
+ pendingPartsPerCheckpoint.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Long,
List<RecoverableWriter.CommitRecoverable>> entry = it.next();
+
if (entry.getKey() <= checkpointId) {
- for (RecoverableWriter.CommitRecoverable
commitable : entry.getValue()) {
-
fsWriter.recoverForCommit(commitable).commit();
+ for (RecoverableWriter.CommitRecoverable
committable : entry.getValue()) {
+
fsWriter.recoverForCommit(committable).commit();
}
it.remove();
}
}
}
- public BucketState<BucketID> onCheckpoint(long checkpointId) throws
IOException {
- RecoverableWriter.ResumeRecoverable resumable = null;
- long creationTime = Long.MAX_VALUE;
-
- if (currentPart != null) {
- resumable = currentPart.persist();
- creationTime = currentPart.getCreationTime();
+ void onProcessingTime(long timestamp) throws IOException {
+ if (inProgressPart != null &&
rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
+ if (LOG.isDebugEnabled()) {
+			LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy " +
+					"(in-progress file created @ {}, last updated @ {} and current time is {}).",
+					subtaskIndex, bucketId, inProgressPart.getCreationTime(), inProgressPart.getLastUpdateTime(), timestamp);
+ }
+ closePartFile();
}
+ }
- if (!pending.isEmpty()) {
- pendingPerCheckpoint.put(checkpointId, pending);
- pending = new ArrayList<>();
- }
- return new BucketState<>(bucketId, bucketPath, creationTime,
resumable, pendingPerCheckpoint);
+ // --------------------------- Static Factory Methods
-----------------------------
+
+ /**
+ * Creates a new empty {@code Bucket}.
+ * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
+ * @param subtaskIndex the index of the subtask creating the bucket.
+ * @param bucketId the identifier of the bucket, as returned by the
{@link BucketAssigner}.
+ * @param bucketPath the path to where the part files for the bucket
will be written to.
+ * @param initialPartCounter the initial counter for the part files of
the bucket.
+ * @param partFileFactory the {@link PartFileWriter.PartFileFactory}
the factory creating part file writers.
+ * @param <IN> the type of input elements to the sink.
+ * @param <BucketID> the type of the identifier of the bucket, as
returned by the {@link BucketAssigner}
+ * @return The new Bucket.
+ */
+ static <IN, BucketID> Bucket<IN, BucketID> getNew(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final BucketID bucketId,
+ final Path bucketPath,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID>
partFileFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy) {
+ return new Bucket<>(fsWriter, subtaskIndex, bucketId,
bucketPath, initialPartCounter, partFileFactory, rollingPolicy);
}
- private Path getNewPartPath() {
- return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex +
'-' + partCounter);
+ /**
+ * Restores a {@code Bucket} from the state included in the provided
{@link BucketState}.
+ * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
+ * @param subtaskIndex the index of the subtask creating the bucket.
+ * @param initialPartCounter the initial counter for the part files of
the bucket.
+ * @param partFileFactory the {@link PartFileWriter.PartFileFactory}
the factory creating part file writers.
+ * @param bucketState the initial state of the restored bucket.
+ * @param <IN> the type of input elements to the sink.
+ * @param <BucketID> the type of the identifier of the bucket, as
returned by the {@link BucketAssigner}
+ * @return The restored Bucket.
+ */
+ static <IN, BucketID> Bucket<IN, BucketID> restore(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID>
partFileFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy,
+ final BucketState<BucketID> bucketState) throws
IOException {
+ return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter,
partFileFactory, rollingPolicy, bucketState);
}
}
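
For context on the rolling behaviour that Bucket now delegates entirely to the injected RollingPolicy (the shouldRollOnEvent, shouldRollOnCheckpoint and shouldRollOnProcessingTime calls visible above), a minimal sketch of a custom policy follows. It assumes only the three callbacks this diff invokes; the class name SketchRollingPolicy and the size/time thresholds are illustrative, not part of this change.

import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

/** Illustrative policy: roll when a part file exceeds 128 MB or is older than 15 minutes. */
class SketchRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

	private static final long MAX_PART_SIZE = 128L * 1024L * 1024L;
	private static final long ROLLOVER_INTERVAL = 15L * 60L * 1000L;

	@Override
	public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException {
		// consulted from prepareBucketForCheckpointing() before the bucket state is snapshotted
		return partFileState.getSize() > MAX_PART_SIZE;
	}

	@Override
	public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
		// consulted from Bucket#write() for every incoming element
		return partFileState.getSize() > MAX_PART_SIZE;
	}

	@Override
	public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException {
		// consulted from Bucket#onProcessingTime() on the periodic bucket check
		return currentTime - partFileState.getCreationTime() > ROLLOVER_INTERVAL;
	}
}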
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java
similarity index 69%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java
index a7052cb219b..a9b0200b110 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java
@@ -28,49 +28,45 @@
import java.io.Serializable;
/**
- * A bucketer is used with a {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}
- * to determine the {@link org.apache.flink.streaming.api.functions.sink.filesystem.Bucket} each incoming element
+ * A BucketAssigner is used with a {@link StreamingFileSink} to determine the {@link Bucket} each incoming element
 * should be put into.
 *
 * <p>The {@code StreamingFileSink} can be writing to many buckets at a time, and it is responsible for managing
- * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket the
- * element should fall in. The {@code Bucketer} can, for example, determine buckets based on system time.
+ * a set of active buckets. Whenever a new element arrives it will ask the {@code BucketAssigner} for the bucket the
+ * element should fall in. The {@code BucketAssigner} can, for example, determine buckets based on system time.
 *
 * @param <IN> The type of input elements.
- * @param <BucketID> The type of the object returned by the {@link #getBucketId(Object, Bucketer.Context)}. This has to have
+ * @param <BucketID> The type of the object returned by the {@link #getBucketId(Object, BucketAssigner.Context)}. This has to have
 *                   a correct {@link #hashCode()} and {@link #equals(Object)} method. In addition, the {@link Path}
 *                   to the created bucket will be the result of the {@link #toString()} of this method, appended to
- *                   the {@code basePath} specified in the
- *                   {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ *                   the {@code basePath} specified in the {@link StreamingFileSink StreamingFileSink}.
*/
@PublicEvolving
-public interface Bucketer<IN, BucketID> extends Serializable {
+public interface BucketAssigner<IN, BucketID> extends Serializable {
/**
* Returns the identifier of the bucket the provided element should be
put into.
* @param element The current element being processed.
- * @param context The {@link SinkFunction.Context context} used by the
- * {@link
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
sink}.
+ * @param context The {@link SinkFunction.Context context} used by the
{@link StreamingFileSink sink}.
*
* @return A string representing the identifier of the bucket the
element should be put into.
- * This actual path to the bucket will result from the concatenation of
the returned string
- * and the {@code base path} provided during the initialization of the
- * {@link
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
sink}.
+ * The actual path to the bucket will result from the concatenation of
the returned string
+ * and the {@code base path} provided during the initialization of the
{@link StreamingFileSink sink}.
*/
- BucketID getBucketId(IN element, Bucketer.Context context);
+ BucketID getBucketId(IN element, BucketAssigner.Context context);
/**
* @return A {@link SimpleVersionedSerializer} capable of
serializing/deserializing the elements
* of type {@code BucketID}. That is the type of the objects returned
by the
- * {@link #getBucketId(Object, Bucketer.Context)}.
+ * {@link #getBucketId(Object, BucketAssigner.Context)}.
*/
SimpleVersionedSerializer<BucketID> getSerializer();
/**
- * Context that the {@link Bucketer} can use for getting additional
data about
+ * Context that the {@link BucketAssigner} can use for getting
additional data about
* an input record.
*
- * <p>The context is only valid for the duration of a {@link
Bucketer#getBucketId(Object, Bucketer.Context)} call.
+ * <p>The context is only valid for the duration of a {@link
BucketAssigner#getBucketId(Object, BucketAssigner.Context)} call.
* Do not store the context and use afterwards!
*/
@PublicEvolving
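
To make the renamed contract concrete, here is a minimal sketch of a BucketAssigner implementation against the interface shown above. The CSV-prefix bucketing logic, the class name CsvKeyBucketAssigner and the inline String serializer are assumptions for illustration only.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;

/** Illustrative assigner: buckets CSV lines by the value of their first column. */
class CsvKeyBucketAssigner implements BucketAssigner<String, String> {

	private static final long serialVersionUID = 1L;

	@Override
	public String getBucketId(String element, BucketAssigner.Context context) {
		// the returned string is appended to the sink's base path to form the bucket directory
		int comma = element.indexOf(',');
		return comma >= 0 ? element.substring(0, comma) : element;
	}

	@Override
	public SimpleVersionedSerializer<String> getSerializer() {
		// a tiny versioned serializer for the String bucket ids kept in checkpointed state
		return new SimpleVersionedSerializer<String>() {
			@Override
			public int getVersion() {
				return 1;
			}

			@Override
			public byte[] serialize(String bucketId) {
				return bucketId.getBytes(StandardCharsets.UTF_8);
			}

			@Override
			public String deserialize(int version, byte[] serialized) throws IOException {
				return new String(serialized, StandardCharsets.UTF_8);
			}
		};
	}
}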
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index 0c6b587b421..2306fafff6a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -37,12 +37,14 @@
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID>
partFileWriterFactory) throws IOException;
+ final PartFileWriter.PartFileFactory<IN, BucketID>
partFileWriterFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy) throws
IOException;
Bucket<IN, BucketID> restoreBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID>
partFileWriterFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState) throws
IOException;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
index bb49e3a5d25..18293815068 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -32,64 +32,90 @@
* The state of the {@link Bucket} that is to be checkpointed.
*/
@Internal
-public class BucketState<BucketID> {
+class BucketState<BucketID> {
private final BucketID bucketId;
- /**
- * The base path for the bucket, i.e. the directory where all the part
files are stored.
- */
+ /** The directory where all the part files of the bucket are stored. */
private final Path bucketPath;
/**
- * The creation time of the currently open part file, or {@code
Long.MAX_VALUE} if there is no open part file.
+ * The creation time of the currently open part file,
+ * or {@code Long.MAX_VALUE} if there is no open part file.
*/
- private final long creationTime;
+ private final long inProgressFileCreationTime;
/**
- * A {@link RecoverableWriter.ResumeRecoverable} for the currently open
part file, or null
- * if there is no currently open part file.
+ * A {@link RecoverableWriter.ResumeRecoverable} for the currently open
+ * part file, or null if there is no currently open part file.
*/
@Nullable
- private final RecoverableWriter.ResumeRecoverable inProgress;
+ private final RecoverableWriter.ResumeRecoverable
inProgressResumableFile;
/**
- * The {@link RecoverableWriter.CommitRecoverable files} pending to be
committed, organized by checkpoint id.
+ * The {@link RecoverableWriter.CommitRecoverable files} pending to be
+ * committed, organized by checkpoint id.
*/
- private final Map<Long, List<RecoverableWriter.CommitRecoverable>>
pendingPerCheckpoint;
+ private final Map<Long, List<RecoverableWriter.CommitRecoverable>>
committableFilesPerCheckpoint;
- public BucketState(
+ BucketState(
final BucketID bucketId,
final Path bucketPath,
- final long creationTime,
- @Nullable final RecoverableWriter.ResumeRecoverable
inProgress,
- final Map<Long,
List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint
+ final long inProgressFileCreationTime,
+ @Nullable final RecoverableWriter.ResumeRecoverable
inProgressResumableFile,
+ final Map<Long,
List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint
) {
this.bucketId = Preconditions.checkNotNull(bucketId);
this.bucketPath = Preconditions.checkNotNull(bucketPath);
- this.creationTime = creationTime;
- this.inProgress = inProgress;
- this.pendingPerCheckpoint =
Preconditions.checkNotNull(pendingPerCheckpoint);
+ this.inProgressFileCreationTime = inProgressFileCreationTime;
+ this.inProgressResumableFile = inProgressResumableFile;
+ this.committableFilesPerCheckpoint =
Preconditions.checkNotNull(pendingCommittablesPerCheckpoint);
}
- public BucketID getBucketId() {
+ BucketID getBucketId() {
return bucketId;
}
- public Path getBucketPath() {
+ Path getBucketPath() {
return bucketPath;
}
- public long getCreationTime() {
- return creationTime;
+ long getInProgressFileCreationTime() {
+ return inProgressFileCreationTime;
+ }
+
+ boolean hasInProgressResumableFile() {
+ return inProgressResumableFile != null;
}
@Nullable
- public RecoverableWriter.ResumeRecoverable getInProgress() {
- return inProgress;
+ RecoverableWriter.ResumeRecoverable getInProgressResumableFile() {
+ return inProgressResumableFile;
+ }
+
+ Map<Long, List<RecoverableWriter.CommitRecoverable>>
getCommittableFilesPerCheckpoint() {
+ return committableFilesPerCheckpoint;
}
- public Map<Long, List<RecoverableWriter.CommitRecoverable>>
getPendingPerCheckpoint() {
- return pendingPerCheckpoint;
+ @Override
+ public String toString() {
+ final StringBuilder strBuilder = new StringBuilder();
+
+ strBuilder
+				.append("BucketState for bucketId=").append(bucketId)
+ .append(" and bucketPath=").append(bucketPath);
+
+ if (hasInProgressResumableFile()) {
+			strBuilder.append(", has open part file created @ ").append(inProgressFileCreationTime);
+ }
+
+ if (!committableFilesPerCheckpoint.isEmpty()) {
+			strBuilder.append(", has pending files for checkpoints: {");
+			for (long checkpointId: committableFilesPerCheckpoint.keySet()) {
+ strBuilder.append(checkpointId).append(' ');
+ }
+ strBuilder.append('}');
+ }
+ return strBuilder.toString();
}
}
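
The committableFilesPerCheckpoint map above drives the commit-on-notification logic in Bucket. The following stand-alone sketch of that bookkeeping, using plain strings in place of RecoverableWriter.CommitRecoverable handles, is an illustrative simplification (names and the println commit are assumptions), not part of this change.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Stand-in for the pending-file bookkeeping; String replaces the committable handle type. */
class PendingFilesSketch {

	private final Map<Long, List<String>> pendingPerCheckpoint = new HashMap<>();
	private List<String> pendingForCurrentCheckpoint = new ArrayList<>();

	/** Mirrors prepareBucketForCheckpointing(): freeze the current pending list under the checkpoint id. */
	void onCheckpoint(long checkpointId) {
		if (!pendingForCurrentCheckpoint.isEmpty()) {
			pendingPerCheckpoint.put(checkpointId, pendingForCurrentCheckpoint);
			pendingForCurrentCheckpoint = new ArrayList<>();
		}
	}

	/** Mirrors onSuccessfulCompletionOfCheckpoint(): commit and drop everything up to the acknowledged id. */
	void onCheckpointComplete(long checkpointId) {
		Iterator<Map.Entry<Long, List<String>>> it = pendingPerCheckpoint.entrySet().iterator();
		while (it.hasNext()) {
			Map.Entry<Long, List<String>> entry = it.next();
			if (entry.getKey() <= checkpointId) {
				entry.getValue().forEach(file -> System.out.println("committing " + file));
				it.remove();
			}
		}
	}
}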
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
index cf9b8057bd1..04de2462d67 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
@@ -90,20 +90,20 @@ public int getVersion() {
void serializeV1(BucketState<BucketID> state, DataOutputView out)
throws IOException {
SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer,
state.getBucketId(), out);
out.writeUTF(state.getBucketPath().toString());
- out.writeLong(state.getCreationTime());
+ out.writeLong(state.getInProgressFileCreationTime());
// put the current open part file
- final RecoverableWriter.ResumeRecoverable currentPart =
state.getInProgress();
- if (currentPart != null) {
+ if (state.hasInProgressResumableFile()) {
+ final RecoverableWriter.ResumeRecoverable resumable =
state.getInProgressResumableFile();
out.writeBoolean(true);
-
SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer,
currentPart, out);
+
SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer,
resumable, out);
}
else {
out.writeBoolean(false);
}
// put the map of pending files per checkpoint
- final Map<Long, List<RecoverableWriter.CommitRecoverable>>
pendingCommitters = state.getPendingPerCheckpoint();
+ final Map<Long, List<RecoverableWriter.CommitRecoverable>>
pendingCommitters = state.getCommittableFilesPerCheckpoint();
		// manually keep the version here to save some bytes
out.writeInt(commitableSerializer.getVersion());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index e62c425fc2f..2aca841f16d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -44,8 +45,9 @@
* this class to the lifecycle of the operator.
*
* @param <IN> The type of input elements.
- * @param <BucketID> The type of ids for the buckets, as returned by the
{@link Bucketer}.
+ * @param <BucketID> The type of ids for the buckets, as returned by the
{@link BucketAssigner}.
*/
+@Internal
public class Buckets<IN, BucketID> {
private static final Logger LOG =
LoggerFactory.getLogger(Buckets.class);
@@ -56,7 +58,7 @@
private final BucketFactory<IN, BucketID> bucketFactory;
- private final Bucketer<IN, BucketID> bucketer;
+ private final BucketAssigner<IN, BucketID> bucketAssigner;
private final PartFileWriter.PartFileFactory<IN, BucketID>
partFileWriterFactory;
@@ -70,33 +72,33 @@
private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
- private long maxPartCounterUsed;
+ private long maxPartCounter;
- private final RecoverableWriter fileSystemWriter;
+ private final RecoverableWriter fsWriter;
// --------------------------- State Related Fields
-----------------------------
private final BucketStateSerializer<BucketID> bucketStateSerializer;
/**
- * A private constructor creating a new empty bucket manager.
+ * A constructor creating a new empty bucket manager.
*
* @param basePath The base path for our buckets.
- * @param bucketer The {@link Bucketer} provided by the user.
+ * @param bucketAssigner The {@link BucketAssigner} provided by the
user.
* @param bucketFactory The {@link BucketFactory} to be used to create
buckets.
* @param partFileWriterFactory The {@link
PartFileWriter.PartFileFactory} to be used when writing data.
* @param rollingPolicy The {@link RollingPolicy} as specified by the
user.
*/
Buckets(
final Path basePath,
- final Bucketer<IN, BucketID> bucketer,
+ final BucketAssigner<IN, BucketID> bucketAssigner,
final BucketFactory<IN, BucketID> bucketFactory,
final PartFileWriter.PartFileFactory<IN, BucketID>
partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final int subtaskIndex) throws IOException {
this.basePath = Preconditions.checkNotNull(basePath);
- this.bucketer = Preconditions.checkNotNull(bucketer);
+ this.bucketAssigner =
Preconditions.checkNotNull(bucketAssigner);
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
this.partFileWriterFactory =
Preconditions.checkNotNull(partFileWriterFactory);
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
@@ -105,75 +107,102 @@
this.activeBuckets = new HashMap<>();
this.bucketerContext = new Buckets.BucketerContext();
- this.fileSystemWriter =
FileSystem.get(basePath.toUri()).createRecoverableWriter();
+ try {
+ this.fsWriter =
FileSystem.get(basePath.toUri()).createRecoverableWriter();
+ } catch (IOException e) {
+			LOG.error("Unable to create filesystem for path: {}", basePath);
+ throw e;
+ }
+
this.bucketStateSerializer = new BucketStateSerializer<>(
-
fileSystemWriter.getResumeRecoverableSerializer(),
-
fileSystemWriter.getCommitRecoverableSerializer(),
- bucketer.getSerializer()
+ fsWriter.getResumeRecoverableSerializer(),
+ fsWriter.getCommitRecoverableSerializer(),
+ bucketAssigner.getSerializer()
);
- this.maxPartCounterUsed = 0L;
+ this.maxPartCounter = 0L;
}
/**
* Initializes the state after recovery from a failure.
+ *
+ * <p>During this process:
+ * <ol>
+ * <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets.
+ * This guarantees that we do not overwrite valid data,</li>
+ * <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li>
+ * <li>we resume writing to the previous in-progress file of each bucket, and</li>
+ * <li>if we receive multiple states for the same bucket, we merge them.</li>
+ * </ol>
 * @param bucketStates the state holding recovered state about active buckets.
 * @param partCounterState the state holding the max previously used part counters.
- * @throws Exception
+ * @throws Exception if anything goes wrong during retrieving the state or restoring/committing of any
+ *                   in-progress/pending part files
*/
void initializeState(final ListState<byte[]> bucketStates, final
ListState<Long> partCounterState) throws Exception {
- // When resuming after a failure:
- // 1) we get the max part counter used before in order to make
sure that we do not overwrite valid data
- // 2) we commit any pending files for previous checkpoints
(previous to the last successful one)
- // 3) we resume writing to the previous in-progress file of
each bucket, and
- // 4) if we receive multiple states for the same bucket, we
merge them.
+ initializePartCounter(partCounterState);
+
+		LOG.info("Subtask {} initializing its state (max part counter={}).", subtaskIndex, maxPartCounter);
+
+ initializeActiveBuckets(bucketStates);
+ }
- // get the max counter
+ private void initializePartCounter(final ListState<Long>
partCounterState) throws Exception {
long maxCounter = 0L;
for (long partCounter: partCounterState.get()) {
maxCounter = Math.max(partCounter, maxCounter);
}
- maxPartCounterUsed = maxCounter;
+ maxPartCounter = maxCounter;
+ }
- // get the restored buckets
- for (byte[] recoveredState : bucketStates.get()) {
- final BucketState<BucketID> bucketState =
SimpleVersionedSerialization.readVersionAndDeSerialize(
- bucketStateSerializer, recoveredState);
+ private void initializeActiveBuckets(final ListState<byte[]>
bucketStates) throws Exception {
+ for (byte[] serializedRecoveredState : bucketStates.get()) {
+ final BucketState<BucketID> recoveredState =
+
SimpleVersionedSerialization.readVersionAndDeSerialize(
+ bucketStateSerializer,
serializedRecoveredState);
+ handleRestoredBucketState(recoveredState);
+ }
+ }
- final BucketID bucketId = bucketState.getBucketId();
+ private void handleRestoredBucketState(final BucketState<BucketID>
recoveredState) throws Exception {
+ final BucketID bucketId = recoveredState.getBucketId();
- LOG.info("Recovered bucket for {}", bucketId);
+ if (LOG.isDebugEnabled()) {
+			LOG.debug("Subtask {} restoring: {}", subtaskIndex, recoveredState);
+ }
- final Bucket<IN, BucketID> restoredBucket =
bucketFactory.restoreBucket(
- fileSystemWriter,
- subtaskIndex,
- maxPartCounterUsed,
- partFileWriterFactory,
- bucketState
- );
-
- final Bucket<IN, BucketID> existingBucket =
activeBuckets.get(bucketId);
- if (existingBucket == null) {
- activeBuckets.put(bucketId, restoredBucket);
- } else {
- existingBucket.merge(restoredBucket);
- }
+ final Bucket<IN, BucketID> restoredBucket = bucketFactory
+ .restoreBucket(
+ fsWriter,
+ subtaskIndex,
+ maxPartCounter,
+ partFileWriterFactory,
+ rollingPolicy,
+ recoveredState
+ );
+
+ updateActiveBucketId(bucketId, restoredBucket);
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} idx {} restored state for bucket
{}", getClass().getSimpleName(),
- subtaskIndex,
assembleBucketPath(bucketId));
- }
+ private void updateActiveBucketId(final BucketID bucketId, final
Bucket<IN, BucketID> restoredBucket) throws IOException {
+ final Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+ if (bucket != null) {
+ bucket.merge(restoredBucket);
+ } else {
+ activeBuckets.put(bucketId, restoredBucket);
}
}
- void publishUpToCheckpoint(long checkpointId) throws IOException {
+ void commitUpToCheckpoint(final long checkpointId) throws IOException {
final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>>
activeBucketIt =
activeBuckets.entrySet().iterator();
+		LOG.info("Subtask {} received completion notification for checkpoint with id={}.", subtaskIndex, checkpointId);
+
while (activeBucketIt.hasNext()) {
- Bucket<IN, BucketID> bucket =
activeBucketIt.next().getValue();
- bucket.onCheckpointAcknowledgment(checkpointId);
+ final Bucket<IN, BucketID> bucket =
activeBucketIt.next().getValue();
+ bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
if (!bucket.isActive()) {
// We've dealt with all the pending files and
the writer for this bucket is not currently open.
@@ -185,35 +214,39 @@ void publishUpToCheckpoint(long checkpointId) throws
IOException {
void snapshotState(
final long checkpointId,
- final ListState<byte[]> bucketStates,
- final ListState<Long> partCounterState) throws
Exception {
+ final ListState<byte[]> bucketStatesContainer,
+ final ListState<Long> partCounterStateContainer) throws
Exception {
Preconditions.checkState(
- fileSystemWriter != null &&
bucketStateSerializer != null,
- "sink has not been initialized"
- );
+ fsWriter != null && bucketStateSerializer !=
null,
+ "sink has not been initialized");
+
+		LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
+				subtaskIndex, checkpointId, maxPartCounter);
+
+ snapshotActiveBuckets(checkpointId, bucketStatesContainer);
+ partCounterStateContainer.add(maxPartCounter);
+ }
+
+ private void snapshotActiveBuckets(
+ final long checkpointId,
+ final ListState<byte[]> bucketStatesContainer) throws
Exception {
for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
- final PartFileInfo<BucketID> info =
bucket.getInProgressPartInfo();
+ final BucketState<BucketID> bucketState =
bucket.onReceptionOfCheckpoint(checkpointId);
- if (info != null &&
rollingPolicy.shouldRollOnCheckpoint(info)) {
- bucket.closePartFile();
- }
+ final byte[] serializedBucketState =
SimpleVersionedSerialization
+
.writeVersionAndSerialize(bucketStateSerializer, bucketState);
- final BucketState<BucketID> bucketState =
bucket.onCheckpoint(checkpointId);
-
bucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer,
bucketState));
- }
+ bucketStatesContainer.add(serializedBucketState);
- partCounterState.add(maxPartCounterUsed);
+ if (LOG.isDebugEnabled()) {
+				LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
+ }
+ }
}
- /**
- * Called on every incoming element to write it to its final location.
- * @param value the element itself.
- * @param context the {@link SinkFunction.Context context} available to
the sink function.
- * @throws Exception
- */
- void onElement(IN value, SinkFunction.Context context) throws Exception
{
+ void onElement(final IN value, final SinkFunction.Context context)
throws Exception {
final long currentProcessingTime =
context.currentProcessingTime();
// setting the values in the bucketer context
@@ -222,79 +255,56 @@ void onElement(IN value, SinkFunction.Context context)
throws Exception {
context.currentWatermark(),
currentProcessingTime);
- final BucketID bucketId = bucketer.getBucketId(value,
bucketerContext);
+ final BucketID bucketId = bucketAssigner.getBucketId(value,
bucketerContext);
+ final Bucket<IN, BucketID> bucket =
getOrCreateBucketForBucketId(bucketId);
+ bucket.write(value, currentProcessingTime);
+
+ // we update the global max counter here because as buckets
become inactive and
+ // get removed from the list of active buckets, at the time
when we want to create
+ // another part file for the bucket, if we start from 0 we may
overwrite previous parts.
+
+ this.maxPartCounter = Math.max(maxPartCounter,
bucket.getPartCounter());
+ }
+ private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final
BucketID bucketId) throws IOException {
Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
if (bucket == null) {
final Path bucketPath = assembleBucketPath(bucketId);
bucket = bucketFactory.getNewBucket(
- fileSystemWriter,
+ fsWriter,
subtaskIndex,
bucketId,
bucketPath,
- maxPartCounterUsed,
- partFileWriterFactory);
+ maxPartCounter,
+ partFileWriterFactory,
+ rollingPolicy);
activeBuckets.put(bucketId, bucket);
}
-
- final PartFileInfo<BucketID> info =
bucket.getInProgressPartInfo();
- if (info == null || rollingPolicy.shouldRollOnEvent(info,
value)) {
-
- // info will be null if there is no currently open part
file. This
- // is the case when we have a new, just created bucket
or a bucket
- // that has not received any data after the closing of
its previously
- // open in-progress file due to the specified rolling
policy.
-
- bucket.rollPartFile(currentProcessingTime);
- }
- bucket.write(value, currentProcessingTime);
-
- // we update the counter here because as buckets become
inactive and
- // get removed in the initializeState(), at the time we
snapshot they
- // may not be there to take them into account during
checkpointing.
- updateMaxPartCounter(bucket.getPartCounter());
+ return bucket;
}
void onProcessingTime(long timestamp) throws Exception {
for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
- final PartFileInfo<BucketID> info =
bucket.getInProgressPartInfo();
- if (info != null &&
rollingPolicy.shouldRollOnProcessingTime(info, timestamp)) {
- bucket.closePartFile();
- }
+ bucket.onProcessingTime(timestamp);
}
}
void close() {
if (activeBuckets != null) {
- activeBuckets.values().forEach(Bucket::dispose);
+ activeBuckets.values().forEach(Bucket::disposePartFile);
}
}
- /**
- * Assembles the final bucket {@link Path} that will be used for the
provided bucket in the
- * underlying filesystem.
- * @param bucketId the id of the bucket as returned by the {@link
Bucketer}.
- * @return The resulting path.
- */
private Path assembleBucketPath(BucketID bucketId) {
return new Path(basePath, bucketId.toString());
}
/**
- * Updates the state keeping track of the maximum used part
- * counter across all local active buckets.
- * @param candidate the part counter that will potentially replace the
current {@link #maxPartCounterUsed}.
- */
- private void updateMaxPartCounter(long candidate) {
- maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
- }
-
- /**
- * The {@link Bucketer.Context} exposed to the
- * {@link Bucketer#getBucketId(Object, Bucketer.Context)}
+ * The {@link BucketAssigner.Context} exposed to the
+ * {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)}
* whenever a new incoming element arrives.
*/
- private static final class BucketerContext implements Bucketer.Context {
+ private static final class BucketerContext implements
BucketAssigner.Context {
@Nullable
private Long elementTimestamp;
@@ -309,8 +319,8 @@ private BucketerContext() {
this.currentProcessingTime = Long.MIN_VALUE;
}
- void update(@Nullable Long element, long watermark, long
processingTime) {
- this.elementTimestamp = element;
+ void update(@Nullable Long elementTimestamp, long watermark,
long processingTime) {
+ this.elementTimestamp = elementTimestamp;
this.currentWatermark = watermark;
this.currentProcessingTime = processingTime;
}
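
The restore path in Buckets above does two things: it recovers the largest part counter any subtask used before, and it merges restored per-bucket state that maps onto the same bucket id. The following simplified stand-in (plain collections, illustrative names) sketches both steps under those assumptions.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for the Buckets restore path. */
class RestoreSketch {

	static long maxPartCounter(List<Long> restoredCounters) {
		// mirrors initializePartCounter(): never reuse a counter that any subtask used before
		long max = 0L;
		for (long counter : restoredCounters) {
			max = Math.max(max, counter);
		}
		return max;
	}

	static Map<String, List<String>> mergeRestoredBuckets(List<Map.Entry<String, List<String>>> restored) {
		// mirrors updateActiveBucketId(): merge into an existing bucket, otherwise register a new one
		Map<String, List<String>> active = new HashMap<>();
		for (Map.Entry<String, List<String>> state : restored) {
			active.merge(state.getKey(), state.getValue(), (existing, incoming) -> {
				existing.addAll(incoming);
				return existing;
			});
		}
		return active;
	}
}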
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index 7b8c8fe5c04..005ae4e737f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -66,7 +66,7 @@ void write(IN element, long currentTime) throws IOException {
/**
* A factory that creates {@link BulkPartWriter BulkPartWriters}.
* @param <IN> The type of input elements.
- * @param <BucketID> The type of ids for the buckets, as returned by
the {@link Bucketer}.
+ * @param <BucketID> The type of ids for the buckets, as returned by
the {@link BucketAssigner}.
*/
static class Factory<IN, BucketID> implements
PartFileWriter.PartFileFactory<IN, BucketID> {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
similarity index 85%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
index 532138f1ba2..1d2edbcaeb5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
@@ -28,7 +28,7 @@
* A factory returning {@link Bucket buckets}.
*/
@Internal
-class DefaultBucketFactory<IN, BucketID> implements BucketFactory<IN,
BucketID> {
+class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN,
BucketID> {
private static final long serialVersionUID = 1L;
@@ -39,15 +39,17 @@
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID>
partFileWriterFactory) {
+ final PartFileWriter.PartFileFactory<IN, BucketID>
partFileWriterFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy) {
- return new Bucket<>(
+ return Bucket.getNew(
fsWriter,
subtaskIndex,
bucketId,
bucketPath,
initialPartCounter,
- partFileWriterFactory);
+ partFileWriterFactory,
+ rollingPolicy);
}
@Override
@@ -56,13 +58,15 @@
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID>
partFileWriterFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState) throws
IOException {
- return new Bucket<>(
+ return Bucket.restore(
fsWriter,
subtaskIndex,
initialPartCounter,
partFileWriterFactory,
+ rollingPolicy,
bucketState);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
index dbd62a27d8c..7750acd7e58 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
@@ -32,7 +32,7 @@
/**
* @return The bucket identifier of the current buffer, as returned by
the
- * {@link Bucketer#getBucketId(Object, Bucketer.Context)}.
+ * {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)}.
*/
BucketID getBucketId();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index 269b12c12f5..2478b79a527 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -54,7 +54,7 @@ void write(IN element, long currentTime) throws IOException {
/**
* A factory that creates {@link RowWisePartWriter RowWisePartWriters}.
* @param <IN> The type of input elements.
- * @param <BucketID> The type of ids for the buckets, as returned by
the {@link Bucketer}.
+ * @param <BucketID> The type of ids for the buckets, as returned by
the {@link BucketAssigner}.
*/
static class Factory<IN, BucketID> implements
PartFileWriter.PartFileFactory<IN, BucketID> {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 7635e372478..6c7e135caf8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -36,7 +36,7 @@
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -58,11 +58,11 @@
* These part files contain the actual output data.
*
*
- * <p>The sink uses a {@link Bucketer} to determine in which bucket directory each element should
- * be written to inside the base directory. The {@code Bucketer} can, for example, use time or
- * a property of the element to determine the bucket directory. The default {@code Bucketer} is a
- * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
- * a custom {@code Bucketer} using the {@code setBucketer(Bucketer)} method, after calling
+ * <p>The sink uses a {@link BucketAssigner} to determine in which bucket directory each element should
+ * be written to inside the base directory. The {@code BucketAssigner} can, for example, use time or
+ * a property of the element to determine the bucket directory. The default {@code BucketAssigner} is a
+ * {@link DateTimeBucketAssigner} which will create one new bucket every hour. You can specify
+ * a custom {@code BucketAssigner} using the {@code setBucketAssigner(bucketAssigner)} method, after calling
* {@link StreamingFileSink#forRowFormat(Path, Encoder)} or
* {@link StreamingFileSink#forBulkFormat(Path, BulkWriter.Factory)}.
*
@@ -151,7 +151,7 @@ private StreamingFileSink(
*/
public static <IN> StreamingFileSink.RowFormatBuilder<IN, String>
forRowFormat(
final Path basePath, final Encoder<IN> encoder) {
- return new StreamingFileSink.RowFormatBuilder<>(basePath,
encoder, new DateTimeBucketer<>());
+ return new StreamingFileSink.RowFormatBuilder<>(basePath,
encoder, new DateTimeBucketAssigner<>());
}
/**
@@ -164,7 +164,7 @@ private StreamingFileSink(
*/
public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String>
forBulkFormat(
final Path basePath, final BulkWriter.Factory<IN>
writerFactory) {
- return new StreamingFileSink.BulkFormatBuilder<>(basePath,
writerFactory, new DateTimeBucketer<>());
+ return new StreamingFileSink.BulkFormatBuilder<>(basePath,
writerFactory, new DateTimeBucketAssigner<>());
}
/**
@@ -191,50 +191,45 @@ private StreamingFileSink(
private final Encoder<IN> encoder;
- private final Bucketer<IN, BucketID> bucketer;
+ private final BucketAssigner<IN, BucketID> bucketAssigner;
private final RollingPolicy<IN, BucketID> rollingPolicy;
private final BucketFactory<IN, BucketID> bucketFactory;
- RowFormatBuilder(Path basePath, Encoder<IN> encoder,
Bucketer<IN, BucketID> bucketer) {
- this(basePath, encoder, bucketer,
DefaultRollingPolicy.create().build(), 60L * 1000L, new
DefaultBucketFactory<>());
+ RowFormatBuilder(Path basePath, Encoder<IN> encoder,
BucketAssigner<IN, BucketID> bucketAssigner) {
+ this(basePath, encoder, bucketAssigner,
DefaultRollingPolicy.create().build(), 60L * 1000L, new
DefaultBucketFactoryImpl<>());
}
private RowFormatBuilder(
Path basePath,
Encoder<IN> encoder,
- Bucketer<IN, BucketID> bucketer,
- RollingPolicy<IN, BucketID> rollingPolicy,
+ BucketAssigner<IN, BucketID> assigner,
+ RollingPolicy<IN, BucketID> policy,
long bucketCheckInterval,
BucketFactory<IN, BucketID> bucketFactory) {
this.basePath = Preconditions.checkNotNull(basePath);
this.encoder = Preconditions.checkNotNull(encoder);
- this.bucketer = Preconditions.checkNotNull(bucketer);
- this.rollingPolicy =
Preconditions.checkNotNull(rollingPolicy);
+ this.bucketAssigner =
Preconditions.checkNotNull(assigner);
+ this.rollingPolicy = Preconditions.checkNotNull(policy);
this.bucketCheckInterval = bucketCheckInterval;
this.bucketFactory =
Preconditions.checkNotNull(bucketFactory);
}
public StreamingFileSink.RowFormatBuilder<IN, BucketID>
withBucketCheckInterval(final long interval) {
- return new RowFormatBuilder<>(basePath, encoder,
bucketer, rollingPolicy, interval, bucketFactory);
+ return new RowFormatBuilder<>(basePath, encoder,
bucketAssigner, rollingPolicy, interval, bucketFactory);
}
- public StreamingFileSink.RowFormatBuilder<IN, BucketID>
withBucketer(final Bucketer<IN, BucketID> bucketer) {
- return new RowFormatBuilder<>(basePath, encoder,
Preconditions.checkNotNull(bucketer), rollingPolicy, bucketCheckInterval,
bucketFactory);
+ public StreamingFileSink.RowFormatBuilder<IN, BucketID>
withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+ return new RowFormatBuilder<>(basePath, encoder,
Preconditions.checkNotNull(assigner), rollingPolicy, bucketCheckInterval,
bucketFactory);
}
public StreamingFileSink.RowFormatBuilder<IN, BucketID>
withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
- return new RowFormatBuilder<>(basePath, encoder,
bucketer, Preconditions.checkNotNull(policy), bucketCheckInterval,
bucketFactory);
+ return new RowFormatBuilder<>(basePath, encoder,
bucketAssigner, Preconditions.checkNotNull(policy), bucketCheckInterval,
bucketFactory);
}
- public <ID> StreamingFileSink.RowFormatBuilder<IN, ID>
withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<IN,
ID> policy) {
- return new RowFormatBuilder<>(basePath, encoder,
Preconditions.checkNotNull(bucketer), Preconditions.checkNotNull(policy),
bucketCheckInterval, new DefaultBucketFactory<>());
- }
-
- @VisibleForTesting
- StreamingFileSink.RowFormatBuilder<IN, BucketID>
withBucketFactory(final BucketFactory<IN, BucketID> factory) {
- return new RowFormatBuilder<>(basePath, encoder,
bucketer, rollingPolicy, bucketCheckInterval,
Preconditions.checkNotNull(factory));
+ public <ID> StreamingFileSink.RowFormatBuilder<IN, ID>
withBucketAssignerAndPolicy(final BucketAssigner<IN, ID> assigner, final
RollingPolicy<IN, ID> policy) {
+ return new RowFormatBuilder<>(basePath, encoder,
Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy),
bucketCheckInterval, new DefaultBucketFactoryImpl<>());
}
/** Creates the actual sink. */
@@ -246,12 +241,17 @@ private RowFormatBuilder(
Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws
IOException {
return new Buckets<>(
basePath,
- bucketer,
+ bucketAssigner,
bucketFactory,
new
RowWisePartWriter.Factory<>(encoder),
rollingPolicy,
subtaskIndex);
}
+
+ @VisibleForTesting
+ StreamingFileSink.RowFormatBuilder<IN, BucketID>
withBucketFactory(final BucketFactory<IN, BucketID> factory) {
+ return new RowFormatBuilder<>(basePath, encoder,
bucketAssigner, rollingPolicy, bucketCheckInterval,
Preconditions.checkNotNull(factory));
+ }
}
/**
@@ -268,38 +268,38 @@ private RowFormatBuilder(
		private final BulkWriter.Factory<IN> writerFactory;
-		private final Bucketer<IN, BucketID> bucketer;
+		private final BucketAssigner<IN, BucketID> bucketAssigner;
		private final BucketFactory<IN, BucketID> bucketFactory;
-		BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, Bucketer<IN, BucketID> bucketer) {
-			this(basePath, writerFactory, bucketer, 60L * 1000L, new DefaultBucketFactory<>());
+		BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner) {
+			this(basePath, writerFactory, assigner, 60L * 1000L, new DefaultBucketFactoryImpl<>());
		}
		private BulkFormatBuilder(
				Path basePath,
				BulkWriter.Factory<IN> writerFactory,
-				Bucketer<IN, BucketID> bucketer,
+				BucketAssigner<IN, BucketID> assigner,
				long bucketCheckInterval,
				BucketFactory<IN, BucketID> bucketFactory) {
			this.basePath = Preconditions.checkNotNull(basePath);
			this.writerFactory = writerFactory;
-			this.bucketer = Preconditions.checkNotNull(bucketer);
+			this.bucketAssigner = Preconditions.checkNotNull(assigner);
			this.bucketCheckInterval = bucketCheckInterval;
			this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
		}
		public StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketCheckInterval(long interval) {
-			return new BulkFormatBuilder<>(basePath, writerFactory, bucketer, interval, bucketFactory);
+			return new BulkFormatBuilder<>(basePath, writerFactory, bucketAssigner, interval, bucketFactory);
		}
-		public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> withBucketer(Bucketer<IN, ID> bucketer) {
-			return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(bucketer), bucketCheckInterval, new DefaultBucketFactory<>());
+		public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> withBucketAssigner(BucketAssigner<IN, ID> assigner) {
+			return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(assigner), bucketCheckInterval, new DefaultBucketFactoryImpl<>());
		}
		@VisibleForTesting
		StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
-			return new BulkFormatBuilder<>(basePath, writerFactory, bucketer, bucketCheckInterval, Preconditions.checkNotNull(factory));
+			return new BulkFormatBuilder<>(basePath, writerFactory, bucketAssigner, bucketCheckInterval, Preconditions.checkNotNull(factory));
		}
		/** Creates the actual sink. */
@@ -311,10 +311,10 @@ private BulkFormatBuilder(
		Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
			return new Buckets<>(
					basePath,
-					bucketer,
+					bucketAssigner,
					bucketFactory,
					new BulkPartWriter.Factory<>(writerFactory),
-					new OnCheckpointRollingPolicy<>(),
+					OnCheckpointRollingPolicy.build(),
					subtaskIndex);
		}
	}
@@ -337,7 +337,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
	@Override
	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		buckets.publishUpToCheckpoint(checkpointId);
+		buckets.commitUpToCheckpoint(checkpointId);
	}
@Override
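For context, a bulk-format sink against the renamed builder would look roughly like the sketch below (illustrative only, not part of the diff; the path is a placeholder and 'writerFactory' stands in for any BulkWriter.Factory implementation, e.g. one of the Parquet writer factories shipped in a separate module):

    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

    public class BulkFormatSinkExample {
        public static <T> StreamingFileSink<T> buildSink(BulkWriter.Factory<T> writerFactory) {
            return StreamingFileSink
                    .forBulkFormat(new Path("/tmp/bulk-output"), writerFactory)
                    .withBucketAssigner(new DateTimeBucketAssigner<>())  // was withBucketer(...)
                    .build();
        }
    }

Note that, per the createBuckets() hunk above, the bulk builder exposes no rolling-policy setter: bulk-format sinks always roll on checkpoint via OnCheckpointRollingPolicy.build().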
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java
similarity index 84%
rename from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
rename to
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java
index c35ba8031c2..3633004cc5d 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java
@@ -16,23 +16,23 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
/**
- * A {@link Bucketer} that does not perform any
+ * A {@link BucketAssigner} that does not perform any
* bucketing of files. All files are written to the base path.
*/
@PublicEvolving
-public class BasePathBucketer<T> implements Bucketer<T, String> {
+public class BasePathBucketAssigner<T> implements BucketAssigner<T, String> {
private static final long serialVersionUID = -6033643155550226022L;
@Override
- public String getBucketId(T element, Bucketer.Context context) {
+ public String getBucketId(T element, BucketAssigner.Context context) {
return "";
}
@@ -44,6 +44,6 @@ public String getBucketId(T element, Bucketer.Context
context) {
@Override
public String toString() {
- return "BasePathBucketer";
+ return "BasePathBucketAssigner";
}
}
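An illustrative use of the renamed assigner (not part of the diff; path and charset are placeholders) — because getBucketId(...) always returns "", every part file lands directly under the base path:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;

    public class FlatOutputExample {
        public static StreamingFileSink<String> buildSink() {
            // No bucket sub-directories are created; only the assigner differs from the row-format sketch further up.
            return StreamingFileSink
                    .forRowFormat(new Path("/tmp/flat-output"), new SimpleStringEncoder<String>("UTF-8"))
                    .withBucketAssigner(new BasePathBucketAssigner<>())
                    .build();
        }
    }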
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
similarity index 87%
rename from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
rename to
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
index d226d200f2f..5d30f39fa2e 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
@@ -16,17 +16,17 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
- * A {@link Bucketer} that assigns to buckets based on current system time.
+ * A {@link BucketAssigner} that assigns to buckets based on current system time.
 *
 *
 * <p>The {@code DateTimeBucketer} will create directories of the following form:
@@ -52,7 +52,7 @@
*
*/
@PublicEvolving
-public class DateTimeBucketer<IN> implements Bucketer<IN, String> {
+public class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {
private static final long serialVersionUID = 1L;
@@ -65,7 +65,7 @@
/**
* Creates a new {@code DateTimeBucketer} with format string {@code
"yyyy-MM-dd--HH"}.
*/
- public DateTimeBucketer() {
+ public DateTimeBucketAssigner() {
this(DEFAULT_FORMAT_STRING);
}
@@ -75,12 +75,12 @@ public DateTimeBucketer() {
* @param formatString The format string that will be given to {@code
SimpleDateFormat} to determine
* the bucket path.
*/
- public DateTimeBucketer(String formatString) {
+ public DateTimeBucketAssigner(String formatString) {
this.formatString = formatString;
}
@Override
- public String getBucketId(IN element, Bucketer.Context context) {
+ public String getBucketId(IN element, BucketAssigner.Context context) {
if (dateFormatter == null) {
dateFormatter = new SimpleDateFormat(formatString);
}
@@ -94,8 +94,6 @@ public String getBucketId(IN element, Bucketer.Context
context) {
@Override
public String toString() {
- return "DateTimeBucketer{" +
- "formatString='" + formatString + '\'' +
- '}';
+ return "DateTimeBucketAssigner{formatString='" + formatString +
'\'' + '}';
}
}
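An illustrative use of the renamed assigner with a custom SimpleDateFormat pattern (not part of the diff; the hourly pattern and path are example values):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

    public class HourlyBucketsExample {
        public static StreamingFileSink<String> buildSink() {
            // Elements are routed into directories such as <base>/2018-08-01--14/ based on the current system time.
            return StreamingFileSink
                    .forRowFormat(new Path("/tmp/dated-output"), new SimpleStringEncoder<String>("UTF-8"))
                    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
                    .build();
        }
    }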
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java
similarity index 96%
rename from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
rename to
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java
index d025af97650..4726aff967c 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
@@ -29,6 +30,7 @@
/**
* A {@link SimpleVersionedSerializer} implementation for Strings.
*/
+@PublicEvolving
public final class SimpleVersionedStringSerializer implements
SimpleVersionedSerializer<String> {
private static final Charset CHARSET = StandardCharsets.UTF_8;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
index b4b1cebf88c..7c75f1c5e25 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
@@ -87,7 +87,10 @@ public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileS
	 * To finalize it and have the actual policy, call {@code .create()}.
	 */
	public static DefaultRollingPolicy.PolicyBuilder create() {
-		return new DefaultRollingPolicy.PolicyBuilder();
+		return new DefaultRollingPolicy.PolicyBuilder(
+				DEFAULT_MAX_PART_SIZE,
+				DEFAULT_ROLLOVER_INTERVAL,
+				DEFAULT_INACTIVITY_INTERVAL);
	}
	/**
@@ -96,42 +99,46 @@ public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileS
	@PublicEvolving
	public static final class PolicyBuilder {
-		private long partSize = DEFAULT_MAX_PART_SIZE;
+		private final long partSize;
-		private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
+		private final long rolloverInterval;
-		private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
+		private final long inactivityInterval;
-		private PolicyBuilder() {}
+		private PolicyBuilder(
+				final long partSize,
+				final long rolloverInterval,
+				final long inactivityInterval) {
+			this.partSize = partSize;
+			this.rolloverInterval = rolloverInterval;
+			this.inactivityInterval = inactivityInterval;
+		}
		/**
		 * Sets the part size above which a part file will have to roll.
		 * @param size the allowed part size.
		 */
-		public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(long size) {
+		public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(final long size) {
			Preconditions.checkState(size > 0L);
-			this.partSize = size;
-			return this;
+			return new PolicyBuilder(size, rolloverInterval, inactivityInterval);
		}
		/**
		 * Sets the interval of allowed inactivity after which a part file will have to roll.
		 * @param interval the allowed inactivity interval.
		 */
-		public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(long interval) {
+		public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(final long interval) {
			Preconditions.checkState(interval > 0L);
-			this.inactivityInterval = interval;
-			return this;
+			return new PolicyBuilder(partSize, rolloverInterval, interval);
		}
		/**
		 * Sets the max time a part file can stay open before having to roll.
		 * @param interval the desired rollover interval.
		 */
-		public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(long interval) {
+		public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(final long interval) {
			Preconditions.checkState(interval > 0L);
-			this.rolloverInterval = interval;
-			return this;
+			return new PolicyBuilder(partSize, interval, inactivityInterval);
		}
/**
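Because PolicyBuilder is now immutable, each with*() call returns a fresh builder, so the calls have to be chained (or the returned builder reassigned). A rough usage sketch, not part of the diff, with example sizes and intervals:

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

    public class RollingPolicyExample {
        public static RollingPolicy<String, String> buildPolicy() {
            return DefaultRollingPolicy
                    .create()
                    .withMaxPartSize(128L * 1024L * 1024L)                 // roll once a part reaches 128 MB
                    .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))   // or after 15 minutes open
                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))  // or after 5 minutes of inactivity
                    .build();
        }
    }

A builder that is stored in a variable and "configured" without using the return value of each with*() call would silently keep the defaults after this change.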
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
index 9ad8172e9de..53fce082c3b 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
@@ -18,16 +18,20 @@
package org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
/**
 * A {@link RollingPolicy} which rolls on every checkpoint.
 */
+@PublicEvolving
public class OnCheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
	private static final long serialVersionUID = 1L;
+	private OnCheckpointRollingPolicy() {}
+
	@Override
	public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
		return true;
@@ -42,4 +46,8 @@ public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN elemen
	public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
		return false;
	}
+
+	public static <IN, BucketID> OnCheckpointRollingPolicy<IN, BucketID> build() {
+		return new OnCheckpointRollingPolicy<>();
+	}
}
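Since the constructor is now private, callers obtain the policy through the new static factory. A minimal sketch (not part of the diff):

    import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

    public class OnCheckpointPolicyExample {
        public static RollingPolicy<String, String> buildPolicy() {
            // 'new OnCheckpointRollingPolicy<>()' no longer compiles outside the class itself.
            return OnCheckpointRollingPolicy.build();
        }
    }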
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index 3d5be6340ff..55360a4a353 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -25,7 +25,7 @@
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -75,8 +75,8 @@ public void testSerializationEmpty() throws IOException {
		final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
		Assert.assertEquals(testBucket, recoveredState.getBucketPath());
-		Assert.assertNull(recoveredState.getInProgress());
-		Assert.assertTrue(recoveredState.getPendingPerCheckpoint().isEmpty());
+		Assert.assertNull(recoveredState.getInProgressResumableFile());
+		Assert.assertTrue(recoveredState.getCommittableFilesPerCheckpoint().isEmpty());
}
@Test
@@ -166,7 +166,7 @@ public void testSerializationFull() throws IOException {
Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
Assert.assertEquals(5L, recoveredRecoverables.size());
// recover and commit
@@ -238,9 +238,9 @@ public void testSerializationNullInProgress() throws IOException {
		final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
		Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
-		Assert.assertNull(recoveredState.getInProgress());
+		Assert.assertNull(recoveredState.getInProgressResumableFile());
-		final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
		Assert.assertEquals(5L, recoveredRecoverables.size());
// recover and commit
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index 5e8eb6d9ba7..25622d14466 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -22,7 +22,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -58,7 +58,7 @@ private void testCorrectPassingOfContext(Long timestamp, long
watermark, long pr
		final Buckets<String, String> buckets = StreamingFileSink
				.<String>forRowFormat(new Path(outDir.toURI()), new SimpleStringEncoder<>())
-				.withBucketer(new VarifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime))
+				.withBucketAssigner(new VarifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime))
				.createBuckets(2);
buckets.onElement("TEST", new SinkFunction.Context() {
@@ -79,7 +79,7 @@ public Long timestamp() {
});
}
-	private static class VarifyingBucketer implements Bucketer<String, String> {
+	private static class VarifyingBucketer implements BucketAssigner<String, String> {
private static final long serialVersionUID =
7729086510972377578L;
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
index 7b6b82cd4eb..20890c07a93 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
@@ -59,7 +59,7 @@ public void testCustomBulkWriter() throws Exception {
10L,
new
TestUtils.TupleToStringBucketer(),
new
TestBulkWriterFactory(),
-						new DefaultBucketFactory<>())
+						new DefaultBucketFactoryImpl<>())
) {
testHarness.setup();
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index a0c438e1847..ab487d168bc 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -491,8 +491,8 @@ public void testMaxCounterUponRecovery() throws Exception {
OperatorSubtaskState mergedSnapshot;
- final TestBucketFactory first = new TestBucketFactory();
- final TestBucketFactory second = new TestBucketFactory();
+		final TestBucketFactoryImpl first = new TestBucketFactoryImpl();
+		final TestBucketFactoryImpl second = new TestBucketFactoryImpl();
final RollingPolicy<Tuple2<String, Integer>, String>
rollingPolicy = DefaultRollingPolicy
.create()
@@ -526,8 +526,8 @@ public void testMaxCounterUponRecovery() throws Exception {
);
}
-		final TestBucketFactory firstRecovered = new TestBucketFactory();
-		final TestBucketFactory secondRecovered = new TestBucketFactory();
+		final TestBucketFactoryImpl firstRecovered = new TestBucketFactoryImpl();
+		final TestBucketFactoryImpl secondRecovered = new TestBucketFactoryImpl();
try (
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1
= TestUtils.createCustomRescalingTestSink(
@@ -559,7 +559,7 @@ public void testMaxCounterUponRecovery() throws Exception {
////////////////////// Helper Methods
//////////////////////
-	static class TestBucketFactory extends DefaultBucketFactory<Tuple2<String, Integer>, String> {
+	static class TestBucketFactoryImpl extends DefaultBucketFactoryImpl<Tuple2<String, Integer>, String> {
private static final long serialVersionUID =
2794824980604027930L;
@@ -572,7 +572,8 @@ public void testMaxCounterUponRecovery() throws Exception {
final String bucketId,
final Path bucketPath,
final long initialPartCounter,
-				final PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> partFileWriterFactory) {
+				final PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> partFileWriterFactory,
+				final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy) {
this.initialCounter = initialPartCounter;
@@ -582,7 +583,8 @@ public void testMaxCounterUponRecovery() throws Exception {
bucketId,
bucketPath,
initialPartCounter,
- partFileWriterFactory);
+ partFileWriterFactory,
+ rollingPolicy);
}
@Override
@@ -591,6 +593,7 @@ public void testMaxCounterUponRecovery() throws Exception {
final int subtaskIndex,
final long initialPartCounter,
				final PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> partFileWriterFactory,
+				final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy,
final BucketState<String> bucketState) throws
IOException {
this.initialCounter = initialPartCounter;
@@ -600,6 +603,7 @@ public void testMaxCounterUponRecovery() throws Exception {
subtaskIndex,
initialPartCounter,
partFileWriterFactory,
+ rollingPolicy,
bucketState);
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index f16a9085d9d..851b6825d9a 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -67,7 +67,7 @@ public void testDefaultRollingPolicy() throws Exception {
new
TestUtils.TupleToStringBucketer(),
new SimpleStringEncoder<>(),
rollingPolicy,
-						new DefaultBucketFactory<>())
+						new DefaultBucketFactoryImpl<>())
) {
testHarness.setup();
testHarness.open();
@@ -111,7 +111,7 @@ public void testDefaultRollingPolicy() throws Exception {
public void testRollOnCheckpointPolicy() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
-		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = new OnCheckpointRollingPolicy<>();
+		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = OnCheckpointRollingPolicy.build();
try (
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness
= TestUtils.createCustomRescalingTestSink(
@@ -122,7 +122,7 @@ public void testRollOnCheckpointPolicy() throws Exception {
new
TestUtils.TupleToStringBucketer(),
new SimpleStringEncoder<>(),
rollingPolicy,
-						new DefaultBucketFactory<>())
+						new DefaultBucketFactoryImpl<>())
) {
testHarness.setup();
testHarness.open();
@@ -246,7 +246,7 @@ public boolean
shouldRollOnProcessingTime(PartFileInfo<String> partFileState, lo
new
TestUtils.TupleToStringBucketer(),
new SimpleStringEncoder<>(),
rollingPolicy,
-						new DefaultBucketFactory<>())
+						new DefaultBucketFactoryImpl<>())
) {
testHarness.setup();
testHarness.open();
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index 8d9392b3c37..bfbc12043ee 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -23,7 +23,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -58,7 +58,7 @@
.withInactivityInterval(inactivityInterval)
.build();
-		final Bucketer<Tuple2<String, Integer>, String> bucketer = new TupleToStringBucketer();
+		final BucketAssigner<Tuple2<String, Integer>, String> bucketer = new TupleToStringBucketer();
final Encoder<Tuple2<String, Integer>> encoder = (element,
stream) -> {
stream.write((element.f0 + '@' +
element.f1).getBytes(StandardCharsets.UTF_8));
@@ -73,7 +73,7 @@
bucketer,
encoder,
rollingPolicy,
- new DefaultBucketFactory<>());
+ new DefaultBucketFactoryImpl<>());
}
static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>,
Object> createCustomRescalingTestSink(
@@ -81,14 +81,14 @@
final int totalParallelism,
final int taskIdx,
final long bucketCheckInterval,
-			final Bucketer<Tuple2<String, Integer>, String> bucketer,
+			final BucketAssigner<Tuple2<String, Integer>, String> bucketer,
final Encoder<Tuple2<String, Integer>> writer,
final RollingPolicy<Tuple2<String, Integer>, String>
rollingPolicy,
final BucketFactory<Tuple2<String, Integer>, String>
bucketFactory) throws Exception {
StreamingFileSink<Tuple2<String, Integer>> sink =
StreamingFileSink
.forRowFormat(new Path(outDir.toURI()), writer)
- .withBucketer(bucketer)
+ .withBucketAssigner(bucketer)
.withRollingPolicy(rollingPolicy)
.withBucketCheckInterval(bucketCheckInterval)
.withBucketFactory(bucketFactory)
@@ -102,13 +102,13 @@
final int totalParallelism,
final int taskIdx,
final long bucketCheckInterval,
-			final Bucketer<Tuple2<String, Integer>, String> bucketer,
+			final BucketAssigner<Tuple2<String, Integer>, String> bucketer,
final BulkWriter.Factory<Tuple2<String, Integer>>
writer,
final BucketFactory<Tuple2<String, Integer>, String>
bucketFactory) throws Exception {
StreamingFileSink<Tuple2<String, Integer>> sink =
StreamingFileSink
.forBulkFormat(new Path(outDir.toURI()), writer)
- .withBucketer(bucketer)
+ .withBucketAssigner(bucketer)
.withBucketCheckInterval(bucketCheckInterval)
.withBucketFactory(bucketFactory)
.build();
@@ -146,7 +146,7 @@ static void checkLocalFs(File outDir, int
expectedInProgress, int expectedComple
return contents;
}
-	static class TupleToStringBucketer implements Bucketer<Tuple2<String, Integer>, String> {
+	static class TupleToStringBucketer implements BucketAssigner<Tuple2<String, Integer>, String> {
private static final long serialVersionUID = 1L;
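For reference, a user-defined assigner written against the renamed interface would look roughly like the sketch below (not part of the diff). It mirrors the test's TupleToStringBucketer; the use of SimpleVersionedStringSerializer.INSTANCE assumes the serializer exposes a singleton field and should otherwise be replaced by constructing the serializer directly:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    public class FieldToBucketAssigner implements BucketAssigner<Tuple2<String, Integer>, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public String getBucketId(Tuple2<String, Integer> element, BucketAssigner.Context context) {
            // Route each record into a bucket named after its first tuple field.
            return element.f0;
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            // Assumed singleton; the serializer lets bucket ids survive checkpointing of the bucket state.
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }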
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services