zentol commented on a change in pull request #6477: [FLINK-10027] Add logging to StreamingFileSink
URL: https://github.com/apache/flink/pull/6477#discussion_r207219546
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
##########
@@ -222,79 +224,56 @@ void onElement(IN value, SinkFunction.Context context) throws Exception {
 				context.currentWatermark(),
 				currentProcessingTime);
-		final BucketID bucketId = bucketer.getBucketId(value, bucketerContext);
+		final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
+		final Bucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
+		bucket.write(value, currentProcessingTime);
+
+		// we update the global max counter here because as buckets become inactive and
+		// get removed from the list of active buckets, at the time when we want to create
+		// another part file for the bucket, if we start from 0 we may overwrite previous parts.
+
+		this.maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
+	}
+	private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId) throws IOException {
 		Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
 		if (bucket == null) {
 			final Path bucketPath = assembleBucketPath(bucketId);
 			bucket = bucketFactory.getNewBucket(
-					fileSystemWriter,
+					fsWriter,
 					subtaskIndex,
 					bucketId,
 					bucketPath,
-					maxPartCounterUsed,
-					partFileWriterFactory);
+					maxPartCounter,
+					partFileWriterFactory,
+					rollingPolicy);
 			activeBuckets.put(bucketId, bucket);
 		}
-
-		final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
-		if (info == null || rollingPolicy.shouldRollOnEvent(info, value)) {
-
-			// info will be null if there is no currently open part file. This
-			// is the case when we have a new, just created bucket or a bucket
-			// that has not received any data after the closing of its previously
-			// open in-progress file due to the specified rolling policy.
-
-			bucket.rollPartFile(currentProcessingTime);
-		}
-		bucket.write(value, currentProcessingTime);
-
-		// we update the counter here because as buckets become inactive and
-		// get removed in the initializeState(), at the time we snapshot they
-		// may not be there to take them into account during checkpointing.
-		updateMaxPartCounter(bucket.getPartCounter());
+		return bucket;
 	}
 	void onProcessingTime(long timestamp) throws Exception {
 		for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
-			final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
-			if (info != null && rollingPolicy.shouldRollOnProcessingTime(info, timestamp)) {
-				bucket.closePartFile();
-			}
+			bucket.rollOnProcessingTimeIfNeeded(timestamp);
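
The in-diff comment above carries the key invariant: because inactive buckets are dropped from `activeBuckets`, the part counter has to be tracked as a global maximum, otherwise a bucket re-created later and started from 0 could reuse an index and overwrite an earlier part file. Below is a minimal, self-contained sketch of that bookkeeping, assuming a simplified `SimpleBucket` stand-in with a `String` bucket id; all names and signatures here are illustrative only, not the PR's `Bucket`/`Buckets` API.

```java
// Sketch only: global max part counter bookkeeping as described in the diff comment.
// SimpleBucket and the String bucket id are assumptions made for illustration.
import java.util.HashMap;
import java.util.Map;

class PartCounterSketch {

	static final class SimpleBucket {
		private long partCounter;

		SimpleBucket(long initialPartCounter) {
			// Seed a new bucket with the global max, never 0, so a bucket that was
			// dropped from the active set and later re-created cannot reuse an
			// already-taken part index and overwrite a previous part file.
			this.partCounter = initialPartCounter;
		}

		void rollPartFile() {
			partCounter++; // every new part file bumps the per-bucket counter
		}

		long getPartCounter() {
			return partCounter;
		}
	}

	private final Map<String, SimpleBucket> activeBuckets = new HashMap<>();
	private long maxPartCounter = 0L;

	void onElement(String bucketId) {
		SimpleBucket bucket = activeBuckets.computeIfAbsent(
				bucketId, id -> new SimpleBucket(maxPartCounter));
		bucket.rollPartFile();
		// Update the global max while the bucket is still active: once it is removed
		// from activeBuckets its counter is no longer visible, so the max must already
		// reflect it by the time the next bucket is created.
		maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
	}
}
```

Seeding newly created buckets with `maxPartCounter` rather than 0 is what keeps part indices unique across bucket re-creation.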
Review comment:
rename to `onProcessingTime` to stay consistent with `onReceptionOfCheckpoint`, i.e. follow the `on<Event>` naming pattern.
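
For illustration, a small sketch of the `on<Event>` convention the comment asks for; the interface and signatures below are assumptions for clarity, not the actual `Bucket` API in the PR.

```java
// Sketch of the suggested naming only; this interface and its signatures are
// illustrative assumptions, not the PR's Bucket API.
import java.io.IOException;

interface BucketEventHooksSketch {

	// name used in the diff above:
	// void rollOnProcessingTimeIfNeeded(long timestamp) throws IOException;

	// suggested rename, consistent with the existing onReceptionOfCheckpoint hook:
	void onProcessingTime(long timestamp) throws IOException;

	// hook referenced in the review comment (signature simplified here):
	void onReceptionOfCheckpoint(long checkpointId) throws IOException;
}
```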