kl0u commented on a change in pull request #13576:
URL: https://github.com/apache/flink/pull/13576#discussion_r503149402



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/Committer.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.connector.sink;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for committing the data to the external 
system.
+ *
+ * @param <CommT> The type of the committable data.
+ */
+@Experimental
+public interface Committer<CommT> extends AutoCloseable {
+
+       /**
+        * Commit the given collection of {@link CommT}.
+        * @param committable the data needed to be committed.

Review comment:
       Same comment as on the class-level javadoc for `<CommT>`.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/Committer.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.connector.sink;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for committing the data to the external 
system.
+ *
+ * @param <CommT> The type of the committable data.

Review comment:
       Given that it may only be metadata, and not the actual data, how about 
"The type of information needed to commit data staged by the sink."?

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/Committer.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.connector.sink;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for committing the data to the external 
system.
+ *
+ * @param <CommT> The type of the committable data.
+ */
+@Experimental
+public interface Committer<CommT> extends AutoCloseable {
+
+       /**
+        * Commit the given collection of {@link CommT}.
+        * @param committable the data needed to be committed.
+        * @return a collection of {@link CommT} that is needed to re-commit 
latter.

Review comment:
       Probably mention that this is needed in case we need to implement a 
"commit-with-retry" (just to make it clearer). Also, it may make sense to 
mention when we are going to retry (e.g. at the next notify checkpoint 
complete).
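
       To make the retry semantics concrete, here is a minimal sketch of how a 
caller might drive "commit-with-retry". The `Committer` below is a local 
stand-in whose `commit` signature is assumed from the javadoc under review; 
`RetryingDriver`, `FlakyCommitter` and `notifyCheckpointComplete` are 
hypothetical names for illustration, not part of the PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CommitWithRetrySketch {

    /** Local stand-in for the interface under review; the signature is an assumption. */
    interface Committer<CommT> {
        /** Returns the committables that could not be committed and should be retried later. */
        List<CommT> commit(List<CommT> committables) throws Exception;
    }

    /** Hypothetical driver: whatever commit() hands back is retried on the next notification. */
    static final class RetryingDriver<CommT> {
        private final Committer<CommT> committer;
        private final List<CommT> pending = new ArrayList<>();

        RetryingDriver(Committer<CommT> committer) {
            this.committer = committer;
        }

        void add(CommT committable) {
            pending.add(committable);
        }

        /** Called once per completed checkpoint; an exception means "give up, do not retry". */
        void notifyCheckpointComplete() {
            List<CommT> toRetry;
            try {
                toRetry = committer.commit(new ArrayList<>(pending));
            } catch (Exception e) {
                throw new IllegalStateException("Commit failed permanently", e);
            }
            pending.clear();
            pending.addAll(toRetry);
        }

        int pendingCount() {
            return pending.size();
        }
    }

    /** Toy committer that rejects every committable on its first attempt only. */
    static final class FlakyCommitter implements Committer<String> {
        private final Set<String> seen = new HashSet<>();
        final List<String> committed = new ArrayList<>();

        @Override
        public List<String> commit(List<String> committables) {
            List<String> retry = new ArrayList<>();
            for (String c : committables) {
                if (seen.add(c)) {
                    retry.add(c);      // first attempt: ask the caller to retry later
                } else {
                    committed.add(c);  // second attempt: succeeds
                }
            }
            return retry;
        }
    }

    public static void main(String[] args) {
        FlakyCommitter committer = new FlakyCommitter();
        RetryingDriver<String> driver = new RetryingDriver<>(committer);
        driver.add("file-1");
        driver.notifyCheckpointComplete();
        System.out.println("pending after 1st checkpoint: " + driver.pendingCount());
        driver.notifyCheckpointComplete();
        System.out.println("pending after 2nd checkpoint: " + driver.pendingCount());
    }
}
```

       If the javadoc states both points (what the returned list means and 
when the retry happens), implementers can tell "return it for a later retry" 
apart from "throw to fail the job".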

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/GlobalCommitter.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.connector.sink;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.List;
+
+/**
+ * The {@link GlobalCommitter} is responsible for committing an aggregated 
committable, which we called global committable.
+ *
+ * @param <CommT>         The type of the committable data
+ * @param <GlobalCommT>   The type of the aggregated committable
+ */
+@Experimental
+public interface GlobalCommitter<CommT, GlobalCommT> extends 
Committer<GlobalCommT> {
+
+       /**
+        * Find out which global committables need to be retried when 
recovering from the failure.
+        * @param globalCommittables the global committable that are properly 
not committed in the previous attempt.
+        * @return the global committables that should be committed again.
+        */
+       List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> 
globalCommittables);
+
+       /**
+        * Compute an aggregated committable from a collection of committables.
+        * @param committables a collection of committables that are needed to 
combine

Review comment:
       The collection of committables to be combined into a global committable.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.connector.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This interface lets the sink developer build a simple sink topology, which 
could guarantee the exactly once
+ * semantics in both batch and stream execution mode if there is a {@link 
Committer} or {@link GlobalCommitter}.
+ * 1. The {@link Writer} is responsible for producing the committable.
+ * 2. The {@link Committer} is responsible for committing a single committable.
+ * 3. The {@link GlobalCommitter} is responsible for committing an aggregated 
committable, which we called the global
+ *    committable. There is only one instance of the {@link GlobalCommitter}.

Review comment:
       Maybe write more explicitly that "The GlobalCommitter is always executed 
with a parallelism of 1."

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.connector.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This interface lets the sink developer build a simple sink topology, which 
could guarantee the exactly once
+ * semantics in both batch and stream execution mode if there is a {@link 
Committer} or {@link GlobalCommitter}.
+ * 1. The {@link Writer} is responsible for producing the committable.
+ * 2. The {@link Committer} is responsible for committing a single committable.
+ * 3. The {@link GlobalCommitter} is responsible for committing an aggregated 
committable, which we called the global

Review comment:
       "called" -> "call"

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/GlobalCommitter.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.connector.sink;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.List;
+
+/**
+ * The {@link GlobalCommitter} is responsible for committing an aggregated 
committable, which we called global committable.
+ *
+ * @param <CommT>         The type of the committable data
+ * @param <GlobalCommT>   The type of the aggregated committable
+ */
+@Experimental
+public interface GlobalCommitter<CommT, GlobalCommT> extends 
Committer<GlobalCommT> {
+
+       /**
+        * Find out which global committables need to be retried when 
recovering from the failure.
+        * @param globalCommittables the global committable that are properly 
not committed in the previous attempt.

Review comment:
       What about: "A list of {@link GlobalCommT} for which we want to verify 
which ones were successfully committed and which ones were not."
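
       As an illustration of the intended contract, a rough sketch of what an 
implementation could look like. It assumes the external system can report 
which global committables it has already applied; the `alreadyCommitted` set 
and the `String` committable type are hypothetical and used only for 
illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FilterRecoveredSketch {

    /**
     * Keeps only the recovered global committables that the external system
     * has not applied yet, i.e. the ones that must be committed again.
     */
    static List<String> filterRecoveredCommittables(
            List<String> recovered, Set<String> alreadyCommitted) {
        List<String> toRecommit = new ArrayList<>();
        for (String globalCommittable : recovered) {
            if (!alreadyCommitted.contains(globalCommittable)) {
                toRecommit.add(globalCommittable);
            }
        }
        return toRecommit;
    }

    public static void main(String[] args) {
        // Hypothetical state: txn-1 went through before the failure, txn-2 did not.
        Set<String> alreadyCommitted = new HashSet<>(Arrays.asList("txn-1"));
        List<String> recovered = Arrays.asList("txn-1", "txn-2");
        System.out.println(filterRecoveredCommittables(recovered, alreadyCommitted));
    }
}
```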

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/GlobalCommitter.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.connector.sink;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.List;
+
+/**
+ * The {@link GlobalCommitter} is responsible for committing an aggregated 
committable, which we called global committable.
+ *
+ * @param <CommT>         The type of the committable data
+ * @param <GlobalCommT>   The type of the aggregated committable
+ */
+@Experimental
+public interface GlobalCommitter<CommT, GlobalCommT> extends 
Committer<GlobalCommT> {
+
+       /**
+        * Find out which global committables need to be retried when 
recovering from the failure.
+        * @param globalCommittables the global committable that are properly 
not committed in the previous attempt.
+        * @return the global committables that should be committed again.
+        */
+       List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> 
globalCommittables);
+
+       /**
+        * Compute an aggregated committable from a collection of committables.
+        * @param committables a collection of committables that are needed to 
combine
+        * @return an aggregated committable
+        */
+       GlobalCommT combine(List<CommT> committables);
+
+       /**
+        * Commit the given collection of {@link GlobalCommT}.
+        * @param globalCommittables a collection of {@link GlobalCommT}.
+        * @return a collection of {@link GlobalCommT} that is needed to 
re-commit latter.
+        * @throws Exception if the commit operation fail and do not want to 
retry any more.
+        */
+       List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws 
Exception;
+
+       /**
+        * There is no committable any more.

Review comment:
       "There is no committable any more." -> "Signals that there is no 
committable any more."

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/Writer.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.connector.sink;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.List;
+
+/**
+ * The interface is responsible for writing data and handling any potential 
tmp area used to write yet un-staged data, e.g. in-progress files.

Review comment:
       "The" -> "This"

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/GlobalCommitter.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.api.connector.sink;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.List;
+
+/**
+ * The {@link GlobalCommitter} is responsible for committing an aggregated 
committable, which we called global committable.
+ *
+ * @param <CommT>         The type of the committable data
+ * @param <GlobalCommT>   The type of the aggregated committable
+ */
+@Experimental
+public interface GlobalCommitter<CommT, GlobalCommT> extends 
Committer<GlobalCommT> {
+
+       /**
+        * Find out which global committables need to be retried when 
recovering from the failure.
+        * @param globalCommittables the global committable that are properly 
not committed in the previous attempt.
+        * @return the global committables that should be committed again.
+        */
+       List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> 
globalCommittables);
+
+       /**
+        * Compute an aggregated committable from a collection of committables.
+        * @param committables a collection of committables that are needed to 
combine
+        * @return an aggregated committable
+        */
+       GlobalCommT combine(List<CommT> committables);
+
+       /**
+        * Commit the given collection of {@link GlobalCommT}.
+        * @param globalCommittables a collection of {@link GlobalCommT}.
+        * @return a collection of {@link GlobalCommT} that is needed to 
re-commit latter.

Review comment:
       Align with the message in the `Committer` if you change it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

