This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/jdbcPatch
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 220bbf85034583124d24d8e1f822442f2d5e2e42
Author: Danny McCormick <[email protected]>
AuthorDate: Mon Feb 24 14:20:26 2025 -0500

    Update JdbcIO.java
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index a31745754d0..906c07d501d 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -2663,6 +2663,7 @@ public class JdbcIO {
         Metrics.distribution(WriteFn.class, "milliseconds_per_batch");
 
     private final WriteFnSpec<T, V> spec;
+    private Lock connectionLock = new ReentrantLock();
     private @Nullable DataSource dataSource;
     private @Nullable Connection connection;
     private @Nullable PreparedStatement preparedStatement;
@@ -2698,13 +2699,20 @@ public class JdbcIO {
 
     private Connection getConnection() throws SQLException {
       Connection connection = this.connection;
+      connectionLock.lock();
+      try {
+        if (this.connection == null) {
+          DataSource validSource = checkStateNotNull(dataSource);
+          this.connection = validSource.getConnection();
+          this.connection.setAutoCommit(false);
+          preparedStatement =
+              this.connection.prepareStatement(checkStateNotNull(spec.getStatement()).get());
+        }
+      } finally {
+        connectionLock.unlock();
+      }
       if (connection == null) {
-        DataSource validSource = checkStateNotNull(dataSource);
-        connection = validSource.getConnection();
-        connection.setAutoCommit(false);
-        preparedStatement =
-            connection.prepareStatement(checkStateNotNull(spec.getStatement()).get());
-        this.connection = connection;
+        connection = this.connection;
 
         KV<@Nullable String, String> tableWithSchema;
         if (Strings.isNullOrEmpty(spec.getTable()) && spec.getStatement() != null) {
@@ -2769,8 +2777,10 @@ public class JdbcIO {
       BackOff backoff = checkStateNotNull(retryBackOff).backoff();
       RetryStrategy retryStrategy = checkStateNotNull(spec.getRetryStrategy());
       while (true) {
+        LOG.info("Getting prepared statement");
         try (PreparedStatement preparedStatement =
             getConnection().prepareStatement(checkStateNotNull(spec.getStatement()).get())) {
+          LOG.info("Got prepared statement");
           try {
             // add each record in the statement batch
             int recordsInBatch = 0;

Reply via email to