kl0u commented on a change in pull request #13576: URL: https://github.com/apache/flink/pull/13576#discussion_r503149402
########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/Committer.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.api.connector.sink; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; + +/** + * This interface is responsible for committing the data to the external system. + * + * @param <CommT> The type of the committable data. + */ +@Experimental +public interface Committer<CommT> extends AutoCloseable { + + /** + * Commit the given collection of {@link CommT}. + * @param committable the data needed to be committed. Review comment: Same as class javadoc. ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/Committer.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.api.connector.sink; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; + +/** + * This interface is responsible for committing the data to the external system. + * + * @param <CommT> The type of the committable data. Review comment: Given that it may only be metadata, and not the actual data, how about "The type of information needed to commit data staged by the sink."? ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/Committer.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.api.connector.sink; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; + +/** + * This interface is responsible for committing the data to the external system. + * + * @param <CommT> The type of the committable data. + */ +@Experimental +public interface Committer<CommT> extends AutoCloseable { + + /** + * Commit the given collection of {@link CommT}. + * @param committable the data needed to be committed. + * @return a collection of {@link CommT} that is needed to re-commit latter. Review comment: Probably mention that this is needed in case we need to implement a "commit-with-retry" (just to make it clearer). Also it may make sense to mention when we are going to retry (e.g. at the next checkpoint-complete notification). ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/GlobalCommitter.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.api.connector.sink; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; + +/** + * The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we called global committable. + * + * @param <CommT> The type of the committable data + * @param <GlobalCommT> The type of the aggregated committable + */ +@Experimental +public interface GlobalCommitter<CommT, GlobalCommT> extends Committer<GlobalCommT> { + + /** + * Find out which global committables need to be retried when recovering from the failure. + * @param globalCommittables the global committable that are properly not committed in the previous attempt. + * @return the global committables that should be committed again. + */ + List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables); + + /** + * Compute an aggregated committable from a collection of committables. + * @param committables a collection of committables that are needed to combine Review comment: The collection of committables to be combined into a Global Committable. ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.api.connector.sink; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; + +/** + * This interface lets the sink developer build a simple sink topology, which could guarantee the exactly once + * semantics in both batch and stream execution mode if there is a {@link Committer} or {@link GlobalCommitter}. + * 1. The {@link Writer} is responsible for producing the committable. + * 2. The {@link Committer} is responsible for committing a single committable. + * 3. The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we called the global + * committable. There is only one instance of the {@link GlobalCommitter}. Review comment: Maybe write more explicitly that "The GlobalCommitter is always executed with a parallelism of 1." ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.api.connector.sink; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; + +/** + * This interface lets the sink developer build a simple sink topology, which could guarantee the exactly once + * semantics in both batch and stream execution mode if there is a {@link Committer} or {@link GlobalCommitter}. + * 1. The {@link Writer} is responsible for producing the committable. + * 2. The {@link Committer} is responsible for committing a single committable. + * 3. The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we called the global Review comment: "called" -> "call" ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/GlobalCommitter.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.api.connector.sink; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; + +/** + * The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we called global committable. + * + * @param <CommT> The type of the committable data + * @param <GlobalCommT> The type of the aggregated committable + */ +@Experimental +public interface GlobalCommitter<CommT, GlobalCommT> extends Committer<GlobalCommT> { + + /** + * Find out which global committables need to be retried when recovering from the failure. + * @param globalCommittables the global committable that are properly not committed in the previous attempt. Review comment: What about: "A list of <GlobalCommT> for which we want to verify which ones were successfully committed and which ones were not." ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/GlobalCommitter.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.api.connector.sink; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; + +/** + * The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we called global committable. + * + * @param <CommT> The type of the committable data + * @param <GlobalCommT> The type of the aggregated committable + */ +@Experimental +public interface GlobalCommitter<CommT, GlobalCommT> extends Committer<GlobalCommT> { + + /** + * Find out which global committables need to be retried when recovering from the failure. + * @param globalCommittables the global committable that are properly not committed in the previous attempt. + * @return the global committables that should be committed again. + */ + List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables); + + /** + * Compute an aggregated committable from a collection of committables. + * @param committables a collection of committables that are needed to combine + * @return an aggregated committable + */ + GlobalCommT combine(List<CommT> committables); + + /** + * Commit the given collection of {@link GlobalCommT}. + * @param globalCommittables a collection of {@link GlobalCommT}. + * @return a collection of {@link GlobalCommT} that is needed to re-commit latter. + * @throws Exception if the commit operation fail and do not want to retry any more. + */ + List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws Exception; + + /** + * There is no committable any more. Review comment: "There is no committable any more." -> "Signals that there is no committable any more." ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/Writer.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.api.connector.sink; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; + +/** + * The interface is responsible for writing data and handling any potential tmp area used to write yet un-staged data, e.g. in-progress files. Review comment: "The" -> "This" ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/GlobalCommitter.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.api.connector.sink; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; + +/** + * The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we called global committable. + * + * @param <CommT> The type of the committable data + * @param <GlobalCommT> The type of the aggregated committable + */ +@Experimental +public interface GlobalCommitter<CommT, GlobalCommT> extends Committer<GlobalCommT> { + + /** + * Find out which global committables need to be retried when recovering from the failure. + * @param globalCommittables the global committable that are properly not committed in the previous attempt. + * @return the global committables that should be committed again. + */ + List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables); + + /** + * Compute an aggregated committable from a collection of committables. + * @param committables a collection of committables that are needed to combine + * @return an aggregated committable + */ + GlobalCommT combine(List<CommT> committables); + + /** + * Commit the given collection of {@link GlobalCommT}. + * @param globalCommittables a collection of {@link GlobalCommT}. + * @return a collection of {@link GlobalCommT} that is needed to re-commit latter. Review comment: Align with the message in the `Committer` if you change it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
