This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/jdbcPatch in repository https://gitbox.apache.org/repos/asf/beam.git
commit 220bbf85034583124d24d8e1f822442f2d5e2e42 Author: Danny McCormick <[email protected]> AuthorDate: Mon Feb 24 14:20:26 2025 -0500 Update JdbcIO.java --- .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index a31745754d0..906c07d501d 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -2663,6 +2663,7 @@ public class JdbcIO { Metrics.distribution(WriteFn.class, "milliseconds_per_batch"); private final WriteFnSpec<T, V> spec; + private Lock connectionLock = new ReentrantLock(); private @Nullable DataSource dataSource; private @Nullable Connection connection; private @Nullable PreparedStatement preparedStatement; @@ -2698,13 +2699,20 @@ public class JdbcIO { private Connection getConnection() throws SQLException { Connection connection = this.connection; + connectionLock.lock(); + try { + if (this.connection == null) { + DataSource validSource = checkStateNotNull(dataSource); + this.connection = validSource.getConnection(); + this.connection.setAutoCommit(false); + preparedStatement = + this.connection.prepareStatement(checkStateNotNull(spec.getStatement()).get()); + } + } finally { + connectionLock.unlock(); + } if (connection == null) { - DataSource validSource = checkStateNotNull(dataSource); - connection = validSource.getConnection(); - connection.setAutoCommit(false); - preparedStatement = - connection.prepareStatement(checkStateNotNull(spec.getStatement()).get()); - this.connection = connection; + connection = this.connection; KV<@Nullable String, String> tableWithSchema; if (Strings.isNullOrEmpty(spec.getTable()) && spec.getStatement() != null) { @@ -2769,8 +2777,10 @@ public class JdbcIO { BackOff backoff = checkStateNotNull(retryBackOff).backoff(); RetryStrategy retryStrategy = checkStateNotNull(spec.getRetryStrategy()); while (true) { + LOG.info("Getting prepared statement"); try (PreparedStatement preparedStatement = getConnection().prepareStatement(checkStateNotNull(spec.getStatement()).get())) { + LOG.info("Got prepared statement"); try { // add each record in the statement batch int recordsInBatch = 0;
