This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7dc8e69274a924776629537e7cb62e5d191c3e9f Author: Kostas Kloudas <[email protected]> AuthorDate: Wed Nov 21 10:43:55 2018 +0100 [hotfix][fs-connector] Refactor Bucket to statically import Preconditions. --- .../api/functions/sink/filesystem/Bucket.java | 24 ++++++++++++---------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java index 042bcda..8ba35b8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java @@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +37,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + /** * A bucket is the directory organization of the output of the {@link StreamingFileSink}. * @@ -84,13 +86,13 @@ public class Bucket<IN, BucketID> { final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory, final RollingPolicy<IN, BucketID> rollingPolicy) { - this.fsWriter = Preconditions.checkNotNull(fsWriter); + this.fsWriter = checkNotNull(fsWriter); this.subtaskIndex = subtaskIndex; - this.bucketId = Preconditions.checkNotNull(bucketId); - this.bucketPath = Preconditions.checkNotNull(bucketPath); + this.bucketId = checkNotNull(bucketId); + this.bucketPath = checkNotNull(bucketPath); this.partCounter = initialPartCounter; - this.partFileFactory = Preconditions.checkNotNull(partFileFactory); - this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy); + this.partFileFactory = checkNotNull(partFileFactory); + this.rollingPolicy = checkNotNull(rollingPolicy); this.pendingPartsForCurrentCheckpoint = new ArrayList<>(); this.pendingPartsPerCheckpoint = new HashMap<>(); @@ -158,8 +160,8 @@ public class Bucket<IN, BucketID> { } void merge(final Bucket<IN, BucketID> bucket) throws IOException { - Preconditions.checkNotNull(bucket); - Preconditions.checkState(Objects.equals(bucket.bucketPath, bucketPath)); + checkNotNull(bucket); + checkState(Objects.equals(bucket.bucketPath, bucketPath)); // There should be no pending files in the "to-merge" states. // The reason is that: @@ -167,8 +169,8 @@ public class Bucket<IN, BucketID> { // So a snapshot, including the one we are recovering from, will never contain such files. // 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()). - Preconditions.checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty()); - Preconditions.checkState(bucket.pendingPartsPerCheckpoint.isEmpty()); + checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty()); + checkState(bucket.pendingPartsPerCheckpoint.isEmpty()); RecoverableWriter.CommitRecoverable committable = bucket.closePartFile(); if (committable != null) { @@ -257,7 +259,7 @@ public class Bucket<IN, BucketID> { } void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException { - Preconditions.checkNotNull(fsWriter); + checkNotNull(fsWriter); Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it = pendingPartsPerCheckpoint.entrySet().iterator();
