This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a28590bddf25221c3d1b11b350e7ecec064e6a22
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Apr 15 22:21:18 2021 +0200

    [FLINK-22239][jdbc] Pool connections in JDBC XA sink
    
    Some databases, like PostgreSQL and MySQL, allow at most
    one XA transaction per connection. Using a new connection
    for each transaction (and pooling them) overcomes
    this limitation.
---
 .../org/apache/flink/connector/jdbc/JdbcSink.java  |   4 +-
 .../jdbc/internal/JdbcBatchingOutputFormat.java    |  13 +-
 .../connector/jdbc/xa/JdbcXaSinkFunction.java      |  20 ++-
 .../apache/flink/connector/jdbc/xa/XaFacade.java   |  13 +-
 .../flink/connector/jdbc/xa/XaFacadeImpl.java      |  20 +--
 .../connector/jdbc/xa/XaFacadePoolingImpl.java     | 193 +++++++++++++++++++++
 .../flink/connector/jdbc/xa/XaGroupOpsImpl.java    |   2 +-
 .../connector/jdbc/xa/JdbcXaFacadeTestHelper.java  |  12 +-
 .../connector/jdbc/xa/h2/H2XaResourceWrapper.java  |  26 +--
 9 files changed, 248 insertions(+), 55 deletions(-)

diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
index 7a0ee41..bc12b42 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
@@ -30,7 +30,6 @@ import org.apache.flink.util.function.SerializableSupplier;
 
 import javax.sql.XADataSource;
 
-import java.util.Optional;
 import java.util.function.Function;
 
 /** Facade to create JDBC {@link SinkFunction sinks}. */
@@ -109,8 +108,7 @@ public class JdbcSink {
                 sql,
                 statementBuilder,
                 XaFacade.fromXaDataSourceSupplier(
-                        dataSourceSupplier,
-                        
Optional.ofNullable(exactlyOnceOptions.getTimeoutSec())),
+                        dataSourceSupplier, 
exactlyOnceOptions.getTimeoutSec()),
                 executionOptions,
                 exactlyOnceOptions);
     }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
index 33be5b7..5bf52bb 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -41,7 +41,6 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.concurrent.Executors;
@@ -191,9 +190,7 @@ public class JdbcBatchingOutputFormat<
                 }
                 try {
                     if (!connectionProvider.isConnectionValid()) {
-                        jdbcStatementExecutor.closeStatements();
-                        Connection connection = 
connectionProvider.reestablishConnection();
-                        jdbcStatementExecutor.prepareStatements(connection);
+                        updateExecutor(true);
                     }
                 } catch (Exception exception) {
                     LOG.error(
@@ -367,4 +364,12 @@ public class JdbcBatchingOutputFormat<
     static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] 
types) {
         return (st, record) -> setRecordToStatement(st, types, record);
     }
+
+    public void updateExecutor(boolean reconnect) throws SQLException, 
ClassNotFoundException {
+        jdbcStatementExecutor.closeStatements();
+        jdbcStatementExecutor.prepareStatements(
+                reconnect
+                        ? connectionProvider.reestablishConnection()
+                        : connectionProvider.getConnection());
+    }
 }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
index 0748278..17ad730 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -220,10 +221,6 @@ public class JdbcXaSinkFunction<T> extends 
AbstractRichFunction
         super.open(configuration);
         xidGenerator.open();
         xaFacade.open();
-        outputFormat.setRuntimeContext(getRuntimeContext());
-        outputFormat.open(
-                getRuntimeContext().getIndexOfThisSubtask(),
-                getRuntimeContext().getNumberOfParallelSubtasks());
         hangingXids = new 
LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry());
         commitUpToCheckpoint(Optional.empty());
         if (options.isDiscoverAndRollbackOnRecovery()) {
@@ -234,6 +231,11 @@ public class JdbcXaSinkFunction<T> extends 
AbstractRichFunction
             xaGroupOps.recoverAndRollback();
         }
         beginTx(0L);
+        outputFormat.setRuntimeContext(getRuntimeContext());
+        // open format only after starting the transaction so it gets a ready 
to use connection
+        outputFormat.open(
+                getRuntimeContext().getIndexOfThisSubtask(),
+                getRuntimeContext().getNumberOfParallelSubtasks());
     }
 
     @Override
