Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6281#discussion_r201058329
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java ---
    @@ -0,0 +1,297 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink.filesystem;
    +
    +import org.apache.flink.api.common.serialization.Writer;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    +import org.apache.flink.core.fs.ResumableWriter;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * A bucket is the directory organization of the output of the {@link BucketingSink}.
    + *
    + * <p>For each incoming element in the {@code BucketingSink}, the user-specified
    + * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is
    + * queried to see which bucket this element should be written to.
    + */
    +public class Bucket<IN> {
    +
    +   private static final String PART_PREFIX = "part";
    +
    +   private final Path bucketPath;
    +
    +   private int subtaskIndex;
    +
    +   private long partCounter;
    +
    +   private long creationTime;
    +
    +   private long lastWrittenTime;
    +
    +   private final long maxPathSize;
    +
    +   private final long rolloverTime;
    +
    +   private final long inactivityTime;
    +
    +   private final Writer<IN> outputFormatWriter;
    +
    +   private final ResumableWriter fsWriter;
    +
    +   private RecoverableFsDataOutputStream currentOpenPartStream;
    +
    +   private List<ResumableWriter.CommitRecoverable> pending = new ArrayList<>();
    +
    +   private Map<Long, List<ResumableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>();
    +
    +   public Bucket(
    +                   ResumableWriter fsWriter,
    +                   int subtaskIndex,
    +                   Path bucketPath,
    +                   long initialPartCounter,
    +                   long maxPartSize,
    +                   long rolloverTime,
    +                   long inactivityTime,
    +                   Writer<IN> writer,
    +                   BucketState bucketstate) throws IOException {
    +
    +           this(fsWriter, subtaskIndex, bucketPath, initialPartCounter, maxPartSize, rolloverTime, inactivityTime, writer);
    +
    +           // the constructor must have already initialized the filesystem writer
    +           Preconditions.checkState(fsWriter != null);
    +
    +           // we try to resume the previous in-progress file, if the filesystem
    +           // supports such an operation. If not, we just commit the file and start fresh.
    +
    +           final ResumableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress();
    +           if (resumable != null) {
    +                   this.currentOpenPartStream = fsWriter.recover(resumable);
    +                   this.creationTime = bucketstate.getCreationTime();
    +           }
    +
    +           // we commit pending files for previous checkpoints up to the last successful one
    +           // (the checkpoint from which we are recovering)
    +           for (List<ResumableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) {
    +                   for (ResumableWriter.CommitRecoverable commitable: commitables) {
    +                           fsWriter.recoverForCommit(commitable).commit();
    --- End diff --
    
    Should this use `commitAfterRecovery()`?
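    
    A minimal sketch of what that could look like, assuming the committer returned by
    `recoverForCommit(...)` exposes both `commit()` and `commitAfterRecovery()`, with the
    latter tolerating part files that were already committed before the failure:
    
        // hypothetical variant of the loop above, not the PR's actual code
        for (List<ResumableWriter.CommitRecoverable> commitables : bucketstate.getPendingPerCheckpoint().values()) {
            for (ResumableWriter.CommitRecoverable commitable : commitables) {
                // on the restore path, commitAfterRecovery() should not fail if the
                // pending part file was already committed before the failure
                fsWriter.recoverForCommit(commitable).commitAfterRecovery();
            }
        }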

