AHeise commented on a change in pull request #16701:
URL: https://github.com/apache/flink/pull/16701#discussion_r682584648



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperatorFactory.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import java.util.Optional;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
+ * SinkOperator}.
+ *
+ * @param <InputT> The input type of the {@link SinkWriter}.
+ * @param <CommT> The committable type of the {@link SinkWriter}.
+ * @param <WriterStateT> The type of the {@link SinkWriter Writer's} state.
+ */
+public final class SinkOperatorFactory<InputT, CommT, WriterStateT>

Review comment:
       I agree, but this class depends on the specific `CommitterHandler`s, which I wanted to convert in this commit.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java
##########
@@ -33,44 +30,52 @@
  * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link
  * Committer} in the batch execution mode.
  *
- * @param <CommT> The committable type of the {@link Committer}.
+ * @param <InputT> The committable type of the {@link Committer}.
  */
-final class BatchCommitterOperator<CommT> extends AbstractStreamOperator<CommT>
-        implements OneInputStreamOperator<CommT, CommT>, BoundedOneInput {
+final class BatchCommitterHandler<InputT, OutputT>
+        extends AbstractCommitterHandler<InputT, OutputT> {
 
     /** Responsible for committing the committable to the external system. */
-    private final Committer<CommT> committer;
+    private final Committer<InputT> committer;
 
-    /** Record all the committables until the end of the input. */
-    private final List<CommT> allCommittables;
+    /**
+     * The committer that is chained to this committer. It's either {@link
+     * GlobalBatchCommitterHandler} or {@link NoopCommitterHandler}.

Review comment:
       The `NoopCommitterHandler` avoids null checks inside this class. It's a matter of taste, but in general a `NoopCommitterHandler` is the proper OOP approach (also dubbed case-less programming, because dispatch through the virtual method table (VMT) replaces the explicit case distinction).
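To make the trade-off concrete, here is a minimal sketch of the null object pattern. The types (`Handler`, `NoopHandler`, `GlobalHandler`, `BatchHandler`) are simplified, hypothetical stand-ins, not the classes in this PR: the chaining handler delegates unconditionally, and the enum singleton plays the role that `NoopCommitterHandler` plays here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Sketch of the null object pattern with simplified, hypothetical handler types. */
public class NullObjectSketch {

    /** Hypothetical, simplified stand-in for CommitterHandler. */
    interface Handler {
        /** Returns the output produced when the input ends. */
        List<String> endOfInput();
    }

    /** The null object: a stateless enum singleton that emits nothing. */
    enum NoopHandler implements Handler {
        INSTANCE;

        @Override
        public List<String> endOfInput() {
            return Collections.emptyList();
        }
    }

    /** Hypothetical downstream handler that always emits one aggregate. */
    static final class GlobalHandler implements Handler {
        @Override
        public List<String> endOfInput() {
            return Collections.singletonList("global-commit");
        }
    }

    /** Does its own work, then delegates to the chained handler without a null check. */
    static final class BatchHandler implements Handler {
        private final Handler next;
        final List<String> committed = new ArrayList<>();

        BatchHandler(Handler next) {
            this.next = next;
        }

        @Override
        public List<String> endOfInput() {
            committed.add("local-commit");
            // No `if (next != null)` branch: NoopHandler simply returns an empty list.
            return next.endOfInput();
        }
    }

    public static void main(String[] args) {
        System.out.println(new BatchHandler(NoopHandler.INSTANCE).endOfInput()); // []
        System.out.println(new BatchHandler(new GlobalHandler()).endOfInput()); // [global-commit]
    }
}
```

The caller decides once, at construction time, which handler to chain; afterwards no method has to branch on whether a successor exists.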

##########
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -161,6 +163,12 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil
         return Optional.empty();
     }
 
+    @Override
+    public Collection<String> getCompatibleStateNames() {

Review comment:
       I don't understand your comment. Why would you favor a `List` here? The only difference between a collection and a list is that the latter is implicitly ordered and indexed. None of that matters here.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java
##########
@@ -81,9 +83,17 @@
     /** Return the serializer of the writer's state type. */
     Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();
 
+    /**
+     * A list of state names of sinks from which the state can be restored. For example, the new
+     * file sink can resume from the state of an old streaming file sink as a drop-in replacement
+     * when resuming from a checkpoint/savepoint.
+     */
+    default Collection<String> getCompatibleStateNames() {

Review comment:
       I consider it more of a bugfix ;)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A wrapper around multiple {@link org.apache.flink.api.connector.sink.Committer} or {@link
+ * org.apache.flink.api.connector.sink.GlobalCommitter} that manages states.
+ *
+ * @param <InputT>
+ * @param <OutputT>
+ */
+interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable {
+
+    /** Initializes the state of the committer and this handler. */
+    default void initializeState(StateInitializationContext context) throws Exception {}
+
+    /** Snapshots the state of the committer and this handler. */
+    default void snapshotState(StateSnapshotContext context) throws Exception {}
+
+    /**
+     * Processes the committables be either directly transforming them or by adding them to the
+     * internal state of this handler. The supplier should only be queried once.
+     *
+     * @return a list of output committables that is send downstream.
+     */
+    List<OutputT> processCommittables(
+            SupplierWithException<List<InputT>, IOException> committableSupplier)
+            throws IOException;
+
+    /**
+     * Called when no more committables are going to be added through {@link
+     * #processCommittables(SupplierWithException)}.
+     *
+     * @return a list of output committables that is send downstream.
+     */
+    default List<OutputT> endOfInput() throws IOException {
+        return Collections.emptyList();
+    }
+
+    default void close() throws Exception {}
+
+    /** Called when a checkpoint is completed and returns a list of output to be sent downstream. */
+    default Collection<OutputT> notifyCheckpointCompleted(long checkpointId) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /** Swallows all committables and emits nothing. */
+    @SuppressWarnings("unchecked")
+    static <InputT, OutputT> CommitterHandler<InputT, OutputT> noop() {

Review comment:
       I'll remove it. Originally, `CommitterHandler` was public and the composition happened in the SinkTranslater.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
+ * CommitterOperator}.
+ *
+ * @param <CommT> the type of the committable
+ * @param <GlobalCommT> the type of the global committable
+ */
+public final class CommitterOperatorFactory<CommT, GlobalCommT>
+        extends AbstractStreamOperatorFactory<byte[]>
+        implements OneInputStreamOperatorFactory<byte[], byte[]> {
+
+    private final Sink<?, CommT, ?, GlobalCommT> sink;
+    private final boolean batch;
+
+    public CommitterOperatorFactory(Sink<?, CommT, ?, GlobalCommT> sink, boolean batch) {
+        this.sink = sink;
+        this.batch = batch;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends StreamOperator<byte[]>> T createStreamOperator(
+            StreamOperatorParameters<byte[]> parameters) {
+
+        SimpleVersionedSerializer<CommT> committableSerializer =
+                sink.getCommittableSerializer().orElseThrow(this::noSerializerFound);
+        try {
+            CommitterHandler<CommT, GlobalCommT> committerHandler = getGlobalCommitterHandler();
+            if (batch) {
+                Optional<Committer<CommT>> committer = sink.createCommitter();
+                if (committer.isPresent()) {
+                    committerHandler =
+                            new BatchCommitterHandler<>(committer.get(), committerHandler);
+                }
+            }
+
+            checkState(

Review comment:
       It always succeeds in your case (`Preconditions#checkState` fails on `false`). However, it could fail if there is no global committer in streaming mode, and it fails if there is neither a committer nor a global committer in batch mode. Either case would be a programming error in `SinkTranslater`; hence the `checkState`.
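A rough sketch of the condition described above as a standalone check. The `validate` helper and its boolean flags are hypothetical; the local `checkState` only mimics the documented contract of Flink's `Preconditions#checkState` (throw an `IllegalStateException` when the condition is `false`).

```java
/** Hypothetical sketch of the precondition described above; not Flink's actual code. */
public class CommitterPreconditionSketch {

    /** Mimics Preconditions#checkState: throws when the condition is false. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    /**
     * Streaming mode needs a global committer; batch mode needs at least a committer or a
     * global committer. Anything else would be a programming error in the translator.
     */
    static void validate(boolean batch, boolean hasCommitter, boolean hasGlobalCommitter) {
        boolean ok = batch ? (hasCommitter || hasGlobalCommitter) : hasGlobalCommitter;
        checkState(ok, "Expected a committer handler, but none was created.");
    }

    public static void main(String[] args) {
        validate(true, true, false); // batch with a committer: passes
        validate(false, false, true); // streaming with a global committer: passes
        try {
            validate(false, true, false); // streaming without a global committer
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```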

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandler.java
##########
@@ -72,11 +58,12 @@ public void endInput() throws Exception {
             }
         }
         globalCommitter.endOfInput();
+        return Collections.emptyList();
     }
 
     @Override
     public void close() throws Exception {
-        super.close();
         globalCommitter.close();
+        super.close();

Review comment:
       You are supposed to close resources always in the opposite order of 
creation. That's also how try-with-resource and Guava's Closer work. The main 
idea is that your later resources may depend on the prior resources to finish 
properly. (Think of a transaction that flushes, so you'd close the transaction 
before the client).
   Here, it doesn't matter, but I'd still like to have it consistently correct 
in Flink.
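A small, self-contained illustration of the ordering rule, assuming a hypothetical `Resource` type that logs its lifecycle. try-with-resources closes resources in reverse declaration order, which matches calling `globalCommitter.close()` before `super.close()` in the hunk above.

```java
import java.util.ArrayList;
import java.util.List;

/** Demonstrates that try-with-resources closes resources in reverse order of creation. */
public class CloseOrderSketch {

    /** Hypothetical resource that records when it is opened and closed. */
    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
            log.add("open " + name);
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // The transaction depends on the client, so it is created second and closed first.
        try (Resource client = new Resource("client", log);
                Resource transaction = new Resource("transaction", log)) {
            log.add("work");
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [open client, open transaction, work, close transaction, close client]
        System.out.println(run());
    }
}
```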

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java
##########
@@ -33,44 +30,52 @@
  * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link
  * Committer} in the batch execution mode.
  *
- * @param <CommT> The committable type of the {@link Committer}.
+ * @param <InputT> The committable type of the {@link Committer}.
  */
-final class BatchCommitterOperator<CommT> extends AbstractStreamOperator<CommT>
-        implements OneInputStreamOperator<CommT, CommT>, BoundedOneInput {
+final class BatchCommitterHandler<InputT, OutputT>
+        extends AbstractCommitterHandler<InputT, OutputT> {
 
     /** Responsible for committing the committable to the external system. */
-    private final Committer<CommT> committer;
+    private final Committer<InputT> committer;
 
-    /** Record all the committables until the end of the input. */
-    private final List<CommT> allCommittables;
+    /**
+     * The committer that is chained to this committer. It's either {@link
+     * GlobalBatchCommitterHandler} or {@link NoopCommitterHandler}.

Review comment:
       Here is a reference: https://en.wikipedia.org/wiki/Null_object_pattern (TL;DR: `null` is an external construct in OOP).
   
   Also, don't worry about performance here: first, the singleton is implemented as an enum, which is very efficient; second, we are not on the hot path.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A wrapper around multiple {@link org.apache.flink.api.connector.sink.Committer} or {@link
+ * org.apache.flink.api.connector.sink.GlobalCommitter} that manages states.
+ *
+ * @param <InputT>
+ * @param <OutputT>
+ */
+interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable {
+
+    /** Initializes the state of the committer and this handler. */
+    default void initializeState(StateInitializationContext context) throws Exception {}
+
+    /** Snapshots the state of the committer and this handler. */
+    default void snapshotState(StateSnapshotContext context) throws Exception {}
+
+    /**
+     * Processes the committables be either directly transforming them or by adding them to the
+     * internal state of this handler. The supplier should only be queried once.
+     *
+     * @return a list of output committables that is send downstream.
+     */
+    List<OutputT> processCommittables(
+            SupplierWithException<List<InputT>, IOException> committableSupplier)
+            throws IOException;
+
+    /**
+     * Called when no more committables are going to be added through {@link
+     * #processCommittables(SupplierWithException)}.
+     *
+     * @return a list of output committables that is send downstream.
+     */
+    default List<OutputT> endOfInput() throws IOException {
+        return Collections.emptyList();
+    }
+
+    default void close() throws Exception {}
+
+    /** Called when a checkpoint is completed and returns a list of output to be sent downstream. */
+    default Collection<OutputT> notifyCheckpointCompleted(long checkpointId) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /** Swallows all committables and emits nothing. */
+    @SuppressWarnings("unchecked")
+    static <InputT, OutputT> CommitterHandler<InputT, OutputT> noop() {

Review comment:
       I can't fully remove it without introducing some ugliness: this method casts the singleton to the expected type. `NoopCommitterHandler.INSTANCE` could actually be covariant to any type, but you can't express that in Java.
   I have moved it to `NoopCommitterHandler#getInstance()`. PTAL whether it's less confusing. I'm also promoting `NoopCommitterHandler` to a top-level class.
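For readers unfamiliar with the trick, a minimal sketch of what such a `getInstance()` can look like, using hypothetical simplified types (`Handler`, `NoopHandler`). It follows the same idea as `java.util.Collections#emptyList()`: one stateless instance that never touches its type parameters, handed out through an unchecked cast.

```java
import java.util.Collections;
import java.util.List;

/** Sketch of a type-agnostic enum singleton exposed through an unchecked generic cast. */
public class GenericSingletonSketch {

    /** Hypothetical, simplified stand-in for CommitterHandler. */
    interface Handler<InputT, OutputT> {
        List<OutputT> process(List<InputT> committables);
    }

    /** Stateless null object; it never uses its type parameters, so any cast is safe. */
    enum NoopHandler implements Handler<Object, Object> {
        INSTANCE;

        @Override
        public List<Object> process(List<Object> committables) {
            return Collections.emptyList();
        }

        /** Java cannot express "INSTANCE is a Handler<I, O> for every I, O", hence the cast. */
        @SuppressWarnings("unchecked")
        static <InputT, OutputT> Handler<InputT, OutputT> getInstance() {
            return (Handler<InputT, OutputT>) (Handler<?, ?>) INSTANCE;
        }
    }

    public static void main(String[] args) {
        Handler<String, Long> handler = NoopHandler.getInstance();
        System.out.println(handler.process(Collections.singletonList("a"))); // []
    }
}
```

The intermediate cast to `Handler<?, ?>` keeps the compiler happy; the only unchecked step is the final one, which is confined to a single method.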

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A wrapper around multiple {@link org.apache.flink.api.connector.sink.Committer} or {@link
+ * org.apache.flink.api.connector.sink.GlobalCommitter} that manages states.
+ *
+ * @param <InputT>
+ * @param <OutputT>
+ */
+interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable {
+
+    /** Initializes the state of the committer and this handler. */
+    default void initializeState(StateInitializationContext context) throws Exception {}
+
+    /** Snapshots the state of the committer and this handler. */
+    default void snapshotState(StateSnapshotContext context) throws Exception {}
+
+    /**
+     * Processes the committables by either directly transforming them or by adding them to the
+     * internal state of this handler. The supplier should only be queried once.
+     *
+     * @return a list of output committables that is send downstream.
+     */
+    List<OutputT> processCommittables(
+            SupplierWithException<List<InputT>, IOException> committableSupplier)
+            throws IOException;
+
+    /**
+     * Called when no more committables are going to be added through {@link
+     * #processCommittables(SupplierWithException)}.
+     *
+     * @return a list of output committables that is send downstream.
+     */
+    default List<OutputT> endOfInput() throws IOException {

Review comment:
       Good questions. I double-checked, and EOI (end of input) in streaming mode is really just meant for flushing data. A final checkpoint happens afterwards (especially after FLIP-147). Most likely, we won't need that for batch mode after FLIP-147, but that's a future task.

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
##########
@@ -139,6 +139,7 @@ public static void assertNoLateRecords(Iterable<Object> elements) {
 
         testHarness.processElements(
                 input.stream().map(StreamRecord::new).collect(Collectors.toList()));
+        testHarness.prepareSnapshotPreBarrier(1);

Review comment:
       Since writer and committer are merged, we now need to simulate more of the actual calls. The writer creates the committables in `prepareSnapshotPreBarrier` and passes them to the committer, which then does its actual work in `snapshotState`.
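A deliberately simplified model of that call sequence, assuming a hypothetical `MergedOperator` that buffers elements, turns them into committables in `prepareSnapshotPreBarrier`, and hands them to the committer side in `snapshotState`. It is not the real operator; it only shows why the test now needs the extra `prepareSnapshotPreBarrier(1)` call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, heavily simplified model of a merged writer + committer operator. */
public class MergedSinkSketch {

    static final class MergedOperator {
        private final List<String> buffered = new ArrayList<>();
        private final List<String> pendingCommittables = new ArrayList<>();
        final List<String> received = new ArrayList<>();

        void processElement(String element) {
            buffered.add(element);
        }

        /** Writer side: turns buffered elements into committables before the barrier. */
        void prepareSnapshotPreBarrier(long checkpointId) {
            for (String element : buffered) {
                pendingCommittables.add(element + "@" + checkpointId);
            }
            buffered.clear();
        }

        /** Committer side: only sees committables that were prepared before the barrier. */
        void snapshotState(long checkpointId) {
            received.addAll(pendingCommittables);
            pendingCommittables.clear();
        }
    }

    public static void main(String[] args) {
        MergedOperator operator = new MergedOperator();
        operator.processElement("a");
        operator.processElement("b");
        operator.snapshotState(1); // without the pre-barrier call, nothing reaches the committer
        System.out.println(operator.received); // []
        operator.prepareSnapshotPreBarrier(1);
        operator.snapshotState(1);
        System.out.println(operator.received); // [a@1, b@1]
    }
}
```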

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A wrapper around multiple {@link org.apache.flink.api.connector.sink.Committer} or {@link
+ * org.apache.flink.api.connector.sink.GlobalCommitter} that manages states.
+ *
+ * @param <InputT>
+ * @param <OutputT>
+ */
+interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable {
+
+    /** Initializes the state of the committer and this handler. */
+    default void initializeState(StateInitializationContext context) throws Exception {}
+
+    /** Snapshots the state of the committer and this handler. */
+    default void snapshotState(StateSnapshotContext context) throws Exception {}

Review comment:
       They are called on every `CommitterHandler`, though, so we need to keep them here; otherwise, we would need `instanceof` checks in the operators.
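A minimal sketch of why no-op default hooks avoid `instanceof` checks, with hypothetical simplified types (`Handler`, `StatelessHandler`, `StatefulHandler`, `snapshotAll`): the interface declares a default that does nothing, stateful handlers override it, and the operator calls the hook on every handler uniformly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: no-op default methods let the operator treat every handler uniformly. */
public class DefaultHooksSketch {

    /** Hypothetical, simplified stand-in for CommitterHandler. */
    interface Handler {
        /** Stateless handlers need no state, so the default does nothing. */
        default void snapshotState(List<String> stateOut) {}
    }

    static final class StatelessHandler implements Handler {}

    static final class StatefulHandler implements Handler {
        @Override
        public void snapshotState(List<String> stateOut) {
            stateOut.add("pending-committables");
        }
    }

    /** The operator calls the hook on every handler, with no instanceof checks. */
    static List<String> snapshotAll(List<Handler> handlers) {
        List<String> state = new ArrayList<>();
        for (Handler handler : handlers) {
            handler.snapshotState(state);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Handler> handlers = Arrays.asList(new StatelessHandler(), new StatefulHandler());
        System.out.println(snapshotAll(handlers)); // [pending-committables]
    }
}
```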

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java
##########
@@ -46,44 +54,81 @@
 public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializable {
 
     /**
-     * Create a {@link SinkWriter}.
+     * Create a {@link SinkWriter}. If the application is resumed from a checkpoint or savepoint and
+     * the sink is stateful, it will receive the corresponding state obtained with {@link
+     * SinkWriter#snapshotState()} and serialized with {@link #getWriterStateSerializer()}. If no
+     * state exists, the first existing, compatible state specified in {@link
+     * #getCompatibleStateNames()} will be loaded and passed.
      *
      * @param context the runtime context.
-     * @param states the writer's state.
+     * @param states the writer's previous state.
      * @return A sink writer.
-     * @throws IOException if fail to create a writer.
+     * @throws IOException for any failure during creation.
+     * @see SinkWriter#snapshotState()
+     * @see #getWriterStateSerializer()
+     * @see #getCompatibleStateNames()
      */
     SinkWriter<InputT, CommT, WriterStateT> createWriter(
             InitContext context, List<WriterStateT> states) throws IOException;
 
     /**
-     * Creates a {@link Committer}.
+     * Any stateful sink needs to provide this state serializer and implement {@link
+     * SinkWriter#snapshotState()} properly. The respective state is used in {@link
+     * #createWriter(InitContext, List)} on recovery.
+     *
+     * @return the serializer of the writer's state type.
+     */
+    Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();
+
+    /**
+     * Creates a {@link Committer} which is part of a 2-phase-commit protocol. The {@link
+     * SinkWriter} creates committables through {@link SinkWriter#prepareCommit(boolean)} in the
+     * first phase. The committables are then passed to this committer and persisted with {@link
+     * Committer#commit(List)}. If a committer is returned, the sink must also return a {@link
+     * #getCommittableSerializer()}.
      *
-     * @return A committer.
-     * @throws IOException if fail to create a committer.
+     * @return A committer for the 2-phase-commit protocol.
+     * @throws IOException for any failure during creation.
      */
     Optional<Committer<CommT>> createCommitter() throws IOException;
 
     /**
-     * Creates a {@link GlobalCommitter}.
+     * Creates a {@link GlobalCommitter} which is part of a 2-phase-commit protocol. The {@link
+     * SinkWriter} creates committables through {@link SinkWriter#prepareCommit(boolean)} in the
+     * first phase. The committables are then passed to the Committer and persisted with {@link
+     * Committer#commit(List)} which also can return an aggregated committable. This aggregated
+     * committables are passed to this {@link GlobalCommitter} of which only a single instance
+     * exists. If a global committer is returned, the sink must also return a {@link
+     * #getCommittableSerializer()} and {@link #getGlobalCommittableSerializer()}.
      *
-     * @return A global committer.
-     * @throws IOException if fail to create a global committer.
+     * @return A global committer for the 2-phase-commit protocol.
+     * @throws IOException for any failure during creation.
      */
     Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException;
 
-    /** Returns the serializer of the committable type. */
+    /**
+     * Returns the serializer of the committable type. The serializer is required iff the sink has a
+     * {@link Committer} or {@link GlobalCommitter}.
+     */
     Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();
 
-    /** Returns the serializer of the aggregated committable type. */
+    /**
+     * Returns the serializer of the aggregated committable type. The serializer is required iff the
+     * sink has a {@link GlobalCommitter}.
+     */
     Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer();
 
-    /** Return the serializer of the writer's state type. */
-    Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();
+    /**
+     * A list of state names of sinks from which the state can be restored. For example, the new
+     * file sink can resume from the state of an old streaming file sink as a drop-in replacement

Review comment:
       I can't use a Javadoc link here because of circular dependency issues, but I have used the proper class names in the code tag to make it more obvious.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java
##########
@@ -48,41 +54,70 @@
 public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializable {
 
     /**
-     * Create a {@link SinkWriter}.
+     * Create a {@link SinkWriter}. If the application is resumed from a checkpoint or savepoint and
+     * the sink is stateful, it will receive the corresponding state obtained with {@link
+     * SinkWriter#snapshotState()} and serialized with {@link #getWriterStateSerializer()}. If no
+     * state exists, the first existing, compatible state specified in {@link
+     * #getCompatibleStateNames()} will be loaded and passed.
      *
      * @param context the runtime context.
-     * @param states the writer's state.
+     * @param states the writer's previous state.
      * @return A sink writer.
-     * @throws IOException if fail to create a writer.
+     * @throws IOException for any failure during creation.
+     * @see SinkWriter#snapshotState()
+     * @see #getWriterStateSerializer()
+     * @see #getCompatibleStateNames()
      */
     SinkWriter<InputT, CommT, WriterStateT> createWriter(
             InitContext context, List<WriterStateT> states) throws IOException;
 
     /**
-     * Creates a {@link Committer}.
+     * Any stateful sink needs to provide this state serializer and implement {@link
+     * SinkWriter#snapshotState()} properly. The respective state is used in {@link
+     * #createWriter(InitContext, List)} on recovery.
+     *
+     * @return the serializer of the writer's state type.
+     */
+    Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();
+
+    /**
+     * Creates a {@link Committer} which is part of a 2-phase-commit protocol. The {@link
+     * SinkWriter} creates committables through {@link SinkWriter#prepareCommit(boolean)} in the
+     * first phase. The committables are then passed to this committer and persisted with {@link
+     * Committer#commit(List)}. If a committer is returned, the sink must also return a {@link
+     * #getCommittableSerializer()}.
      *
-     * @return A committer.
-     * @throws IOException if fail to create a committer.
+     * @return A committer for the 2-phase-commit protocol.
+     * @throws IOException for any failure during creation.
      */
     Optional<Committer<CommT>> createCommitter() throws IOException;
 
     /**
-     * Creates a {@link GlobalCommitter}.
+     * Creates a {@link GlobalCommitter} which is part of a 2-phase-commit protocol. The {@link
+     * SinkWriter} creates committables through {@link SinkWriter#prepareCommit(boolean)} in the
+     * first phase. The committables are then passed to the Committer and persisted with {@link
+     * Committer#commit(List)} which also can return an aggregated committable. This aggregated

Review comment:
       You are right. Thanks for double-checking!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