@@ -264,7 +266,7 @@ public class JdbcXaSinkFunction<T> extends 
AbstractRichFunction
         if (currentXid != null && xaFacade.isOpen()) {
             try {
                 LOG.debug("remove current transaction before closing, xid={}", 
currentXid);
-                xaFacade.failOrRollback(currentXid);
+                xaFacade.failAndRollback(currentXid);
             } catch (Exception e) {
                 LOG.warn("unable to fail/rollback current transaction, 
xid={}", currentXid, e);
             }
@@ -292,16 +294,22 @@ public class JdbcXaSinkFunction<T> extends 
AbstractRichFunction
                     "empty XA transaction (skip), xid: {}, checkpoint {}",
                     currentXid,
                     checkpointId);
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
         }
         currentXid = null;
     }
 
     /** @param checkpointId to associate with the new transaction. */
-    private void beginTx(long checkpointId) {
+    private void beginTx(long checkpointId) throws Exception {
         Preconditions.checkState(currentXid == null, "currentXid not null");
         currentXid = xidGenerator.generateXid(getRuntimeContext(), 
checkpointId);
         hangingXids.offerLast(currentXid);
         xaFacade.start(currentXid);
+        if (checkpointId > 0) {
+            // associate outputFormat with a new connection that might have 
been opened in start()
+            outputFormat.updateExecutor(false);
+        }
     }
 
     private void commitUpToCheckpoint(Optional<Long> checkpointInclusive) {
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
index 1b9b5ab..4614645 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
@@ -27,7 +27,6 @@ import javax.transaction.xa.Xid;
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.Optional;
 import java.util.function.Supplier;
 
 /**
@@ -40,7 +39,7 @@ import java.util.function.Supplier;
  *   <li>{@link #open}
  *   <li>{@link #start} transaction
  *   <li>{@link #getConnection}, write some data
- *   <li>{@link #endAndPrepare} (or {@link #failOrRollback})
+ *   <li>{@link #endAndPrepare} (or {@link #failAndRollback})
  *   <li>{@link #commit} / {@link #rollback}
  *   <li>{@link #close}
  * </ol>
@@ -52,8 +51,8 @@ public interface XaFacade extends JdbcConnectionProvider, 
Serializable, AutoClos
 
     /** @return a non-serializable instance. */
     static XaFacade fromXaDataSourceSupplier(
-            Supplier<XADataSource> dataSourceSupplier, Optional<Integer> 
timeoutSec) {
-        return new XaFacadeImpl(dataSourceSupplier, timeoutSec);
+            Supplier<XADataSource> dataSourceSupplier, Integer timeoutSec) {
+        return new XaFacadePoolingImpl(() -> new 
XaFacadeImpl(dataSourceSupplier, timeoutSec));
     }
 
     void open() throws Exception;
@@ -61,10 +60,10 @@ public interface XaFacade extends JdbcConnectionProvider, 
Serializable, AutoClos
     boolean isOpen();
 
     /** Start a new transaction. */
-    void start(Xid xid) throws TransientXaException;
+    void start(Xid xid) throws Exception;
 
     /** End and then prepare the transaction. Transaction can't be resumed 
afterwards. */
-    void endAndPrepare(Xid xid) throws TransientXaException, 
EmptyXaTransactionException;
+    void endAndPrepare(Xid xid) throws Exception;
 
     /**
      * Commit previously prepared transaction.
@@ -81,7 +80,7 @@ public interface XaFacade extends JdbcConnectionProvider, 
Serializable, AutoClos
      * End transaction as {@link javax.transaction.xa.XAResource#TMFAIL 
failed}; in case of error,
      * try to roll it back.
      */
-    void failOrRollback(Xid xid) throws TransientXaException;
+    void failAndRollback(Xid xid) throws TransientXaException;
 
     /**
      * Note: this can block on some non-MVCC databases if there are ended not 
prepared transactions.
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java
index 52c8ecf..bfe41e6 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.jdbc.xa;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -80,14 +81,14 @@ class XaFacadeImpl implements XaFacade {
     private transient Connection connection;
     private transient XAConnection xaConnection;
 
-    /** @return a non-serializable instance. */
+    @VisibleForTesting
     static XaFacadeImpl fromXaDataSource(XADataSource ds) {
-        return new XaFacadeImpl(() -> ds, empty());
+        return new XaFacadeImpl(() -> ds, null);
     }
 
-    XaFacadeImpl(Supplier<XADataSource> dataSourceSupplier, Optional<Integer> 
timeoutSec) {
+    XaFacadeImpl(Supplier<XADataSource> dataSourceSupplier, Integer 
timeoutSec) {
         this.dataSourceSupplier = 
Preconditions.checkNotNull(dataSourceSupplier);
-        this.timeoutSec = timeoutSec.orElse(null);
+        this.timeoutSec = timeoutSec;
     }
 
     @Override
@@ -115,10 +116,6 @@ class XaFacadeImpl implements XaFacade {
             connection.close();
             connection = null;
         }
-        if (xaConnection != null) {
-            xaConnection.close();
-            xaConnection = null;
-        }
         xaResource = null;
     }
 
@@ -173,12 +170,15 @@ class XaFacadeImpl implements XaFacade {
     }
 
     @Override
-    public void failOrRollback(Xid xid) {
+    public void failAndRollback(Xid xid) {
         execute(
                 Command.fromRunnable(
                         "end (fail)",
                         xid,
-                        () -> xaResource.end(xid, XAResource.TMFAIL),
+                        () -> {
+                            xaResource.end(xid, XAResource.TMFAIL);
+                            xaResource.rollback(xid);
+                        },
                         err -> {
                             if (err.errorCode >= XA_RBBASE) {
                                 rollback(xid);
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadePoolingImpl.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadePoolingImpl.java
new file mode 100644
index 0000000..04d4855
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadePoolingImpl.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A "pooling" implementation of {@link XaFacade}. Some databases implement XA 
such that one
+ * connection is limited to a single transaction. As a workaround, this 
implementation creates a new
+ * XA resource after each xa_start call is made (and associates it with the 
xid to commit later).
+ */
+@Internal
+class XaFacadePoolingImpl implements XaFacade {
+    private static final long serialVersionUID = 1L;
+
+    public interface FacadeSupplier extends Serializable, Supplier<XaFacade> {}
+
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(XaFacadePoolingImpl.class);
+    private final FacadeSupplier facadeSupplier;
+    private transient XaFacade active;
+    private transient Map<Xid, XaFacade> mappedToXids;
+    private transient Deque<XaFacade> pooled;
+
+    XaFacadePoolingImpl(FacadeSupplier facadeSupplier) {
+        this.facadeSupplier = facadeSupplier;
+    }
+
+    @Override
+    public void open() throws Exception {
+        checkState(active == null);
+        pooled = new LinkedList<>();
+        mappedToXids = new HashMap<>();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return active != null && active.isOpen();
+    }
+
+    @Override
+    public void start(Xid xid) throws Exception {
+        checkState(active == null);
+        if (pooled.isEmpty()) {
+            active = facadeSupplier.get();
+            active.open();
+        } else {
+            active = pooled.poll();
+        }
+        active.start(xid);
+        mappedToXids.put(xid, active);
+    }
+
+    /**
+     * Must be called after {@link #start(Xid)} with the same {@link Xid}.
+     *
+     * @see XaFacade#endAndPrepare(Xid)
+     */
+    @Override
+    public void endAndPrepare(Xid xid) throws Exception {
+        checkState(active == mappedToXids.get(xid));
+        try {
+            active.endAndPrepare(xid);
+        } finally {
+            active = null;
+        }
+    }
+
+    @Override
+    public void commit(Xid xid, boolean ignoreUnknown) throws 
TransientXaException {
+        runForXid(xid, facade -> facade.commit(xid, ignoreUnknown));
+    }
+
+    @Override
+    public void rollback(Xid xid) throws TransientXaException {
+        runForXid(xid, facade -> facade.rollback(xid));
+    }
+
+    @Override
+    public void failAndRollback(Xid xid) throws TransientXaException {
+        runForXid(xid, facade -> facade.failAndRollback(xid));
+    }
+
+    @Override
+    public Collection<Xid> recover() throws TransientXaException {
+        return peekPooled().recover();
+    }
+
+    @Override
+    public void close() throws Exception {
+        for (XaFacade facade : mappedToXids.values()) {
+            facade.close();
+        }
+        for (XaFacade facade : pooled) {
+            facade.close();
+        }
+        if (active != null && active.isOpen()) {
+            active.close();
+        }
+    }
+
+    @Nullable
+    @Override
+    public Connection getConnection() {
+        return active.getConnection();
+    }
+
+    @Override
+    public boolean isConnectionValid() throws SQLException {
+        return active.isConnectionValid();
+    }
+
+    @Override
+    public Connection getOrEstablishConnection() throws SQLException, 
ClassNotFoundException {
+        return active.getOrEstablishConnection();
+    }
+
+    @Override
+    public void closeConnection() {
+        active.closeConnection();
+    }
+
+    @Override
+    public Connection reestablishConnection() throws SQLException, 
ClassNotFoundException {
+        return active.reestablishConnection();
+    }
+
+    // WARN: action MUST leave the facade in IDLE state (i.e. not 
start/end/prepare any tx)
+    private void runForXid(Xid xid, ThrowingConsumer<XaFacade, 
TransientXaException> action) {
+        XaFacade mapped = mappedToXids.remove(xid);
+        if (mapped == null) {
+            // a transaction may be unknown during recovery
+            LOG.debug("No XA resource found associated with XID: {}", xid);
+            action.accept(peekPooled());
+        } else {
+            LOG.debug("Found mapped XA resource for XID: {} {}", xid, mapped);
+            try {
+                action.accept(mapped);
+            } finally {
+                pooled.offer(mapped);
+            }
+        }
+    }
+
+    // WARN: the returned facade MUST be left in IDLE state (i.e. not 
start/end/prepare any tx)
+    private XaFacade peekPooled() {
+        XaFacade xaFacade = pooled.peek();
+        if (xaFacade == null) {
+            xaFacade = facadeSupplier.get();
+            try {
+                xaFacade.open();
+            } catch (Exception e) {
+                rethrow(e);
+            }
+            pooled.offer(xaFacade);
+        }
+        return xaFacade;
+    }
+}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java
index 396b165..c9159b8 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java
@@ -87,7 +87,7 @@ class XaGroupOpsImpl implements XaGroupOps {
         }
         for (Xid x : xids) {
             try {
-                xaFacade.failOrRollback(x);
+                xaFacade.failAndRollback(x);
                 result.succeeded(x);
             } catch (TransientXaException e) {
                 LOG.info("unable to fail/rollback transaction, xid={}: {}", x, 
e.getMessage());
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java
index ac9cdd9..46a1141 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java
@@ -39,14 +39,24 @@ class JdbcXaFacadeTestHelper implements AutoCloseable {
     private final XADataSource xaDataSource;
     private final String table;
     private final String dbUrl;
+    private final String user;
+    private final String pass;
     private final XaFacade xaFacade;
 
     JdbcXaFacadeTestHelper(XADataSource xaDataSource, String dbUrl, String 
table) throws Exception {
+        this(xaDataSource, dbUrl, table, "", "");
+    }
+
+    JdbcXaFacadeTestHelper(
+            XADataSource xaDataSource, String dbUrl, String table, String 
user, String pass)
+            throws Exception {
         this.xaDataSource = xaDataSource;
         this.dbUrl = dbUrl;
         this.table = table;
         this.xaFacade = XaFacadeImpl.fromXaDataSource(this.xaDataSource);
         this.xaFacade.open();
+        this.user = user;
+        this.pass = pass;
     }
 
     void assertPreparedTxCountEquals(int expected) {
@@ -77,7 +87,7 @@ class JdbcXaFacadeTestHelper implements AutoCloseable {
 
     private List<Integer> getInsertedIds() throws SQLException {
         List<Integer> dbContents = new ArrayList<>();
-        try (Connection connection = DriverManager.getConnection(dbUrl)) {
+        try (Connection connection = DriverManager.getConnection(dbUrl, user, 
pass)) {
             
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
             connection.setReadOnly(true);
             try (Statement st = connection.createStatement()) {
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java
index bdf50d3..0a8572f 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java
@@ -232,17 +232,9 @@ public class H2XaResourceWrapper implements XAResource {
     }
 
     private void finalizeTx(ThrowingRunnable<XAException> runnable) throws 
XAException {
-        // underlying implementation sets autocommit on just after commit or 
rollback
-        // to prevent the actual change we turn it on the lower level and then 
revert
-        withSessionAutocommitOn(
-                () ->
-                        // underlying implementation nulls out current 
transaction just after commit
-                        // or rollback
-                        // which prevents it from being prepared afterwards
-                        withCurrentTransaction(runnable));
-    }
-
-    private void withCurrentTransaction(ThrowingRunnable<XAException> 
runnable) throws XAException {
+        // underlying implementation nulls out current transaction just after 
commit
+        // or rollback
+        // which prevents it from being prepared afterwards
         Object current = getCurrentTransaction();
         try {
             runnable.run();
@@ -252,16 +244,4 @@ public class H2XaResourceWrapper implements XAResource {
             setCurrentTransaction(current);
         }
     }
-
-    private void withSessionAutocommitOn(ThrowingRunnable<XAException> 
runnable)
-            throws XAException {
-        JdbcConnection conn = getJdbcConnection();
-        boolean autoCommit = conn.getSession().getAutoCommit();
-        conn.getSession().setAutoCommit(true);
-        try {
-            runnable.run();
-        } finally {
-            conn.getSession().setAutoCommit(autoCommit);
-        }
-    }
 }

Reply via email to