This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7dc8e69274a924776629537e7cb62e5d191c3e9f
Author: Kostas Kloudas <[email protected]>
AuthorDate: Wed Nov 21 10:43:55 2018 +0100

    [hotfix][fs-connector] Refactor Bucket to statically import Preconditions.
---
 .../api/functions/sink/filesystem/Bucket.java      | 24 ++++++++++++----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 042bcda..8ba35b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +37,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
 * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
  *
@@ -84,13 +86,13 @@ public class Bucket<IN, BucketID> {
                        final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
                        final RollingPolicy<IN, BucketID> rollingPolicy) {
 
-               this.fsWriter = Preconditions.checkNotNull(fsWriter);
+               this.fsWriter = checkNotNull(fsWriter);
                this.subtaskIndex = subtaskIndex;
-               this.bucketId = Preconditions.checkNotNull(bucketId);
-               this.bucketPath = Preconditions.checkNotNull(bucketPath);
+               this.bucketId = checkNotNull(bucketId);
+               this.bucketPath = checkNotNull(bucketPath);
                this.partCounter = initialPartCounter;
-               this.partFileFactory = Preconditions.checkNotNull(partFileFactory);
-               this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+               this.partFileFactory = checkNotNull(partFileFactory);
+               this.rollingPolicy = checkNotNull(rollingPolicy);
 
                this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
                this.pendingPartsPerCheckpoint = new HashMap<>();
@@ -158,8 +160,8 @@ public class Bucket<IN, BucketID> {
        }
 
        void merge(final Bucket<IN, BucketID> bucket) throws IOException {
-               Preconditions.checkNotNull(bucket);
-               Preconditions.checkState(Objects.equals(bucket.bucketPath, bucketPath));
+               checkNotNull(bucket);
+               checkState(Objects.equals(bucket.bucketPath, bucketPath));
 
                // There should be no pending files in the "to-merge" states.
                // The reason is that:
@@ -167,8 +169,8 @@ public class Bucket<IN, BucketID> {
                //    So a snapshot, including the one we are recovering from, will never contain such files.
                // 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
 
-               Preconditions.checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
-               Preconditions.checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
+               checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
+               checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
 
                RecoverableWriter.CommitRecoverable committable = bucket.closePartFile();
                if (committable != null) {
@@ -257,7 +259,7 @@ public class Bucket<IN, BucketID> {
        }
 
        void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
-               Preconditions.checkNotNull(fsWriter);
+               checkNotNull(fsWriter);
 
                Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
                                pendingPartsPerCheckpoint.entrySet().iterator();

Reply via email to