rkhachatryan commented on a change in pull request #10847:
URL: https://github.com/apache/flink/pull/10847#discussion_r554126563



##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * JDBC exactly once sink options.
+ *
+ * <p><b>maxCommitAttempts</b> - maximum number of commit attempts to make per transaction; must be
+ * > 0; state size is proportional to the product of max number of in-flight snapshots and this
+ * number.
+ *
+ * <p><b>allowOutOfOrderCommits</b> - If true, all prepared transactions will be attempted to commit
+ * regardless of any transient failures during this operation. This may lead to inconsistency.
+ * Default: false.
+ *
+ * <p><b>recoveredAndRollback</b> - whether to rollback prepared transactions known to XA RM on
+ * startup (after committing <b>known</b> transactions, i.e. restored from state).
+ *
+ * <p>NOTE that setting this parameter to true may:
+ *
+ * <ol>
+ *   <li>interfere with other subtasks or applications (one subtask rolling back transactions
+ *       prepared by the other one (and known to it))
+ *   <li>block when using with some non-MVCC databases, if there are ended-not-prepared transactions
+ * </ol>
+ *
+ * See also {@link org.apache.flink.connector.jdbc.xa.XaFacade#recover()}
+ */
+public class JdbcExactlyOnceOptions implements Serializable {
+
+    private static final boolean DEFAULT_RECOVERED_AND_ROLLBACK = false;
+    private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3;
+    private static final boolean DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS = false;
+
+    private final boolean discoverAndRollbackOnRecovery;
+    private final int maxCommitAttempts;
+    private final boolean allowOutOfOrderCommits;
+    private final Integer timeoutSec;
+
+    private JdbcExactlyOnceOptions(
+            boolean discoverAndRollbackOnRecovery,
+            int maxCommitAttempts,
+            boolean allowOutOfOrderCommits,
+            Optional<Integer> timeoutSec) {
+        this.discoverAndRollbackOnRecovery = discoverAndRollbackOnRecovery;
+        this.maxCommitAttempts = maxCommitAttempts;
+        this.allowOutOfOrderCommits = allowOutOfOrderCommits;
+        this.timeoutSec = timeoutSec.orElse(null);
+        Preconditions.checkArgument(this.maxCommitAttempts > 0, 
"maxCommitAttempts should be > 0");
+    }
+
+    public static JdbcExactlyOnceOptions defaults() {
+        return builder().build();
+    }
+
+    public boolean isDiscoverAndRollbackOnRecovery() {
+        return discoverAndRollbackOnRecovery;
+    }
+
+    public boolean isAllowOutOfOrderCommits() {
+        return allowOutOfOrderCommits;
+    }
+
+    public int getMaxCommitAttempts() {
+        return maxCommitAttempts;
+    }
+
+    public Integer getTimeoutSec() {
+        return timeoutSec;
+    }
+
+    public static JDBCExactlyOnceOptionsBuilder builder() {
+        return new JDBCExactlyOnceOptionsBuilder();
+    }
+
+    /** JDBCExactlyOnceOptionsBuilder. */
+    public static class JDBCExactlyOnceOptionsBuilder {
+        private boolean recoveredAndRollback = DEFAULT_RECOVERED_AND_ROLLBACK;
+        private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
+        private boolean allowOutOfOrderCommits = 
DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS;
+        private Optional<Integer> timeoutSec = Optional.empty();
+
+        public JDBCExactlyOnceOptionsBuilder withRecoveredAndRollback(

Review comment:
       Sure, good point




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to