zentol commented on a change in pull request #6477: [FLINK-10027] Add logging to StreamingFileSink
URL: https://github.com/apache/flink/pull/6477#discussion_r207219546
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
 ##########
 @@ -222,79 +224,56 @@ void onElement(IN value, SinkFunction.Context context) throws Exception {
                                context.currentWatermark(),
                                currentProcessingTime);
 
-               final BucketID bucketId = bucketer.getBucketId(value, bucketerContext);
+               final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
+               final Bucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
+               bucket.write(value, currentProcessingTime);
+
+               // we update the global max counter here because as buckets become inactive and
+               // get removed from the list of active buckets, at the time when we want to create
+               // another part file for the bucket, if we start from 0 we may overwrite previous parts.
+
+               this.maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
+       }
 
+       private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId) throws IOException {
                Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
                if (bucket == null) {
                        final Path bucketPath = assembleBucketPath(bucketId);
                        bucket = bucketFactory.getNewBucket(
-                                       fileSystemWriter,
+                                       fsWriter,
                                        subtaskIndex,
                                        bucketId,
                                        bucketPath,
-                                       maxPartCounterUsed,
-                                       partFileWriterFactory);
+                                       maxPartCounter,
+                                       partFileWriterFactory,
+                                       rollingPolicy);
                        activeBuckets.put(bucketId, bucket);
                }
-
-               final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
-               if (info == null || rollingPolicy.shouldRollOnEvent(info, value)) {
-
-                       // info will be null if there is no currently open part file. This
-                       // is the case when we have a new, just created bucket or a bucket
-                       // that has not received any data after the closing of its previously
-                       // open in-progress file due to the specified rolling policy.
-
-                       bucket.rollPartFile(currentProcessingTime);
-               }
-               bucket.write(value, currentProcessingTime);
-
-               // we update the counter here because as buckets become inactive and
-               // get removed in the initializeState(), at the time we snapshot they
-               // may not be there to take them into account during checkpointing.
-               updateMaxPartCounter(bucket.getPartCounter());
+               return bucket;
        }
 
        void onProcessingTime(long timestamp) throws Exception {
                for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
-                       final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
-                       if (info != null && rollingPolicy.shouldRollOnProcessingTime(info, timestamp)) {
-                               bucket.closePartFile();
-                       }
+                       bucket.rollOnProcessingTimeIfNeeded(timestamp);
 
 Review comment:
   rename to `onProcessingTime` to stay similar to `onReceptionOfCheckpoint`, i.e. `on<Event>`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
