Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201058329 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java --- @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A bucket is the directory organization of the output of the {@link BucketingSink}. + * + * + * <p>For each incoming element in the {@code BucketingSink}, the user-specified + * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is + * queried to see in which bucket this element should be written to. + */ +public class Bucket<IN> { + + private static final String PART_PREFIX = "part"; + + private final Path bucketPath; + + private int subtaskIndex; + + private long partCounter; + + private long creationTime; + + private long lastWrittenTime; + + private final long maxPathSize; + + private final long rolloverTime; + + private final long inactivityTime; + + private final Writer<IN> outputFormatWriter; + + private final ResumableWriter fsWriter; + + private RecoverableFsDataOutputStream currentOpenPartStream; + + private List<ResumableWriter.CommitRecoverable> pending = new ArrayList<>(); + + private Map<Long, List<ResumableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>(); + + public Bucket( + ResumableWriter fsWriter, + int subtaskIndex, + Path bucketPath, + long initialPartCounter, + long maxPartSize, + long rolloverTime, + long inactivityTime, + Writer<IN> writer, + BucketState bucketstate) throws IOException { + + this(fsWriter, subtaskIndex, bucketPath, initialPartCounter, maxPartSize, rolloverTime, inactivityTime, writer); + + // the constructor must have already initialized the filesystem writer + Preconditions.checkState(fsWriter != null); + + // we try to resume the previous in-progress file, if the filesystem + // supports such operation. If not, we just commit the file and start fresh. + + final ResumableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress(); + if (resumable != null) { + this.currentOpenPartStream = fsWriter.recover(resumable); + this.creationTime = bucketstate.getCreationTime(); + } + + // we commit pending files for previous checkpoints to the last successful one + // (from which we are recovering from) + for (List<ResumableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) { + for (ResumableWriter.CommitRecoverable commitable: commitables) { + fsWriter.recoverForCommit(commitable).commit(); --- End diff -- Should this use `commitAfterRecovery()`?
---