alpreu commented on a change in pull request #18302:
URL: https://github.com/apache/flink/pull/18302#discussion_r780959595



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/GlobalCommitter.java
##########
@@ -57,11 +57,12 @@
     GlobalCommT combine(List<CommT> committables) throws IOException;
 
     /**
-     * Commit the given list of {@link GlobalCommT}.
+     * Commits the given list of {@link GlobalCommT} and returns a list of 
{@link GlobalCommT} that
+     * need to be re-committed. The elements of the return list must be a 
subset of the input list,
+     * so that successful committables can be inferred.
      *
      * @param globalCommittables a list of {@link GlobalCommT}.
-     * @return A list of {@link GlobalCommT} needed to re-commit, which is 
needed in case we
-     *     implement a "commit-with-retry" pattern.
+     * @return a list of {@link GlobalCommT} that need to be * re-committed.

Review comment:
       ```suggestion
        * @return a list of {@link GlobalCommT} that need to be re-committed.
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The {@code Committer} is responsible for committing the data staged by the 
{@link
+ * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a 2pc 
protocol.

Review comment:
       ```suggestion
    * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a 
two-phase commit protocol.
   ```
   
   IMO, being explicit is generally better than using abbreviations.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A {@link Sink} for exactly-once semantics using a two-phase commit 
protocol. The {@link Sink}
+ * consists of a {@link SinkWriter} that performs the precommits and a {@link 
Committer} that
+ * actually commits the data. To facilitate the separation the {@link 
SinkWriter} creates
+ * <i>committables</i> on checkpoint or end of input and the sends it to the 
{@link Committer}.
+ *
+ * <p>The {@link TwoPhaseCommittingSink} needs to be serializable. All 
configuration should be
+ * validated eagerly. The respective sink writers and committers are transient 
and will only be
+ * created in the subtasks on the taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <CommT> The type of the committables.
+ */
+@PublicEvolving
+public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
+
+    /**
+     * Create a {@link PrecommittingSinkWriter} that creates committables on 
checkpoint or end of
+     * input.
+     *
+     * @param context the runtime context.
+     * @return A sink writer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) 
throws IOException;
+
+    /**
+     * Creates a {@link Committer} that permanently makes the previously 
written data visible
+     * through {@link Committer#commit(Collection)}.
+     *
+     * @return A committer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    Committer<CommT> createCommitter() throws IOException;
+
+    /** Returns the serializer of the committable type. */
+    SimpleVersionedSerializer<CommT> getCommittableSerializer();
+
+    /** A {@link SinkWriter} that performs the first part of a 2pc protocol. */

Review comment:
       ```suggestion
       /** A {@link SinkWriter} that performs the first part of a two-phase 
commit protocol. */
   ```

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
##########
@@ -806,6 +809,54 @@ public void testConflictSlotSharingGroup() {
         env.getStreamGraph();
     }
 
+    @Test
+    public void testTrackTransformationsByIdentity() {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final Transformation<?> noopTransformation = env.fromSequence(1, 
2).getTransformation();
+
+        final StreamGraphGenerator generator =
+                new StreamGraphGenerator(
+                        Lists.newArrayList(
+                                noopTransformation,
+                                new 
FailingTransformation(noopTransformation.hashCode())),

Review comment:
       Is there a benefit to using Guava here instead of `Arrays.asList`?

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The {@code Committer} is responsible for committing the data staged by the 
{@link
+ * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a 2pc 
protocol.
+ *
+ * <p>A commit must be idempotent: If some failure occurs in Flink during 
commit phase, Flink will
+ * restart from previous checkpoint and re-attempt to commit all committables. 
Thus, some or all
+ * committables may have already been committed. These {@link CommitRequest}s 
must not change the
+ * external system and implementers are asked to signal {@link 
CommitRequest#alreadyCommitted()}.
+ *
+ * @param <CommT> The type of information needed to commit the staged data
+ */
+@PublicEvolving
+public interface Committer<CommT> extends AutoCloseable {
+    /**
+     * Commit the given list of {@link CommT}.
+     *
+     * @param committables A list of commit requests staged by the sink writer.
+     * @throws IOException for reasons that may yield a complete restart of 
the job.
+     */
+    void commit(Collection<CommitRequest<CommT>> committables)
+            throws IOException, InterruptedException;
+
+    /**
+     * A request to commit a specific committable.
+     *
+     * @param <CommT>
+     */
+    @PublicEvolving
+    interface CommitRequest<CommT> {
+
+        /** Returns the committable. */
+        CommT getCommittable();
+
+        /**
+         * Returns how often this particular committable has been retried. 
Starts at 0 for the first
+         * attempt.
+         */
+        int getNumberOfRetries();
+
+        /**
+         * The commit failed for known reason and should not be retried.
+         *
+         * <p>Currently calling this method only logs the error, discards the 
comittable and
+         * continues. In the future the behaviour might be configurable.

Review comment:
       I checked the usages (also of the other methods in the `CommitRequest` 
interface), but I don't see any reference implementation. Is the docstring 
correct as it is, then? 

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A {@link Sink} for exactly-once semantics using a two-phase commit 
protocol. The {@link Sink}
+ * consists of a {@link SinkWriter} that performs the precommits and a {@link 
Committer} that
+ * actually commits the data. To facilitate the separation the {@link 
SinkWriter} creates
+ * <i>committables</i> on checkpoint or end of input and the sends it to the 
{@link Committer}.
+ *
+ * <p>The {@link TwoPhaseCommittingSink} needs to be serializable. All 
configuration should be
+ * validated eagerly. The respective sink writers and committers are transient 
and will only be
+ * created in the subtasks on the taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <CommT> The type of the committables.
+ */
+@PublicEvolving
+public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
+
+    /**
+     * Create a {@link PrecommittingSinkWriter} that creates committables on 
checkpoint or end of
+     * input.
+     *
+     * @param context the runtime context.
+     * @return A sink writer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) 
throws IOException;
+
+    /**
+     * Creates a {@link Committer} that permanently makes the previously 
written data visible
+     * through {@link Committer#commit(Collection)}.
+     *
+     * @return A committer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    Committer<CommT> createCommitter() throws IOException;
+
+    /** Returns the serializer of the committable type. */
+    SimpleVersionedSerializer<CommT> getCommittableSerializer();
+
+    /** A {@link SinkWriter} that performs the first part of a 2pc protocol. */
+    @PublicEvolving
+    interface PrecommittingSinkWriter<InputT, CommT> extends 
SinkWriter<InputT> {
+        /**
+         * Prepare for a commit.
+         *
+         * <p>This method will be called after {@link #flush(boolean)} and 
before {@link
+         * StatefulSinkWriter#snapshotState(long)}.
+         *
+         * @return The data to commit as the second step of the 2pc protocol.

Review comment:
       ```suggestion
            * @return The data to commit as the second step of the two-phase 
commit protocol.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to