This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 97e5ccf0c6f Upgrade ClickHouseIO to use ClickHouse Java Client V2 
(#37611)
97e5ccf0c6f is described below

commit 97e5ccf0c6f000eecef8edafb2e17babc417b767
Author: Bentsi Leviav <[email protected]>
AuthorDate: Tue Feb 17 22:01:22 2026 +0200

    Upgrade ClickHouseIO to use ClickHouse Java Client V2 (#37611)
    
    * update links
    
    * add jdbc string parser to support backward compatibility
    
    * add tests to jdbc parser
    
    * add tests to check backward compatibility with the new jdbc parser
    
    * bumping up the java client version
    
    * set hardcoded api version due to 
https://github.com/testcontainers/testcontainers-java/issues/11212
    
    * update the base clickhouse test to use the recent java client
    
    * refactor ClickHouseIO to use the new java client while maintaining 
backward compatibility with previous API
    
    * Adjust tests to use the new java client
    
    * move numbers to CONSTs
    
    * make consts static
    
    * add github issue for better tracking
    
    * add explanation of SharedMergeTree
    
    * another const
    
    * Update CHANGES.md
    
    * correct clickhouse website
    
    * add issue link instead of pr
    
    * add documentation links
    
    * Fix old yandex documentation links
    
    * formating
    
    * fail on JDBC options and properties mismatch
    
    * add link to CH deduplication strategies
---
 CHANGES.md                                         |   3 +-
 sdks/java/io/clickhouse/build.gradle               |   7 +-
 .../beam/sdk/io/clickhouse/ClickHouseIO.java       | 393 ++++++++++++++++-----
 .../sdk/io/clickhouse/ClickHouseJdbcUrlParser.java | 261 ++++++++++++++
 .../apache/beam/sdk/io/clickhouse/TableSchema.java |   8 +-
 .../beam/sdk/io/clickhouse/AtomicInsertTest.java   |  28 +-
 .../beam/sdk/io/clickhouse/BaseClickHouseTest.java | 140 ++++++--
 .../ClickHouseIOJdbcBackwardCompatibilityTest.java |  98 +++++
 .../ClickHouseIOPropertyMergingTest.java           | 211 +++++++++++
 .../beam/sdk/io/clickhouse/ClickHouseIOTest.java   | 142 ++++----
 .../io/clickhouse/ClickHouseJdbcUrlParserTest.java | 341 ++++++++++++++++++
 11 files changed, 1442 insertions(+), 190 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index bd24be1989d..1b72f92d278 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
 
 * Add Datadog IO support (Java) 
([#37318](https://github.com/apache/beam/issues/37318)).
 * Remove Pubsublite IO support, since service will be deprecated in March 
2026. ([#37375](https://github.com/apache/beam/issues/37375)).
+* (Java) ClickHouse - migrating from the legacy JDBC driver (v0.6.3) to 
ClickHouse Java Client v2 (v0.9.6). See the [class 
documentation](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.html)
 for migration guide   ([#37610](https://github.com/apache/beam/issues/37610)).
 
 ## New Features / Improvements
 
@@ -2357,4 +2358,4 @@ Schema Options, it will be removed in version `2.23.0`. 
([BEAM-9704](https://iss
 
 ## Highlights
 
-- For versions 2.19.0 and older release notes are available on [Apache Beam 
Blog](https://beam.apache.org/blog/).
\ No newline at end of file
+- For versions 2.19.0 and older release notes are available on [Apache Beam 
Blog](https://beam.apache.org/blog/).
diff --git a/sdks/java/io/clickhouse/build.gradle 
b/sdks/java/io/clickhouse/build.gradle
index 70f3c1a6387..4923bf32a43 100644
--- a/sdks/java/io/clickhouse/build.gradle
+++ b/sdks/java/io/clickhouse/build.gradle
@@ -31,7 +31,7 @@ applyJavaNature(
 )
 
 description = "Apache Beam :: SDKs :: Java :: IO :: ClickHouse"
-ext.summary = "IO to write to ClickHouse (https://clickhouse.yandex)."
+ext.summary = "IO to write to ClickHouse (https://clickhouse.com)."
 
 // Match the output directory for generated code with the package, to be more 
tool-friendly
 def generatedJavaccSourceDir = "${project.buildDir}/generated/javacc"
@@ -50,7 +50,7 @@ idea {
   }
 }
 
-def clickhouse_jdbc_version = "0.6.4"
+def clickhouse_java_client_version = "0.9.6"
 
 dependencies {
   javacc "net.java.dev.javacc:javacc:7.0.9"
@@ -59,11 +59,12 @@ dependencies {
   implementation library.java.joda_time
   implementation library.java.slf4j_api
   implementation library.java.vendored_guava_32_1_2_jre
-  implementation "com.clickhouse:clickhouse-jdbc:$clickhouse_jdbc_version:all"
+  implementation "com.clickhouse:client-v2:$clickhouse_java_client_version:all"
   testImplementation library.java.slf4j_api
   testImplementation library.java.junit
   testImplementation library.java.hamcrest
   testImplementation library.java.testcontainers_clickhouse
+  testImplementation 
"com.clickhouse:clickhouse-jdbc:$clickhouse_java_client_version:all"
   testRuntimeOnly library.java.slf4j_jdk14
   testRuntimeOnly project(path: ":runners:direct-java", configuration: 
"shadow")
 }
diff --git 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
index 52dca7cfa64..fc00a1e420e 100644
--- 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
+++ 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
@@ -17,18 +17,17 @@
  */
 package org.apache.beam.sdk.io.clickhouse;
 
-import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.client.api.Client;
+import com.clickhouse.client.api.insert.InsertResponse;
+import com.clickhouse.client.api.query.GenericRecord;
+import com.clickhouse.client.api.query.Records;
 import com.clickhouse.data.ClickHouseFormat;
-import com.clickhouse.jdbc.ClickHouseConnection;
-import com.clickhouse.jdbc.ClickHouseDataSource;
-import com.clickhouse.jdbc.ClickHouseStatement;
 import com.google.auto.value.AutoValue;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
@@ -63,15 +62,27 @@ import org.slf4j.LoggerFactory;
  *
  * <h3>Writing to ClickHouse</h3>
  *
- * <p>To write to ClickHouse, use {@link ClickHouseIO#write(String, String)}, 
which writes elements
- * from input {@link PCollection}. It's required that your ClickHouse cluster 
already has table you
- * are going to insert into.
+ * <p>To write to ClickHouse, use {@link ClickHouseIO#write(String, String, 
String)}, which writes
+ * elements from input {@link PCollection}. It's required that your ClickHouse 
cluster already has
+ * table you are going to insert into.
  *
  * <pre>{@code
+ * // New way (recommended):
+ * Properties props = new Properties();
+ * props.setProperty("user", "admin");
+ * props.setProperty("password", "secret");
+ *
+ * pipeline
+ *   .apply(...)
+ *   .apply(
+ *     ClickHouseIO.<POJO>write("http://localhost:8123";, "default", "my_table")
+ *       .withProperties(props));
+ *
+ * // Old way (deprecated):
  * pipeline
  *   .apply(...)
  *   .apply(
- *     ClickHouseIO.<POJO>write("jdbc:clickhouse:localhost:8123/default", 
"my_table"));
+ *     ClickHouseIO.<POJO>write("jdbc:clickhouse://localhost:8123/default", 
"my_table"));
  * }</pre>
  *
  * <p>Optionally, you can provide connection settings, for instance, specify 
insert block size with
@@ -80,14 +91,21 @@ import org.slf4j.LoggerFactory;
  *
  * <h4>Deduplication</h4>
  *
- * Deduplication is performed by ClickHouse if inserting to <a
- * 
href="https://clickhouse.yandex/docs/en/single/#data-replication";>ReplicatedMergeTree</a>
 or <a
- * 
href="https://clickhouse.yandex/docs/en/single/#distributed";>Distributed</a> 
table on top of
- * ReplicatedMergeTree. Without replication, inserting into regular MergeTree 
can produce
- * duplicates, if insert fails, and then successfully retries. However, each 
block is inserted
- * atomically, and you can configure block size with {@link 
Write#withMaxInsertBlockSize(long)}.
+ * <p>Deduplication is performed by ClickHouse if inserting to <a
+ * 
href="https://clickhouse.com/docs/engines/table-engines/mergetree-family/replication";>ReplicatedMergeTree</a>
+ * or <a
+ * 
href="https://clickhouse.com/docs/engines/table-engines/special/distributed";>Distributed</a>
+ * table on top of ReplicatedMergeTree. Without replication, inserting into 
regular MergeTree can
+ * produce duplicates, if insert fails, and then successfully retries. 
However, each block is
+ * inserted atomically, and you can configure block size with {@link
+ * Write#withMaxInsertBlockSize(long)}.
  *
- * <p>Deduplication is performed using checksums of inserted blocks.
+ * <p>Deduplication is performed using checksums of inserted blocks. For <a
+ * 
href="https://clickhouse.com/docs/engines/table-engines/mergetree-family/shared-merge-tree";>SharedMergeTree</a>
+ * tables in ClickHouse Cloud, deduplication behavior is similar to 
ReplicatedMergeTree. For more
+ * information about deduplication, please visit the <a
+ * 
href="https://clickhouse.com/docs/guides/developer/deduplication";>Deduplication 
strategies
+ * documentation</a>
  *
  * <h4>Mapping between Beam and ClickHouse types</h4>
  *
@@ -114,8 +132,10 @@ import org.slf4j.LoggerFactory;
  * <tr><td>{@link TableSchema.TypeName#TUPLE}</td> <td>{@link 
Schema.TypeName#ROW}</td></tr>
  * </table>
  *
- * Nullable row columns are supported through Nullable type in ClickHouse. Low 
cardinality hint is
- * supported through LowCardinality DataType in ClickHouse.
+ * <p>Nullable row columns are supported through <a
+ * 
href="https://clickhouse.com/docs/sql-reference/data-types/nullable";>Nullable 
type</a> in
+ * ClickHouse. <a 
href="https://clickhouse.com/docs/sql-reference/data-types/LowCardinality";>Low
+ * cardinality hint </a> is supported through LowCardinality DataType in 
ClickHouse.
  *
  * <p>Nested rows should be unnested using {@link Select#flattenedSchema()}. 
Type casting should be
  * done using {@link org.apache.beam.sdk.schemas.transforms.Cast} before 
{@link ClickHouseIO}.
@@ -130,9 +150,57 @@ public class ClickHouseIO {
   public static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = 
Duration.standardDays(1000);
   public static final Duration DEFAULT_INITIAL_BACKOFF = 
Duration.standardSeconds(5);
 
+  /**
+   * Creates a write transform using a JDBC URL format.
+   *
+   * <p><b>Deprecated:</b> Use {@link #write(String, String, String)} instead 
with separate URL,
+   * database, and table parameters.
+   *
+   * <p>This method is provided for backward compatibility. It parses the JDBC 
URL to extract the
+   * connection URL, database name, and any connection properties specified in 
the query string.
+   * Properties can be overridden later using {@link 
Write#withProperties(Properties)}.
+   *
+   * <p>Example:
+   *
+   * <pre>{@code
+   * // Old way (deprecated):
+   * 
ClickHouseIO.write("jdbc:clickhouse://localhost:8123/mydb?user=admin&password=secret",
 "table")
+   *
+   * // New way:
+   * ClickHouseIO.write("http://localhost:8123";, "mydb", "table")
+   *   .withProperties(props)
+   * }</pre>
+   *
+   * <p><b>Property Precedence:</b> Properties from the JDBC URL can be 
overridden by calling {@link
+   * Write#withProperties(Properties)}. Later calls to withProperties() 
override earlier settings.
+   *
+   * @param jdbcUrl JDBC connection URL (e.g., 
jdbc:clickhouse://host:port/database?param=value)
+   * @param table table name
+   * @return a {@link PTransform} writing data to ClickHouse
+   * @deprecated Use {@link #write(String, String, String)} with explicit URL, 
database, and table
+   */
+  @Deprecated
   public static <T> Write<T> write(String jdbcUrl, String table) {
+    ClickHouseJdbcUrlParser.ParsedJdbcUrl parsed = 
ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
     return new AutoValue_ClickHouseIO_Write.Builder<T>()
-        .jdbcUrl(jdbcUrl)
+        .clickHouseUrl(parsed.getClickHouseUrl())
+        .database(parsed.getDatabase())
+        .table(table)
+        .properties(parsed.getProperties()) // Start with JDBC URL properties
+        .maxInsertBlockSize(DEFAULT_MAX_INSERT_BLOCK_SIZE)
+        .initialBackoff(DEFAULT_INITIAL_BACKOFF)
+        .maxRetries(DEFAULT_MAX_RETRIES)
+        .maxCumulativeBackoff(DEFAULT_MAX_CUMULATIVE_BACKOFF)
+        .build()
+        .withInsertDeduplicate(true)
+        .withInsertDistributedSync(true);
+  }
+
+  public static <T> Write<T> write(String clickHouseUrl, String database, 
String table) {
+    return new AutoValue_ClickHouseIO_Write.Builder<T>()
+        .clickHouseUrl(clickHouseUrl)
+        .database(database)
         .table(table)
         .properties(new Properties())
         .maxInsertBlockSize(DEFAULT_MAX_INSERT_BLOCK_SIZE)
@@ -148,7 +216,9 @@ public class ClickHouseIO {
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
 
-    public abstract String jdbcUrl();
+    public abstract String clickHouseUrl();
+
+    public abstract String database();
 
     public abstract String table();
 
@@ -176,7 +246,7 @@ public class ClickHouseIO {
     public PDone expand(PCollection<T> input) {
       TableSchema tableSchema = tableSchema();
       if (tableSchema == null) {
-        tableSchema = getTableSchema(jdbcUrl(), table());
+        tableSchema = getTableSchema(clickHouseUrl(), database(), table(), 
properties());
       }
 
       String sdkVersion = ReleaseInfo.getReleaseInfo().getSdkVersion();
@@ -192,7 +262,8 @@ public class ClickHouseIO {
 
       WriteFn<T> fn =
           new AutoValue_ClickHouseIO_WriteFn.Builder<T>()
-              .jdbcUrl(jdbcUrl())
+              .clickHouseUrl(clickHouseUrl())
+              .database(database())
               .table(table())
               .maxInsertBlockSize(maxInsertBlockSize())
               .schema(tableSchema)
@@ -212,7 +283,8 @@ public class ClickHouseIO {
      *
      * @param value number of rows
      * @return a {@link PTransform} writing data to ClickHouse
-     * @see <a 
href="https://clickhouse.yandex/docs/en/single/#max_insert_block_size";>ClickHouse
+     * @see <a
+     *     
href="https://clickhouse.com/docs/operations/settings/settings#max_insert_block_size";>ClickHouse
      *     documentation</a>
      */
     public Write<T> withMaxInsertBlockSize(long value) {
@@ -238,7 +310,8 @@ public class ClickHouseIO {
      *
      * @param value number of replicas, 0 for disabling, null for server 
default
      * @return a {@link PTransform} writing data to ClickHouse
-     * @see <a 
href="https://clickhouse.yandex/docs/en/single/#insert_quorum";>ClickHouse
+     * @see <a
+     *     
href="https://clickhouse.com/docs/operations/settings/settings#insert_quorum";>ClickHouse
      *     documentation</a>
      */
     public Write<T> withInsertQuorum(@Nullable Long value) {
@@ -305,11 +378,56 @@ public class ClickHouseIO {
       return toBuilder().tableSchema(tableSchema).build();
     }
 
+    /**
+     * Set connection properties (user, password, etc.).
+     *
+     * <p><b>Important:</b> If using the deprecated JDBC URL-based {@link 
#write(String, String)}
+     * method, this will fail if any properties specified here conflict with 
properties already
+     * extracted from the JDBC URL. This prevents accidental property 
conflicts.
+     *
+     * <p>For the new API {@link #write(String, String, String)}, properties 
can be set freely since
+     * there are no URL-embedded properties to conflict with.
+     *
+     * @param properties connection properties
+     * @return a {@link PTransform} writing data to ClickHouse
+     * @throws IllegalArgumentException if properties is null or if any 
property conflicts with
+     *     existing properties (e.g., from JDBC URL)
+     */
+    public Write<T> withProperties(Properties properties) {
+      if (properties == null) {
+        throw new IllegalArgumentException("Properties cannot be null");
+      }
+
+      // Check for conflicts with existing properties
+      Properties existing = properties();
+      for (String key : properties.stringPropertyNames()) {
+        if (existing.containsKey(key)) {
+          String existingValue = existing.getProperty(key);
+          String newValue = properties.getProperty(key);
+          if (!existingValue.equals(newValue)) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Property conflict: '%s' is already set to '%s' (likely 
from JDBC URL), "
+                        + "but attempting to set it to '%s'. "
+                        + "Please use either JDBC URL properties OR 
withProperties(), not both for the same keys.",
+                    key, existingValue, newValue));
+          }
+        }
+      }
+
+      // Merge properties: new properties are added to existing ones
+      Properties merged = new Properties();
+      merged.putAll(existing);
+      merged.putAll(properties);
+      return toBuilder().properties(merged).build();
+    }
     /** Builder for {@link Write}. */
     @AutoValue.Builder
     abstract static class Builder<T> {
 
-      public abstract Builder<T> jdbcUrl(String jdbcUrl);
+      public abstract Builder<T> clickHouseUrl(String clickHouseUrl);
+
+      public abstract Builder<T> database(String database);
 
       public abstract Builder<T> table(String table);
 
@@ -348,7 +466,7 @@ public class ClickHouseIO {
     private static final String RETRY_ATTEMPT_LOG =
         "Error writing to ClickHouse. Retry attempt[{}]";
 
-    private ClickHouseConnection connection;
+    private Client client;
     private FluentBackoff retryBackoff;
     private final List<Row> buffer = new ArrayList<>();
     private final Distribution batchSize = Metrics.distribution(Write.class, 
"batch_size");
@@ -360,7 +478,9 @@ public class ClickHouseIO {
     @FieldAccess("filterFields")
     final FieldAccessDescriptor fieldAccessDescriptor = 
FieldAccessDescriptor.withAllFields();
 
-    public abstract String jdbcUrl();
+    public abstract String clickHouseUrl();
+
+    public abstract String database();
 
     public abstract String table();
 
@@ -387,9 +507,36 @@ public class ClickHouseIO {
     }
 
     @Setup
-    public void setup() throws SQLException {
+    public void setup() throws Exception {
+
+      String user = properties().getProperty("user", "default");
+      String password = properties().getProperty("password", "");
+
+      // add the options to the client builder
+      Map<String, String> options =
+          properties().stringPropertyNames().stream()
+              .filter(key -> !key.equals("user") && !key.equals("password"))
+              .collect(Collectors.toMap(key -> key, 
properties()::getProperty));
+
+      // Create ClickHouse Java Client
+      Client.Builder clientBuilder =
+          new Client.Builder()
+              .addEndpoint(clickHouseUrl())
+              .setUsername(user)
+              .setPassword(password)
+              .setDefaultDatabase(database())
+              .setOptions(options)
+              .setClientName(
+                  String.format("Apache Beam/%s", 
ReleaseInfo.getReleaseInfo().getSdkVersion()));
+
+      // Add optional compression if specified in properties
+      String compress = properties().getProperty("compress", "false");
+      if (Boolean.parseBoolean(compress)) {
+        clientBuilder.compressServerResponse(true);
+        clientBuilder.compressClientRequest(true);
+      }
 
-      connection = new ClickHouseDataSource(jdbcUrl(), 
properties()).getConnection();
+      client = clientBuilder.build();
 
       retryBackoff =
           FluentBackoff.DEFAULT
@@ -400,7 +547,9 @@ public class ClickHouseIO {
 
     @Teardown
     public void tearDown() throws Exception {
-      connection.close();
+      if (client != null) {
+        client.close();
+      }
     }
 
     @StartBundle
@@ -431,25 +580,46 @@ public class ClickHouseIO {
       }
 
       batchSize.update(buffer.size());
+
+      // Serialize rows to RowBinary format
+      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+
+      // Wrap ByteArrayOutputStream with ClickHouseOutputStream
+      try (com.clickhouse.data.ClickHouseOutputStream outputStream =
+          com.clickhouse.data.ClickHouseOutputStream.of(byteStream)) {
+        for (Row row : buffer) {
+          ClickHouseWriter.writeRow(outputStream, schema(), row);
+        }
+        outputStream.flush();
+      }
+      byte[] data = byteStream.toByteArray();
+
       while (true) {
-        try (ClickHouseStatement statement = connection.createStatement()) {
-          statement
-              .unwrap(ClickHouseRequest.class)
-              .write()
-              .table(table())
-              .format(ClickHouseFormat.RowBinary)
-              .data(
-                  out -> {
-                    for (Row row : buffer) {
-                      ClickHouseWriter.writeRow(out, schema(), row);
-                    }
-                  })
-              .executeAndWait(); // query happens in a separate thread
+        try {
+
+          // Perform the insert using ClickHouse Java Client
+          InsertResponse response =
+              client
+                  .insert(
+                      table(), new java.io.ByteArrayInputStream(data), 
ClickHouseFormat.RowBinary)
+                  .get();
+
+          if (response != null) {
+            LOG.debug(
+                "Successfully inserted {} rows out of {} into table {}. total 
size written {} bytes",
+                response.getWrittenRows(),
+                buffer.size(),
+                table(),
+                response.getWrittenBytes());
+          } else {
+            LOG.debug("Successfully inserted {} rows into table {}", 
buffer.size(), table());
+          }
+
           buffer.clear();
           break;
-        } catch (SQLException e) {
+        } catch (Exception e) {
           if (!BackOffUtils.next(Sleeper.DEFAULT, backOff)) {
-            throw e;
+            throw new RuntimeException("Failed to write to ClickHouse after 
retries", e);
           } else {
             retries.inc();
             LOG.warn(RETRY_ATTEMPT_LOG, attempt, e);
@@ -462,7 +632,9 @@ public class ClickHouseIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
 
-      public abstract Builder<T> jdbcUrl(String jdbcUrl);
+      public abstract Builder<T> clickHouseUrl(String clickHouseUrl);
+
+      public abstract Builder<T> database(String database);
 
       public abstract Builder<T> table(String table);
 
@@ -491,57 +663,106 @@ public class ClickHouseIO {
         String.join(",", l).trim().replaceAll("Tuple\\(", 
"Tuple('").replaceAll(",", ",'");
     return content;
   }
+
   /**
-   * Returns {@link TableSchema} for a given table.
+   * Returns {@link TableSchema} for a given table using JDBC URL format.
+   *
+   * <p><b>Deprecated:</b> Use {@link #getTableSchema(String, String, String, 
Properties)} instead
+   * with separate URL, database, table, and properties parameters.
+   *
+   * <p>This method parses the JDBC URL to extract connection details and 
properties. For new code,
+   * use the explicit parameter version for better clarity and control.
    *
-   * @param jdbcUrl jdbc connection url
+   * <p>Example migration:
+   *
+   * <pre>{@code
+   * // Old way (deprecated):
+   * TableSchema schema = ClickHouseIO.getTableSchema(
+   *     "jdbc:clickhouse://localhost:8123/mydb?user=admin", "my_table");
+   *
+   * // New way:
+   * Properties props = new Properties();
+   * props.setProperty("user", "admin");
+   * TableSchema schema = ClickHouseIO.getTableSchema(
+   *     "http://localhost:8123";, "mydb", "my_table", props);
+   * }</pre>
+   *
+   * @param jdbcUrl JDBC connection URL (e.g., 
jdbc:clickhouse://host:port/database?param=value)
    * @param table table name
    * @return table schema
+   * @deprecated Use {@link #getTableSchema(String, String, String, 
Properties)} with explicit
+   *     parameters
    */
+  @Deprecated
   public static TableSchema getTableSchema(String jdbcUrl, String table) {
-    List<TableSchema.Column> columns = new ArrayList<>();
-
-    try (ClickHouseConnection connection = new 
ClickHouseDataSource(jdbcUrl).getConnection();
-        Statement statement = connection.createStatement()) {
-
-      ResultSet rs = null; // try-finally is used because findbugs doesn't 
like try-with-resource
-      try {
-        rs = statement.executeQuery("DESCRIBE TABLE " + 
quoteIdentifier(table));
-
-        while (rs.next()) {
-          String name = rs.getString("name");
-          String type = rs.getString("type");
-          String defaultTypeStr = rs.getString("default_type");
-          String defaultExpression = rs.getString("default_expression");
+    ClickHouseJdbcUrlParser.ParsedJdbcUrl parsed = 
ClickHouseJdbcUrlParser.parse(jdbcUrl);
+    return getTableSchema(
+        parsed.getClickHouseUrl(), parsed.getDatabase(), table, 
parsed.getProperties());
+  }
 
-          ColumnType columnType = null;
-          if (type.toLowerCase().trim().startsWith("tuple(")) {
-            String content = tuplePreprocessing(type);
-            columnType = ColumnType.parse(content);
-          } else {
-            columnType = ColumnType.parse(type);
-          }
-          DefaultType defaultType = 
DefaultType.parse(defaultTypeStr).orElse(null);
+  /**
+   * Returns {@link TableSchema} for a given table using ClickHouse Java 
Client.
+   *
+   * @param clickHouseUrl ClickHouse connection url
+   * @param database ClickHouse database
+   * @param table table name
+   * @param properties connection properties
+   * @return table schema
+   * @since 2.72.0
+   */
+  public static TableSchema getTableSchema(
+      String clickHouseUrl, String database, String table, Properties 
properties) {
+    List<TableSchema.Column> columns = new ArrayList<>();
 
-          Object defaultValue;
-          if (DefaultType.DEFAULT.equals(defaultType)
-              && !Strings.isNullOrEmpty(defaultExpression)) {
-            defaultValue = ColumnType.parseDefaultExpression(columnType, 
defaultExpression);
-          } else {
-            defaultValue = null;
+    try {
+      String user = properties.getProperty("user", "default");
+      String password = properties.getProperty("password", "");
+
+      // Create ClickHouse Java Client
+      Client.Builder clientBuilder =
+          new Client.Builder()
+              .addEndpoint(clickHouseUrl)
+              .setUsername(user)
+              .setPassword(password)
+              .setDefaultDatabase(database)
+              .setClientName(
+                  String.format("Apache Beam/%s", 
ReleaseInfo.getReleaseInfo().getSdkVersion()));
+
+      try (Client client = clientBuilder.build()) {
+        String query = "DESCRIBE TABLE " + quoteIdentifier(table);
+
+        try (Records records = client.queryRecords(query).get()) {
+          for (GenericRecord record : records) {
+            String name = record.getString("name");
+            String type = record.getString("type");
+            String defaultTypeStr = record.getString("default_type");
+            String defaultExpression = record.getString("default_expression");
+
+            ColumnType columnType;
+            if (type.toLowerCase().trim().startsWith("tuple(")) {
+              String content = tuplePreprocessing(type);
+              columnType = ColumnType.parse(content);
+            } else {
+              columnType = ColumnType.parse(type);
+            }
+            DefaultType defaultType = 
DefaultType.parse(defaultTypeStr).orElse(null);
+
+            Object defaultValue;
+            if (DefaultType.DEFAULT.equals(defaultType)
+                && !Strings.isNullOrEmpty(defaultExpression)) {
+              defaultValue = ColumnType.parseDefaultExpression(columnType, 
defaultExpression);
+            } else {
+              defaultValue = null;
+            }
+
+            columns.add(TableSchema.Column.of(name, columnType, defaultType, 
defaultValue));
           }
-
-          columns.add(TableSchema.Column.of(name, columnType, defaultType, 
defaultValue));
-        }
-      } finally {
-        if (rs != null) {
-          rs.close();
         }
       }
 
       return TableSchema.of(columns.toArray(new TableSchema.Column[0]));
-    } catch (SQLException e) {
-      throw new RuntimeException(e);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get table schema for table: " + 
table, e);
     }
   }
 
diff --git 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParser.java
 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParser.java
new file mode 100644
index 00000000000..92cd1eaeacd
--- /dev/null
+++ 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParser.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+/**
+ * Utility class for parsing ClickHouse JDBC URLs and extracting connection 
parameters.
+ *
+ * <p>Used for supporting backward compatibility with the deprecated {@link
+ * ClickHouseIO#write(String, String)} method that accepts JDBC URLs. New code 
should use {@link
+ * ClickHouseIO#write(String, String, String)} with explicit parameters 
instead.
+ *
+ * @deprecated Use {@link ClickHouseIO#write(String, String, String)} with 
separate clickHouseUrl,
+ *     database, and table parameters instead of JDBC URL format.
+ */
+@Deprecated
+class ClickHouseJdbcUrlParser {
+
+  /**
+   * Represents parsed components of a ClickHouse JDBC URL.
+   *
+   * <p>Contains the extracted HTTP/HTTPS URL, database name, and connection 
properties from a JDBC
+   * URL string.
+   *
+   * @deprecated This class supports the deprecated JDBC URL-based API. Use 
separate parameters for
+   *     clickHouseUrl, database, and properties instead.
+   */
+  @Deprecated
+  static class ParsedJdbcUrl {
+    private final String clickHouseUrl;
+    private final String database;
+    private final Properties properties;
+
+    ParsedJdbcUrl(String clickHouseUrl, String database, Properties 
properties) {
+      this.clickHouseUrl = clickHouseUrl;
+      this.database = database;
+      this.properties = properties;
+    }
+
+    public String getClickHouseUrl() {
+      return clickHouseUrl;
+    }
+
+    public String getDatabase() {
+      return database;
+    }
+
+    public Properties getProperties() {
+      return properties;
+    }
+  }
+
+  /**
+   * Parses a ClickHouse JDBC URL into its components.
+   *
+   * <p>Supported formats:
+   *
+   * <ul>
+   *   <li>jdbc:clickhouse://host:port/database?param=value
+   *   <li>jdbc:clickhouse:http://host:port/database?param=value
+   *   <li>jdbc:clickhouse:https://host:port/database?param=value
+   *   <li>jdbc:ch://host:port/database?param=value (ClickHouse JDBC driver 
shorthand)
+   * </ul>
+   *
+   * @param jdbcUrl the JDBC URL to parse
+   * @return ParsedJdbcUrl containing the HTTP/HTTPS URL, database, and 
properties
+   * @throws IllegalArgumentException if the URL format is invalid
+   */
+  static ParsedJdbcUrl parse(String jdbcUrl) {
+    if (Strings.isNullOrEmpty(jdbcUrl)) {
+      throw new IllegalArgumentException("JDBC URL cannot be null or empty");
+    }
+
+    String actualUrl = extractHttpUrl(jdbcUrl);
+
+    try {
+      URI uri = new URI(actualUrl);
+
+      validateScheme(uri.getScheme());
+      String host = validateAndGetHost(uri.getHost(), jdbcUrl);
+      int port = getPortOrDefault(uri.getPort(), uri.getScheme());
+
+      String clickHouseUrl = String.format("%s://%s:%d", uri.getScheme(), 
host, port);
+      String database = extractDatabase(uri.getPath());
+      Properties properties = extractProperties(uri.getQuery());
+
+      return new ParsedJdbcUrl(clickHouseUrl, database, properties);
+
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException("Invalid JDBC URL format: " + 
jdbcUrl, e);
+    } catch (java.io.UnsupportedEncodingException e) {
+      throw new IllegalArgumentException("Failed to decode URL parameters: " + 
jdbcUrl, e);
+    }
+  }
+
+  /**
+   * Extracts and normalizes the HTTP/HTTPS URL from a JDBC URL.
+   *
+   * @param jdbcUrl the JDBC URL to process
+   * @return normalized HTTP/HTTPS URL
+   * @throws IllegalArgumentException if the URL format is invalid
+   */
+  private static String extractHttpUrl(String jdbcUrl) {
+    // Remove jdbc: prefix
+    String urlWithoutJdbc = jdbcUrl;
+    if (jdbcUrl.toLowerCase().startsWith("jdbc:")) {
+      urlWithoutJdbc = jdbcUrl.substring(5);
+    }
+
+    // Handle jdbc:clickhouse: or jdbc:ch: prefix
+    String actualUrl;
+    if (urlWithoutJdbc.toLowerCase().startsWith("clickhouse:")) {
+      actualUrl = urlWithoutJdbc.substring(11);
+    } else if (urlWithoutJdbc.toLowerCase().startsWith("ch:")) {
+      actualUrl = urlWithoutJdbc.substring(3);
+    } else {
+      throw new IllegalArgumentException(
+          "Invalid JDBC URL format. Expected 'jdbc:clickhouse:' or 'jdbc:ch:' 
prefix. Got: "
+              + jdbcUrl);
+    }
+
+    // Check if URL already has a scheme and validate it
+    if (actualUrl.toLowerCase().startsWith("http://";)
+        || actualUrl.toLowerCase().startsWith("https://";)) {
+      return actualUrl;
+    }
+
+    // Check for invalid schemes before prepending http://
+    if (actualUrl.contains("://")) {
+      // Extract the scheme part
+      int schemeEnd = actualUrl.indexOf("://");
+      String scheme = actualUrl.substring(0, schemeEnd).toLowerCase();
+      if (!scheme.equals("http") && !scheme.equals("https")) {
+        throw new IllegalArgumentException(
+            "Invalid scheme in JDBC URL. Expected 'http' or 'https'. Got: " + 
scheme);
+      }
+    }
+
+    // If URL doesn't start with http:// or https://, assume http://
+    if (actualUrl.startsWith("//")) {
+      actualUrl = "http:" + actualUrl;
+    } else {
+      actualUrl = "http://"; + actualUrl;
+    }
+
+    return actualUrl;
+  }
+
+  /**
+   * Validates the URI scheme.
+   *
+   * @param scheme the scheme to validate
+   * @throws IllegalArgumentException if scheme is invalid
+   */
+  private static void validateScheme(String scheme) {
+    if (scheme == null || (!scheme.equals("http") && !scheme.equals("https"))) 
{
+      throw new IllegalArgumentException(
+          "Invalid scheme. Expected 'http' or 'https'. Got: " + scheme);
+    }
+  }
+
+  /**
+   * Validates and returns the host from the URI.
+   *
+   * @param host the host to validate
+   * @param jdbcUrl the original JDBC URL (for error reporting)
+   * @return the validated host
+   * @throws IllegalArgumentException if host is invalid
+   */
+  private static String validateAndGetHost(String host, String jdbcUrl) {
+    if (Strings.isNullOrEmpty(host)) {
+      throw new IllegalArgumentException("Host cannot be empty in JDBC URL: " 
+ jdbcUrl);
+    }
+    return host;
+  }
+
+  /**
+   * Returns the port or default port based on scheme.
+   *
+   * @param port the port from URI (-1 if not specified)
+   * @param scheme the URI scheme (http or https)
+   * @return the port number
+   */
+  private static int getPortOrDefault(int port, String scheme) {
+    if (port == -1) {
+      return scheme.equals("https") ? 8443 : 8123; // Default ClickHouse ports
+    }
+    return port;
+  }
+
+  /**
+   * Extracts database name from URI path.
+   *
+   * @param path the URI path
+   * @return the database name, or "default" if not specified
+   */
+  private static String extractDatabase(String path) {
+    if (Strings.isNullOrEmpty(path)) {
+      return "default";
+    }
+
+    // Remove leading slash
+    String pathWithoutSlash = path.startsWith("/") ? path.substring(1) : path;
+    return pathWithoutSlash.isEmpty() ? "default" : pathWithoutSlash;
+  }
+
+  /**
+   * Extracts connection properties from URI query string.
+   *
+   * @param query the URI query string
+   * @return Properties object containing the parsed parameters
+   * @throws java.io.UnsupportedEncodingException if URL decoding fails
+   */
+  private static Properties extractProperties(String query)
+      throws java.io.UnsupportedEncodingException {
+    Properties properties = new Properties();
+
+    if (Strings.isNullOrEmpty(query)) {
+      return properties;
+    }
+
+    // Use Guava Splitter instead of String.split()
+    for (String param : Splitter.on('&').split(query)) {
+      // Split key-value pairs, handling parameters without values
+      List<String> parts = Splitter.on('=').limit(2).splitToList(param);
+
+      if (parts.size() == 2) {
+        String key = java.net.URLDecoder.decode(parts.get(0), "UTF-8");
+        String value = java.net.URLDecoder.decode(parts.get(1), "UTF-8");
+        properties.setProperty(key, value);
+      } else if (parts.size() == 1) {
+        // Parameter without value (e.g., ?compress)
+        String key = java.net.URLDecoder.decode(parts.get(0), "UTF-8");
+        properties.setProperty(key, "true");
+      }
+    }
+
+    return properties;
+  }
+}
diff --git 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
index b89a88b3fae..baee77c5f9a 100644
--- 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
+++ 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
@@ -29,7 +29,10 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
-/** A descriptor for ClickHouse table schema. */
+/**
+ * A descriptor for ClickHouse table schema. To be updated with ClickHouse 
table schema API -
+ * https://github.com/apache/beam/issues/37613
+ */
 @AutoValue
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
@@ -185,7 +188,8 @@ public abstract class TableSchema implements Serializable {
   /**
    * An enumeration of possible kinds of default values in ClickHouse.
    *
-   * @see <a 
href="https://clickhouse.yandex/docs/en/single/#default-values";>ClickHouse
+   * @see <a
+   *     
href="https://clickhouse.com/docs/sql-reference/statements/create/table#default_values";>ClickHouse
    *     documentation</a>
    */
   public enum DefaultType {
diff --git 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java
 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java
index 3a881ff0459..73a822b8ec0 100644
--- 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java
+++ 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.clickhouse;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.sql.SQLException;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.beam.sdk.Pipeline;
@@ -48,6 +47,7 @@ public class AtomicInsertTest extends BaseClickHouseTest {
 
   private static final int MIN_ATTEMPTS = 2;
   private static final int MAX_ATTEMPTS = 20; // should be enough to succeed 
at least once
+  static final int TEST_BATCH_SIZE = 100000;
 
   private static boolean shouldAttempt(int i, long count) {
     return i < MIN_ATTEMPTS || (count == 0 && i < MAX_ATTEMPTS);
@@ -55,8 +55,7 @@ public class AtomicInsertTest extends BaseClickHouseTest {
 
   /** With sufficient block size, ClickHouse will atomically insert all or 
nothing. */
   @Test
-  public void testAtomicInsert() throws SQLException {
-    int size = 100000;
+  public void testAtomicInsert() throws Exception {
     int done = 0;
 
     // inserts to such table fail with 60% chance for 1M batch size
@@ -64,16 +63,16 @@ public class AtomicInsertTest extends BaseClickHouseTest {
         "CREATE TABLE test_atomic_insert ("
             + "  f0 Int64, "
             + "  f1 Int64 MATERIALIZED CAST(if((rand() % "
-            + size
+            + TEST_BATCH_SIZE
             + ") = 0, '', '1') AS Int64)"
             + ") ENGINE=MergeTree ORDER BY (f0)");
 
     pipeline
         // make sure we get one big bundle
-        .apply(RangeBundle.of(size))
+        .apply(RangeBundle.of(TEST_BATCH_SIZE))
         .apply(
-            ClickHouseIO.<Row>write(clickHouse.getJdbcUrl(), 
"test_atomic_insert")
-                .withMaxInsertBlockSize(size)
+            ClickHouseIO.<Row>write(clickHouseUrl, database, 
"test_atomic_insert")
+                .withMaxInsertBlockSize(TEST_BATCH_SIZE)
                 .withInitialBackoff(Duration.millis(1))
                 .withMaxRetries(2));
 
@@ -84,7 +83,7 @@ public class AtomicInsertTest extends BaseClickHouseTest {
     }
 
     // each insert is atomic, so we get exactly done * size elements
-    assertEquals(((long) done) * size, count);
+    assertEquals(((long) done) * TEST_BATCH_SIZE, count);
     assertTrue("insert didn't succeed after " + MAX_ATTEMPTS + " attempts", 
count > 0L);
   }
 
@@ -93,25 +92,24 @@ public class AtomicInsertTest extends BaseClickHouseTest {
    * replicated tables, it will deduplicate blocks.
    */
   @Test
-  public void testIdempotentInsert() throws SQLException {
-    int size = 100000;
+  public void testIdempotentInsert() throws Exception {
 
     // inserts to such table fail with 60% chance for 1M batch size
     executeSql(
         "CREATE TABLE test_idempotent_insert ("
             + "  f0 Int64, "
             + "  f1 Int64 MATERIALIZED CAST(if((rand() % "
-            + size
+            + TEST_BATCH_SIZE
             + ") = 0, '', '1') AS Int64)"
             + ") 
ENGINE=ReplicatedMergeTree('/clickHouse/tables/0/test_idempotent_insert', 
'replica_0') "
             + "ORDER BY (f0)");
 
     pipeline
         // make sure we get one big bundle
-        .apply(RangeBundle.of(size))
+        .apply(RangeBundle.of(TEST_BATCH_SIZE))
         .apply(
-            ClickHouseIO.<Row>write(clickHouse.getJdbcUrl(), 
"test_idempotent_insert")
-                .withMaxInsertBlockSize(size)
+            ClickHouseIO.<Row>write(clickHouseUrl, database, 
"test_idempotent_insert")
+                .withMaxInsertBlockSize(TEST_BATCH_SIZE)
                 .withInitialBackoff(Duration.millis(1))
                 .withMaxRetries(2));
 
@@ -122,7 +120,7 @@ public class AtomicInsertTest extends BaseClickHouseTest {
     }
 
     // inserts should be deduplicated, so we get exactly `size` elements
-    assertEquals(size, count);
+    assertEquals(TEST_BATCH_SIZE, count);
     assertTrue("insert didn't succeed after " + MAX_ATTEMPTS + " attempts", 
count > 0L);
   }
 
diff --git 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java
 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java
index 5f90f3f3184..d3f6c398252 100644
--- 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java
+++ 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java
@@ -17,11 +17,13 @@
  */
 package org.apache.beam.sdk.io.clickhouse;
 
+import static org.testcontainers.containers.ClickHouseContainer.HTTP_PORT;
+
+import com.clickhouse.client.api.Client;
+import com.clickhouse.client.api.query.GenericRecord;
+import com.clickhouse.client.api.query.Records;
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -41,14 +43,19 @@ import org.testcontainers.containers.Network;
 public class BaseClickHouseTest {
 
   public static ClickHouseContainer clickHouse;
+  public static String clickHouseUrl;
+  public static String database;
   public static Network network;
   public static GenericContainer zookeeper;
+  static final int CLIENT_TIMEOUT = 30;
+
   private static final Logger LOG = 
LoggerFactory.getLogger(BaseClickHouseTest.class);
 
-  private Connection connection;
+  private Client client;
 
   @BeforeClass
   public static void setup() throws IOException, InterruptedException {
+    System.setProperty("api.version", "1.44");
     network = Network.newNetwork();
 
     zookeeper =
@@ -72,45 +79,130 @@ public class BaseClickHouseTest {
     ;
     clickHouse.start();
     LOG.info("Start Clickhouse");
+    clickHouseUrl = "http://"; + clickHouse.getHost() + ":" + 
clickHouse.getMappedPort(HTTP_PORT);
+    database = "default";
   }
 
   @AfterClass
   public static void tearDown() {
-    clickHouse.close();
-    zookeeper.close();
+    if (clickHouse != null) {
+      clickHouse.close();
+    }
+    if (zookeeper != null) {
+      zookeeper.close();
+    }
   }
 
   @Before
-  public void setUp() throws SQLException {
-    connection = clickHouse.createConnection("");
+  public void setUp() throws Exception {
+    // Create ClickHouse Java Client
+    Client.Builder clientBuilder =
+        new Client.Builder()
+            .addEndpoint(clickHouseUrl)
+            .setUsername(clickHouse.getUsername())
+            .setPassword(clickHouse.getPassword())
+            .setDefaultDatabase(database);
+
+    client = clientBuilder.build();
   }
 
   @After
   public void after() {
-    if (connection != null) {
+    if (client != null) {
       try {
-        connection.close();
-      } catch (SQLException e) {
-        // failed to close connection, ignore
+        client.close();
+      } catch (Exception e) {
+        LOG.warn("Failed to close ClickHouse client", e);
       } finally {
-        connection = null;
+        client = null;
       }
     }
   }
 
-  boolean executeSql(String sql) throws SQLException {
-    Statement statement = connection.createStatement();
-    return statement.execute(sql);
+  /**
+   * Executes a SQL statement (DDL, DML, etc.).
+   *
+   * @param sql SQL statement to execute
+   * @return true if execution was successful
+   * @throws Exception if execution fails
+   */
+  boolean executeSql(String sql) throws Exception {
+    try {
+      client.query(sql).get(CLIENT_TIMEOUT, TimeUnit.SECONDS);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Failed to execute SQL: {}", sql, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Executes a query and returns the results.
+   *
+   * @param sql SQL query to execute
+   * @return Records containing query results
+   * @throws Exception if query fails
+   */
+  Records executeQuery(String sql) throws Exception {
+    try {
+      return client.queryRecords(sql).get(CLIENT_TIMEOUT, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Failed to execute query: {}", sql, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Executes a query and returns the first column of the first row as a long. 
Useful for COUNT
+   * queries or other single-value results.
+   *
+   * @param sql SQL query to execute
+   * @return long value from first column of first row
+   * @throws Exception if query fails or result is empty
+   */
+  long executeQueryAsLong(String sql) throws Exception {
+    try (Records records = executeQuery(sql)) {
+      for (GenericRecord record : records) {
+        // Get the first column value - assuming it's numeric
+        return record.getLong(1); // Column index is 1-based
+      }
+      throw new IllegalStateException("Query returned no results: " + sql);
+    } catch (Exception e) {
+      LOG.error("Failed to execute query as long: {}", sql, e);
+      throw e;
+    }
   }
 
-  ResultSet executeQuery(String sql) throws SQLException {
-    Statement statement = connection.createStatement();
-    return statement.executeQuery(sql);
+  /**
+   * Executes a query and returns the first column of the first row as a 
String.
+   *
+   * @param sql SQL query to execute
+   * @return String value from first column of first row
+   * @throws Exception if query fails or result is empty
+   */
+  String executeQueryAsString(String sql) throws Exception {
+    try (Records records = executeQuery(sql)) {
+      for (GenericRecord record : records) {
+        return record.getString(1); // Column index is 1-based
+      }
+      throw new IllegalStateException("Query returned no results: " + sql);
+    } catch (Exception e) {
+      LOG.error("Failed to execute query as string: {}", sql, e);
+      throw e;
+    }
   }
 
-  long executeQueryAsLong(String sql) throws SQLException {
-    ResultSet rs = executeQuery(sql);
-    rs.next();
-    return rs.getLong(1);
+  /**
+   * Checks if the ClickHouse server is alive and responsive.
+   *
+   * @return true if server responds to ping
+   */
+  boolean isServerAlive() {
+    try {
+      return client != null && client.ping();
+    } catch (Exception e) {
+      LOG.warn("Server ping failed", e);
+      return false;
+    }
   }
 }
diff --git 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOJdbcBackwardCompatibilityTest.java
 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOJdbcBackwardCompatibilityTest.java
new file mode 100644
index 00000000000..3a4a00421a4
--- /dev/null
+++ 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOJdbcBackwardCompatibilityTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.clickhouse.ClickHouseIO.Write;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for JDBC URL backward compatibility. */
+@RunWith(JUnit4.class)
+public class ClickHouseIOJdbcBackwardCompatibilityTest {
+
+  @Test
+  public void testDeprecatedWriteMethodWithBasicJdbcUrl() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb";
+    String table = "test_table";
+
+    @SuppressWarnings("deprecation")
+    Write<?> write = ClickHouseIO.write(jdbcUrl, table);
+
+    assertEquals("http://localhost:8123";, write.clickHouseUrl());
+    assertEquals("testdb", write.database());
+    assertEquals(table, write.table());
+  }
+
+  @Test
+  public void testDeprecatedWriteMethodWithParameters() {
+    String jdbcUrl = 
"jdbc:clickhouse://localhost:8123/testdb?user=admin&password=secret";
+    String table = "test_table";
+
+    @SuppressWarnings("deprecation")
+    Write<?> write = ClickHouseIO.write(jdbcUrl, table);
+
+    assertEquals("http://localhost:8123";, write.clickHouseUrl());
+    assertEquals("testdb", write.database());
+    assertEquals(table, write.table());
+    assertEquals("admin", write.properties().getProperty("user"));
+    assertEquals("secret", write.properties().getProperty("password"));
+  }
+
+  @Test
+  public void testDeprecatedWriteMethodPreservesDefaults() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb";
+    String table = "test_table";
+
+    @SuppressWarnings("deprecation")
+    Write<?> write = ClickHouseIO.write(jdbcUrl, table);
+
+    // Verify defaults are set
+    assertEquals(ClickHouseIO.DEFAULT_MAX_INSERT_BLOCK_SIZE, 
write.maxInsertBlockSize());
+    assertEquals(ClickHouseIO.DEFAULT_MAX_RETRIES, write.maxRetries());
+    assertEquals(ClickHouseIO.DEFAULT_INITIAL_BACKOFF, write.initialBackoff());
+    assertEquals(ClickHouseIO.DEFAULT_MAX_CUMULATIVE_BACKOFF, 
write.maxCumulativeBackoff());
+    assertTrue(write.insertDeduplicate());
+    assertTrue(write.insertDistributedSync());
+  }
+
+  @Test
+  public void testNewWriteMethodEquivalence() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin";
+    String table = "test_table";
+
+    // Old way (deprecated)
+    @SuppressWarnings("deprecation")
+    Write<?> oldWrite = ClickHouseIO.write(jdbcUrl, table);
+
+    // New way
+    Write<?> newWrite =
+        ClickHouseIO.write("http://localhost:8123";, "testdb", table)
+            .withProperties(oldWrite.properties());
+
+    // Should produce equivalent configurations
+    assertEquals(oldWrite.clickHouseUrl(), newWrite.clickHouseUrl());
+    assertEquals(oldWrite.database(), newWrite.database());
+    assertEquals(oldWrite.table(), newWrite.table());
+    assertEquals(
+        oldWrite.properties().getProperty("user"), 
newWrite.properties().getProperty("user"));
+  }
+}
diff --git 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOPropertyMergingTest.java
 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOPropertyMergingTest.java
new file mode 100644
index 00000000000..5bd0687f532
--- /dev/null
+++ 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOPropertyMergingTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Properties;
+import org.apache.beam.sdk.io.clickhouse.ClickHouseIO.Write;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for property conflict detection in ClickHouseIO. */
+@RunWith(JUnit4.class)
+public class ClickHouseIOPropertyMergingTest {
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testDeprecatedWriteExtractsPropertiesFromJdbcUrl() {
+    String jdbcUrl =
+        
"jdbc:clickhouse://localhost:8123/testdb?user=admin&password=secret&compress=true";
+    String table = "test_table";
+
+    Write<?> write = ClickHouseIO.write(jdbcUrl, table);
+
+    Properties props = write.properties();
+    assertEquals("admin", props.getProperty("user"));
+    assertEquals("secret", props.getProperty("password"));
+    assertEquals("true", props.getProperty("compress"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  @SuppressWarnings("deprecation")
+  public void testWithPropertiesConflictThrows() {
+    String jdbcUrl = 
"jdbc:clickhouse://localhost:8123/testdb?user=admin&password=old_secret";
+    String table = "test_table";
+
+    Properties conflictingProps = new Properties();
+    conflictingProps.setProperty("password", "new_secret"); // Conflicts!
+
+    ClickHouseIO.write(jdbcUrl, table).withProperties(conflictingProps); // 
Should throw
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testWithPropertiesNoConflictWhenSameValue() {
+    String jdbcUrl = 
"jdbc:clickhouse://localhost:8123/testdb?user=admin&password=secret";
+    String table = "test_table";
+
+    Properties sameProps = new Properties();
+    sameProps.setProperty("user", "admin"); // Same value - OK
+    sameProps.setProperty("password", "secret"); // Same value - OK
+
+    Write<?> write = ClickHouseIO.write(jdbcUrl, 
table).withProperties(sameProps);
+
+    assertEquals("admin", write.properties().getProperty("user"));
+    assertEquals("secret", write.properties().getProperty("password"));
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testWithPropertiesAddsNewPropertiesWithoutConflict() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin";
+    String table = "test_table";
+
+    Properties additionalProps = new Properties();
+    additionalProps.setProperty("socket_timeout", "30000"); // New property - 
OK
+    additionalProps.setProperty("compress", "true"); // New property - OK
+
+    Write<?> write = ClickHouseIO.write(jdbcUrl, 
table).withProperties(additionalProps);
+
+    Properties finalProps = write.properties();
+    assertEquals("admin", finalProps.getProperty("user"));
+    assertEquals("30000", finalProps.getProperty("socket_timeout"));
+    assertEquals("true", finalProps.getProperty("compress"));
+  }
+
+  @Test
+  public void testNewWriteMethodWithProperties() {
+    Properties props = new Properties();
+    props.setProperty("user", "admin");
+    props.setProperty("password", "secret");
+
+    Write<?> write =
+        ClickHouseIO.write("http://localhost:8123";, "testdb", 
"test_table").withProperties(props);
+
+    Properties finalProps = write.properties();
+    assertEquals("admin", finalProps.getProperty("user"));
+    assertEquals("secret", finalProps.getProperty("password"));
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testEmptyPropertiesDoesNotAffectExisting() {
+    String jdbcUrl = 
"jdbc:clickhouse://localhost:8123/testdb?user=admin&password=secret";
+    String table = "test_table";
+
+    Properties emptyProps = new Properties();
+
+    Write<?> write = ClickHouseIO.write(jdbcUrl, 
table).withProperties(emptyProps);
+
+    Properties finalProps = write.properties();
+    assertEquals("admin", finalProps.getProperty("user"));
+    assertEquals("secret", finalProps.getProperty("password"));
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testWithPropertiesConflictHasDetailedMessage() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?compress=false";
+    String table = "test_table";
+
+    Properties conflictingProps = new Properties();
+    conflictingProps.setProperty("compress", "true"); // Different value
+
+    try {
+      ClickHouseIO.write(jdbcUrl, table).withProperties(conflictingProps);
+      fail("Expected IllegalArgumentException for property conflict");
+    } catch (IllegalArgumentException e) {
+      // Verify error message is helpful
+      assertTrue(e.getMessage().contains("compress"));
+      assertTrue(e.getMessage().contains("false"));
+      assertTrue(e.getMessage().contains("true"));
+      assertTrue(e.getMessage().contains("conflict"));
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  @SuppressWarnings("deprecation")
+  public void testMultipleWithPropertiesCallsWithConflict() {
+    String jdbcUrl = 
"jdbc:clickhouse://localhost:8123/testdb?password=original";
+    String table = "test_table";
+
+    Properties props1 = new Properties();
+    props1.setProperty("compress", "true"); // New property - OK
+
+    Properties props2 = new Properties();
+    props2.setProperty("password", "secret2"); // Conflicts with JDBC URL!
+
+    ClickHouseIO.write(jdbcUrl, 
table).withProperties(props1).withProperties(props2);
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testMultipleWithPropertiesCallsWithoutConflict() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin";
+    String table = "test_table";
+
+    Properties props1 = new Properties();
+    props1.setProperty("compress", "true"); // New property - OK
+
+    Properties props2 = new Properties();
+    props2.setProperty("socket_timeout", "30000"); // New property - OK
+
+    Write<?> write =
+        ClickHouseIO.write(jdbcUrl, 
table).withProperties(props1).withProperties(props2);
+
+    Properties finalProps = write.properties();
+    assertEquals("admin", finalProps.getProperty("user")); // From JDBC URL
+    assertEquals("true", finalProps.getProperty("compress")); // From first 
withProperties
+    assertEquals("30000", finalProps.getProperty("socket_timeout")); // From 
second withProperties
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  @SuppressWarnings("deprecation")
+  public void testCannotOverrideJdbcUrlProperties() {
+    // This test verifies the NEW behavior: conflicts are not allowed
+    String jdbcUrl = 
"jdbc:clickhouse://localhost:8123/testdb?user=url_user&password=url_pass";
+    String table = "test_table";
+
+    Properties conflictingProps = new Properties();
+    conflictingProps.setProperty("user", "explicit_user"); // Conflict!
+
+    ClickHouseIO.write(jdbcUrl, table).withProperties(conflictingProps); // 
Should throw
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testCanAddPropertiesToJdbcUrlWithoutConflict() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin";
+    String table = "test_table";
+
+    Properties additionalProps = new Properties();
+    additionalProps.setProperty("password", "secret"); // New - no conflict
+    additionalProps.setProperty("compress", "true"); // New - no conflict
+
+    Write<?> write = ClickHouseIO.write(jdbcUrl, 
table).withProperties(additionalProps);
+
+    Properties finalProps = write.properties();
+    assertEquals("admin", finalProps.getProperty("user"));
+    assertEquals("secret", finalProps.getProperty("password"));
+    assertEquals("true", finalProps.getProperty("compress"));
+  }
+}
diff --git 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
index 8e5dc7ebe38..64f7f86177b 100644
--- 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
+++ 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
@@ -19,10 +19,14 @@ package org.apache.beam.sdk.io.clickhouse;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-import java.sql.ResultSet;
+import com.clickhouse.client.api.query.GenericRecord;
+import com.clickhouse.client.api.query.Records;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.Properties;
 import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
@@ -155,15 +159,17 @@ public class ClickHouseIOTest extends BaseClickHouseTest {
 
     pipeline.run().waitUntilFinish();
 
-    try (ResultSet rs = executeQuery("SELECT * FROM test_named_tuples")) {
-      rs.next();
-      assertEquals("[tuple, true]", rs.getString("t0"));
+    try (Records records = executeQuery("SELECT * FROM test_named_tuples")) {
+      for (GenericRecord record : records) {
+        assertArrayEquals(new Object[] {"tuple", true}, record.getTuple("t0"));
+      }
     }
 
-    try (ResultSet rs = executeQuery("SELECT t0.f0 as f0, t0.f1 as f1 FROM 
test_named_tuples")) {
-      rs.next();
-      assertEquals("tuple", rs.getString("f0"));
-      assertEquals("true", rs.getString("f1"));
+    try (Records records = executeQuery("SELECT t0.f0 as f0, t0.f1 as f1 FROM 
test_named_tuples")) {
+      for (GenericRecord record : records) {
+        assertEquals("tuple", record.getString("f0"));
+        assertTrue(record.getBoolean("f1"));
+      }
     }
   }
 
@@ -202,17 +208,24 @@ public class ClickHouseIOTest extends BaseClickHouseTest {
 
     pipeline.run().waitUntilFinish();
 
-    try (ResultSet rs = executeQuery("SELECT * FROM 
test_named_complex_tuples")) {
-      rs.next();
-      assertEquals("[[test, [10, 20], 1.0.0], mobile]", rs.getString("prop"));
+    try (Records records = executeQuery("SELECT * FROM 
test_named_complex_tuples")) {
+      for (GenericRecord record : records) {
+        //        Object[] propValue = record.getTuple("prop");
+        // Adjust assertion based on actual output
+        assertArrayEquals(
+            new Object[] {new Object[] {"test", new Object[] {10L, 20L}, 
"1.0.0"}, "mobile"},
+            record.getTuple("prop"));
+        //        assertEquals("(('test',[10,20],'1.0.0'),'mobile')", 
propValue);
+      }
     }
 
-    try (ResultSet rs =
+    try (Records records =
         executeQuery(
             "SELECT prop.browser.name as name, prop.browser.size as size FROM 
test_named_complex_tuples")) {
-      rs.next();
-      assertEquals("test", rs.getString("name"));
-      assertEquals("[10, 20]", rs.getString("size"));
+      for (GenericRecord record : records) {
+        assertEquals("test", record.getString("name"));
+        assertArrayEquals(new Object[] {10L, 20L}, record.getTuple("size"));
+      }
     }
   }
 
@@ -292,29 +305,32 @@ public class ClickHouseIOTest extends BaseClickHouseTest {
 
     pipeline.run().waitUntilFinish();
 
-    try (ResultSet rs = executeQuery("SELECT * FROM test_primitive_types")) {
-      rs.next();
-
-      assertEquals("2030-10-01", rs.getString("f0"));
-      assertEquals("2030-10-09 08:07:06", rs.getString("f1"));
-      assertEquals("2.2", rs.getString("f2"));
-      assertEquals("3.3", rs.getString("f3"));
-      assertEquals("4", rs.getString("f4"));
-      assertEquals("5", rs.getString("f5"));
-      assertEquals("6", rs.getString("f6"));
-      assertEquals("7", rs.getString("f7"));
-      assertEquals("eight", rs.getString("f8"));
-      assertEquals("9", rs.getString("f9"));
-      assertEquals("10", rs.getString("f10"));
-      assertEquals("11", rs.getString("f11"));
-      assertEquals("12", rs.getString("f12"));
-      assertEquals("abc", rs.getString("f13"));
-      assertEquals("cde", rs.getString("f14"));
-      assertArrayEquals(new byte[] {'q', 'w', 'e'}, rs.getBytes("f15"));
-      assertArrayEquals(new byte[] {'a', 's', 'd'}, rs.getBytes("f16"));
-      assertArrayEquals(new byte[] {'z', 'x', 'c'}, rs.getBytes("f17"));
-      assertEquals("true", rs.getString("f18"));
-      assertEquals("lowcardenality", rs.getString("f19"));
+    try (Records records = executeQuery("SELECT * FROM test_primitive_types")) 
{
+      for (GenericRecord record : records) {
+        assertEquals("2030-10-01", record.getString("f0"));
+        assertEquals("2030-10-09 08:07:06", record.getString("f1"));
+        assertEquals("2.2", record.getString("f2"));
+        assertEquals("3.3", record.getString("f3"));
+        assertEquals("4", record.getString("f4"));
+        assertEquals("5", record.getString("f5"));
+        assertEquals("6", record.getString("f6"));
+        assertEquals("7", record.getString("f7"));
+        assertEquals("eight", record.getString("f8"));
+        assertEquals("9", record.getString("f9"));
+        assertEquals("10", record.getString("f10"));
+        assertEquals("11", record.getString("f11"));
+        assertEquals("12", record.getString("f12"));
+        assertEquals("abc", record.getString("f13"));
+        assertEquals("cde", record.getString("f14"));
+        assertArrayEquals(
+            new byte[] {'q', 'w', 'e'}, 
record.getString("f15").getBytes(StandardCharsets.UTF_8));
+        assertArrayEquals(
+            new byte[] {'a', 's', 'd'}, 
record.getString("f16").getBytes(StandardCharsets.UTF_8));
+        assertArrayEquals(
+            new byte[] {'z', 'x', 'c'}, 
record.getString("f17").getBytes(StandardCharsets.UTF_8));
+        assertEquals("true", record.getString("f18"));
+        assertEquals("lowcardenality", record.getString("f19"));
+      }
     }
   }
 
@@ -388,26 +404,28 @@ public class ClickHouseIOTest extends BaseClickHouseTest {
 
     pipeline.run().waitUntilFinish();
 
-    try (ResultSet rs = executeQuery("SELECT * FROM 
test_array_of_primitive_types")) {
-      rs.next();
-
-      assertEquals("[2030-10-01, 2031-10-01]", rs.getString("f0"));
-      assertEquals("[2030-10-09T08:07:06, 2031-10-09T08:07:06]", 
rs.getString("f1"));
-      // Since comparing float/double values is not precise, we compare the 
string representation
-      assertEquals("[2.2,3.3]", rs.getString("f2"));
-      assertEquals("[3.3,4.4]", rs.getString("f3"));
-      assertArrayEquals(new byte[] {4, 5}, (byte[]) 
rs.getArray("f4").getArray());
-      assertArrayEquals(new short[] {5, 6}, (short[]) 
rs.getArray("f5").getArray());
-      assertArrayEquals(new int[] {6, 7}, (int[]) 
rs.getArray("f6").getArray());
-      assertArrayEquals(new long[] {7L, 8L}, (long[]) 
rs.getArray("f7").getArray());
-      assertArrayEquals(new String[] {"eight", "nine"}, (String[]) 
rs.getArray("f8").getArray());
-      assertArrayEquals(new byte[] {9, 10}, (byte[]) 
rs.getArray("f9").getArray());
-      assertArrayEquals(new short[] {10, 11}, (short[]) 
rs.getArray("f10").getArray());
-      assertArrayEquals(new int[] {11, 12}, (int[]) 
rs.getArray("f11").getArray());
-      assertArrayEquals(new long[] {12L, 13L}, (long[]) 
rs.getArray("f12").getArray());
-      assertArrayEquals(new String[] {"abc", "cde"}, (String[]) 
rs.getArray("f13").getArray());
-      assertArrayEquals(new String[] {"cde", "abc"}, (String[]) 
rs.getArray("f14").getArray());
-      assertArrayEquals(new boolean[] {true, false}, (boolean[]) 
rs.getArray("f15").getArray());
+    try (Records records = executeQuery("SELECT * FROM 
test_array_of_primitive_types")) {
+      for (GenericRecord record : records) {
+        // Date/time arrays as strings
+        assertEquals("[2030-10-01, 2031-10-01]", record.getString("f0"));
+        assertEquals("[2030-10-09 08:07:06, 2031-10-09 08:07:06]", 
record.getString("f1"));
+        assertEquals("[2.2, 3.3]", record.getString("f2"));
+        assertEquals("[3.3, 4.4]", record.getString("f3"));
+
+        // Use the proper typed array methods
+        assertArrayEquals(new byte[] {4, 5}, record.getByteArray("f4")); // 
Int8
+        assertArrayEquals(new short[] {5, 6}, record.getShortArray("f5")); // 
Int16
+        assertArrayEquals(new int[] {6, 7}, record.getIntArray("f6")); // Int32
+        assertArrayEquals(new long[] {7L, 8L}, record.getLongArray("f7")); // 
Int64
+        assertArrayEquals(new String[] {"eight", "nine"}, 
record.getStringArray("f8")); // String
+        assertArrayEquals(new short[] {9, 10}, record.getShortArray("f9")); // 
UInt8 -> short
+        assertArrayEquals(new int[] {10, 11}, record.getIntArray("f10")); // 
UInt16 -> int
+        assertArrayEquals(new long[] {11, 12}, record.getLongArray("f11")); // 
UInt32 -> long
+        assertEquals("[12, 13]", record.getString("f12")); // UInt64
+        assertEquals("[abc, cde]", record.getString("f13")); // FixedString
+        assertEquals("[cde, abc]", record.getString("f14")); // FixedString
+        assertArrayEquals(new boolean[] {true, false}, 
record.getBooleanArray("f15"));
+      }
     }
   }
 
@@ -475,6 +493,12 @@ public class ClickHouseIOTest extends BaseClickHouseTest {
   }
 
   private <T> ClickHouseIO.Write<T> write(String table) {
-    return ClickHouseIO.<T>write(clickHouse.getJdbcUrl(), 
table).withMaxRetries(0);
+    Properties properties = new Properties();
+    properties.setProperty("user", clickHouse.getUsername());
+    properties.setProperty("password", clickHouse.getPassword());
+
+    return ClickHouseIO.<T>write(clickHouseUrl, database, table)
+        .withProperties(properties)
+        .withMaxRetries(0);
   }
 }
diff --git 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParserTest.java
 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParserTest.java
new file mode 100644
index 00000000000..4b994522d9b
--- /dev/null
+++ 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParserTest.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Properties;
+import org.apache.beam.sdk.io.clickhouse.ClickHouseJdbcUrlParser.ParsedJdbcUrl;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ClickHouseJdbcUrlParser}. */
+@RunWith(JUnit4.class)
+public class ClickHouseJdbcUrlParserTest {
+
+  @Test
+  public void testBasicJdbcUrl() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/default";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("default", parsed.getDatabase());
+    assertTrue(parsed.getProperties().isEmpty());
+  }
+
+  @Test
+  public void testJdbcUrlWithCustomDatabase() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+    assertTrue(parsed.getProperties().isEmpty());
+  }
+
+  @Test
+  public void testJdbcUrlWithoutPort() {
+    String jdbcUrl = "jdbc:clickhouse://localhost/mydb";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+  }
+
+  @Test
+  public void testJdbcUrlWithoutDatabase() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("default", parsed.getDatabase());
+  }
+
+  @Test
+  public void testJdbcUrlWithTrailingSlash() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("default", parsed.getDatabase());
+  }
+
+  @Test
+  public void testJdbcUrlWithHttpPrefix() {
+    String jdbcUrl = "jdbc:clickhouse:http://localhost:8123/mydb";;
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+  }
+
+  @Test
+  public void testJdbcUrlWithHttpsPrefix() {
+    String jdbcUrl = "jdbc:clickhouse:https://localhost:8443/mydb";;
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("https://localhost:8443";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+  }
+
+  @Test
+  public void testJdbcUrlWithHttpsWithoutPort() {
+    String jdbcUrl = "jdbc:clickhouse:https://localhost/mydb";;
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("https://localhost:8443";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+  }
+
+  @Test
+  public void testJdbcUrlWithSingleParameter() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb?user=admin";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+    assertEquals("admin", parsed.getProperties().getProperty("user"));
+  }
+
+  @Test
+  public void testJdbcUrlWithMultipleParameters() {
+    String jdbcUrl =
+        
"jdbc:clickhouse://localhost:8123/mydb?user=admin&password=secret&compress=true";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+
+    Properties props = parsed.getProperties();
+    assertEquals("admin", props.getProperty("user"));
+    assertEquals("secret", props.getProperty("password"));
+    assertEquals("true", props.getProperty("compress"));
+  }
+
+  @Test
+  public void testJdbcUrlWithUrlEncodedParameters() {
+    String jdbcUrl = 
"jdbc:clickhouse://localhost:8123/mydb?user=my%20user&password=p%40ssw0rd";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    Properties props = parsed.getProperties();
+    assertEquals("my user", props.getProperty("user"));
+    assertEquals("p@ssw0rd", props.getProperty("password"));
+  }
+
+  @Test
+  public void testJdbcUrlWithParameterWithoutValue() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb?compress";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("true", parsed.getProperties().getProperty("compress"));
+  }
+
+  @Test
+  public void testJdbcUrlShorthandCh() {
+    String jdbcUrl = "jdbc:ch://localhost:8123/mydb";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+  }
+
+  @Test
+  public void testJdbcUrlWithRemoteHost() {
+    String jdbcUrl = 
"jdbc:clickhouse://clickhouse.example.com:9000/production";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://clickhouse.example.com:9000";, 
parsed.getClickHouseUrl());
+    assertEquals("production", parsed.getDatabase());
+  }
+
+  @Test
+  public void testJdbcUrlWithIpAddress() {
+    String jdbcUrl = "jdbc:clickhouse://192.168.1.100:8123/mydb";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://192.168.1.100:8123";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+  }
+
+  @Test
+  public void testJdbcUrlWithComplexQueryString() {
+    String jdbcUrl =
+        "jdbc:clickhouse://localhost:8123/mydb?"
+            + "user=admin&password=secret&"
+            + "socket_timeout=30000&"
+            + "connection_timeout=10000&"
+            + "compress=true";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    Properties props = parsed.getProperties();
+    assertEquals("admin", props.getProperty("user"));
+    assertEquals("secret", props.getProperty("password"));
+    assertEquals("30000", props.getProperty("socket_timeout"));
+    assertEquals("10000", props.getProperty("connection_timeout"));
+    assertEquals("true", props.getProperty("compress"));
+  }
+
+  @Test
+  public void testJdbcUrlCaseInsensitivePrefix() {
+    String jdbcUrl = "JDBC:CLICKHOUSE://localhost:8123/mydb";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+  }
+
+  @Test
+  public void testJdbcUrlWithDatabaseContainingUnderscore() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/my_database_name";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("my_database_name", parsed.getDatabase());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNullJdbcUrl() {
+    ClickHouseJdbcUrlParser.parse(null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyJdbcUrl() {
+    ClickHouseJdbcUrlParser.parse("");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidJdbcPrefix() {
+    ClickHouseJdbcUrlParser.parse("jdbc:mysql://localhost:3306/mydb");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidSchemeFtp() {
+    ClickHouseJdbcUrlParser.parse("jdbc:clickhouse:ftp://localhost:8123/mydb";);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidSchemeGopher() {
+    
ClickHouseJdbcUrlParser.parse("jdbc:clickhouse:gopher://localhost:8123/mydb";);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidSchemeFile() {
+    
ClickHouseJdbcUrlParser.parse("jdbc:clickhouse:file://localhost:8123/mydb");
+  }
+
+  @Test
+  public void testValidHttpSchemeExplicit() {
+    String jdbcUrl = "jdbc:clickhouse:http://localhost:8123/mydb";;
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+  }
+
+  @Test
+  public void testValidHttpsSchemeExplicit() {
+    String jdbcUrl = "jdbc:clickhouse:https://localhost:8443/mydb";;
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("https://localhost:8443";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+  }
+
+  @Test
+  public void testImplicitHttpScheme() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMissingHost() {
+    ClickHouseJdbcUrlParser.parse("jdbc:clickhouse://:8123/mydb");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMalformedUrl() {
+    
ClickHouseJdbcUrlParser.parse("jdbc:clickhouse://localhost:invalid_port/mydb");
+  }
+
+  @Test
+  public void testJdbcUrlWithoutJdbcPrefix() {
+    // Should still work if user somehow passes URL without jdbc: prefix
+    String jdbcUrl = "clickhouse://localhost:8123/mydb";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("mydb", parsed.getDatabase());
+  }
+
+  @Test
+  public void testBackwardCompatibilityScenario() {
+    // Simulating a real-world legacy JDBC URL
+    String legacyJdbcUrl =
+        "jdbc:clickhouse://prod-clickhouse.internal:8123/analytics?"
+            + "user=analytics_user&"
+            + "password=secure123&"
+            + "compress=true&"
+            + "socket_timeout=60000";
+
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(legacyJdbcUrl);
+
+    assertEquals("http://prod-clickhouse.internal:8123";, 
parsed.getClickHouseUrl());
+    assertEquals("analytics", parsed.getDatabase());
+
+    Properties props = parsed.getProperties();
+    assertEquals("analytics_user", props.getProperty("user"));
+    assertEquals("secure123", props.getProperty("password"));
+    assertEquals("true", props.getProperty("compress"));
+    assertEquals("60000", props.getProperty("socket_timeout"));
+  }
+
+  @Test
+  public void testJdbcUrlWithMultipleSlashesInPath() {
+    // Edge case: malformed URL with multiple slashes
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123//mydb";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    // URI parsing should normalize this
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("/mydb", parsed.getDatabase()); // Will have leading slash
+  }
+
+  @Test
+  public void testJdbcUrlWithQueryButNoDatabase() {
+    String jdbcUrl = "jdbc:clickhouse://localhost:8123?user=admin";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    assertEquals("http://localhost:8123";, parsed.getClickHouseUrl());
+    assertEquals("default", parsed.getDatabase());
+    assertEquals("admin", parsed.getProperties().getProperty("user"));
+  }
+
+  @Test
+  public void testJdbcUrlWithEmptyQueryParameter() {
+    String jdbcUrl = 
"jdbc:clickhouse://localhost:8123/mydb?user=&password=secret";
+    ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+    Properties props = parsed.getProperties();
+    assertEquals("", props.getProperty("user"));
+    assertEquals("secret", props.getProperty("password"));
+  }
+}

Reply via email to