chamikaramj commented on code in PR #37611:
URL: https://github.com/apache/beam/pull/37611#discussion_r2818211175


##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java:
##########
@@ -130,9 +145,57 @@ public class ClickHouseIO {
   public static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000);
   public static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(5);
 
+  /**
+   * Creates a write transform using a JDBC URL format.
+   *
+   * <p><b>Deprecated:</b> Use {@link #write(String, String, String)} instead with separate URL,
+   * database, and table parameters.
+   *
+   * <p>This method is provided for backward compatibility. It parses the JDBC URL to extract the
+   * connection URL, database name, and any connection properties specified in the query string.
+   * Properties can be overridden later using {@link Write#withProperties(Properties)}.
+   *
+   * <p>Example:
+   *
+   * <pre>{@code
+   * // Old way (deprecated):
+   * ClickHouseIO.write("jdbc:clickhouse://localhost:8123/mydb?user=admin&password=secret", "table")
+   *
+   * // New way:
+   * ClickHouseIO.write("http://localhost:8123", "mydb", "table")
+   *   .withProperties(props)
+   * }</pre>
+   *
+   * <p><b>Property Precedence:</b> Properties from the JDBC URL can be overridden by calling {@link

Review Comment:
   This seems error-prone. Might it be better to fail if properties in the JDBC URL and the properties map are different?
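A minimal sketch of the fail-fast alternative suggested in this comment, using a hypothetical helper `mergeStrict` (not part of the PR): rather than letting `withProperties()` silently override values parsed from the JDBC URL, reject any key that appears in both sources with different values, so the misconfiguration surfaces at pipeline-construction time.

```java
import java.util.Properties;

// Hypothetical sketch: strict merge of JDBC-URL properties with
// user-supplied properties. Identical duplicate keys are allowed;
// conflicting values fail fast with an IllegalArgumentException.
class StrictPropertyMerge {
  static Properties mergeStrict(Properties fromUrl, Properties fromUser) {
    Properties merged = new Properties();
    merged.putAll(fromUrl);
    for (String key : fromUser.stringPropertyNames()) {
      String existing = fromUrl.getProperty(key);
      String incoming = fromUser.getProperty(key);
      if (existing != null && !existing.equals(incoming)) {
        throw new IllegalArgumentException(
            "Conflicting value for property '" + key + "': JDBC URL has '"
                + existing + "' but withProperties() has '" + incoming + "'");
      }
      merged.setProperty(key, incoming);
    }
    return merged;
  }
}
```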



##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java:
##########
@@ -431,25 +552,46 @@ private void flush() throws Exception {
       }
 
       batchSize.update(buffer.size());
+
+      // Serialize rows to RowBinary format
+      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+
+      // Wrap ByteArrayOutputStream with ClickHouseOutputStream
+      try (com.clickhouse.data.ClickHouseOutputStream outputStream =
+          com.clickhouse.data.ClickHouseOutputStream.of(byteStream)) {
+        for (Row row : buffer) {
+          ClickHouseWriter.writeRow(outputStream, schema(), row);

Review Comment:
   The usual pattern in Beam sinks is to write data to a temporary storage system (for example, shuffle or files) and finalize that write in a secondary step from a single DoFn (for example, BQ load jobs, file writes). Datastore (GCP), for example, overwrites re-written entities, so in that case we just let the storage system handle it.



##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java:
##########
@@ -431,25 +552,46 @@ private void flush() throws Exception {
       }
 
       batchSize.update(buffer.size());
+
+      // Serialize rows to RowBinary format
+      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+
+      // Wrap ByteArrayOutputStream with ClickHouseOutputStream
+      try (com.clickhouse.data.ClickHouseOutputStream outputStream =
+          com.clickhouse.data.ClickHouseOutputStream.of(byteStream)) {
+        for (Row row : buffer) {
+          ClickHouseWriter.writeRow(outputStream, schema(), row);

Review Comment:
   I don't see an effort to dedup retries from the runner side here. Runners (for example, Dataflow) may retry failed bundles, and such bundles can be of arbitrary size. This could result in a large number of duplicates being written to the storage system unless handled correctly. If this dedup is somehow done by the storage system itself, this should be OK.
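One possible direction, sketched with a hypothetical helper that is not part of the PR: derive a deterministic token from the serialized batch contents, so that a retried bundle re-sends the same token and ClickHouse's server-side insert deduplication (for example, via the `insert_deduplication_token` setting) can drop the duplicate block. Whether this fits depends on how batches are formed; this only illustrates the deterministic-token idea.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch: a content-derived deduplication token. Because the
// token is a pure function of the batch bytes, a runner retry of the same
// bundle produces the same token, which the storage system can use to
// deduplicate the repeated insert.
class BatchDedupToken {
  static String tokenFor(byte[] serializedBatch) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      byte[] hash = digest.digest(serializedBatch);
      StringBuilder sb = new StringBuilder();
      for (byte b : hash) {
        sb.append(String.format("%02x", b)); // hex-encode each byte
      }
      return sb.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 not available", e);
    }
  }
}
```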



##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java:
##########
@@ -114,8 +129,8 @@
 * <tr><td>{@link TableSchema.TypeName#TUPLE}</td> <td>{@link Schema.TypeName#ROW}</td></tr>
 * </table>
 *
- * Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is
- * supported through LowCardinality DataType in ClickHouse.
+ * <p>Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint
+ * is supported through LowCardinality DataType in ClickHouse.

Review Comment:
   Link to https://clickhouse.com/docs/sql-reference/data-types/LowCardinality ?



##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java:
##########
@@ -114,8 +129,8 @@
 * <tr><td>{@link TableSchema.TypeName#TUPLE}</td> <td>{@link Schema.TypeName#ROW}</td></tr>
 * </table>
 *
- * Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is
- * supported through LowCardinality DataType in ClickHouse.
+ * <p>Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint

Review Comment:
   Link to https://clickhouse.com/docs/sql-reference/data-types/nullable ?



##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParser.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+/**
+ * Utility class for parsing ClickHouse JDBC URLs and extracting connection parameters.
+ *
+ * <p>Used for supporting backward compatibility with the deprecated {@link
+ * ClickHouseIO#write(String, String)} method that accepts JDBC URLs. New code should use {@link
+ * ClickHouseIO#write(String, String, String)} with explicit parameters instead.
+ *
+ * @deprecated Use {@link ClickHouseIO#write(String, String, String)} with separate clickHouseUrl,
+ *     database, and table parameters instead of JDBC URL format.
+ */
+@Deprecated
+class ClickHouseJdbcUrlParser {
+
+  /**
+   * Represents parsed components of a ClickHouse JDBC URL.
+   *
+   * <p>Contains the extracted HTTP/HTTPS URL, database name, and connection properties from a JDBC
+   * URL string.
+   *
+   * @deprecated This class supports the deprecated JDBC URL-based API. Use separate parameters for
+   *     clickHouseUrl, database, and properties instead.
+   */
+  @Deprecated
+  static class ParsedJdbcUrl {
+    private final String clickHouseUrl;
+    private final String database;
+    private final Properties properties;
+
+    ParsedJdbcUrl(String clickHouseUrl, String database, Properties properties) {
+      this.clickHouseUrl = clickHouseUrl;
+      this.database = database;
+      this.properties = properties;
+    }
+
+    public String getClickHouseUrl() {
+      return clickHouseUrl;
+    }
+
+    public String getDatabase() {
+      return database;
+    }
+
+    public Properties getProperties() {
+      return properties;
+    }
+  }
+
+  /**
+   * Parses a ClickHouse JDBC URL into its components.
+   *
+   * <p>Supported formats:
+   *
+   * <ul>
+   *   <li>jdbc:clickhouse://host:port/database?param=value
+   *   <li>jdbc:clickhouse:http://host:port/database?param=value
+   *   <li>jdbc:clickhouse:https://host:port/database?param=value
+   *   <li>jdbc:ch://host:port/database?param=value (ClickHouse JDBC driver shorthand)
+   * </ul>
+   *
+   * @param jdbcUrl the JDBC URL to parse
+   * @return ParsedJdbcUrl containing the HTTP/HTTPS URL, database, and properties
+   * @throws IllegalArgumentException if the URL format is invalid
+   */
+  static ParsedJdbcUrl parse(String jdbcUrl) {
+    if (Strings.isNullOrEmpty(jdbcUrl)) {
+      throw new IllegalArgumentException("JDBC URL cannot be null or empty");
+    }
+
+    String actualUrl = extractHttpUrl(jdbcUrl);
+
+    try {
+      URI uri = new URI(actualUrl);
+
+      validateScheme(uri.getScheme());
+      String host = validateAndGetHost(uri.getHost(), jdbcUrl);
+      int port = getPortOrDefault(uri.getPort(), uri.getScheme());
+
+      String clickHouseUrl = String.format("%s://%s:%d", uri.getScheme(), host, port);
+      String database = extractDatabase(uri.getPath());
+      Properties properties = extractProperties(uri.getQuery());

Review Comment:
   Is this parsing guaranteed to work for all JDBC URLs? For example, can v2 fail due to incomplete URLs not including all necessary properties? If so, it's good to fail early here.
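A minimal sketch of the fail-early validation suggested in this comment, using a hypothetical standalone validator (not part of the PR): after the JDBC prefix has been stripped, reject URLs missing the pieces the connector needs (scheme, host, database) at parse time instead of deferring the failure to connection time.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical sketch: validate the HTTP/HTTPS URL extracted from a JDBC
// URL and fail fast with a descriptive IllegalArgumentException when a
// required component is missing.
class JdbcUrlValidator {
  static void validate(String httpUrl) {
    final URI uri;
    try {
      uri = new URI(httpUrl);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Malformed ClickHouse URL: " + httpUrl, e);
    }
    if (!"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme())) {
      throw new IllegalArgumentException("Unsupported scheme in URL: " + httpUrl);
    }
    if (uri.getHost() == null || uri.getHost().isEmpty()) {
      throw new IllegalArgumentException("Missing host in URL: " + httpUrl);
    }
    String path = uri.getPath();
    if (path == null || path.length() <= 1) {
      throw new IllegalArgumentException("Missing database in URL: " + httpUrl);
    }
  }
}
```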



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
