This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bed2e562ece Update ClickHouseIO to use the latest version of the 
ClickHouse JDBC driver (0.6.4) (#32229)
bed2e562ece is described below

commit bed2e562ecedbba8dd6f95205f181fb2c770ba77
Author: Mark Zitnik <[email protected]>
AuthorDate: Mon Aug 19 19:03:05 2024 +0300

    Update ClickHouseIO to use the latest version of the ClickHouse JDBC driver 
(0.6.4) (#32229)
    
    * Update jdbc version to 0.6.4
    
    * Update change log
    
    * Fix W201 Trailing whitespace
---
 CHANGES.md                                         |  1 +
 sdks/java/io/clickhouse/build.gradle               |  2 +-
 .../beam/sdk/io/clickhouse/ClickHouseIO.java       |  4 +--
 .../beam/sdk/io/clickhouse/ClickHouseWriter.java   |  6 ++---
 .../beam/sdk/io/clickhouse/ClickHouseIOTest.java   | 29 +++++++++++-----------
 5 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 7bfbb9cd1f3..1c0957b1fbf 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
 * Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
 * Improvements to the performance of BigqueryIO when using 
withPropagateSuccessfulStorageApiWrites(true) method (Java) 
([#31840](https://github.com/apache/beam/pull/31840)).
 * [Managed Iceberg] Added support for writing to partitioned tables 
([#32102](https://github.com/apache/beam/pull/32102))
+* Update ClickHouseIO to use the latest version of the ClickHouse JDBC driver 
([#32228](https://github.com/apache/beam/issues/32228)).
 
 ## New Features / Improvements
 
diff --git a/sdks/java/io/clickhouse/build.gradle 
b/sdks/java/io/clickhouse/build.gradle
index d61dcbe2660..70f3c1a6387 100644
--- a/sdks/java/io/clickhouse/build.gradle
+++ b/sdks/java/io/clickhouse/build.gradle
@@ -50,7 +50,7 @@ idea {
   }
 }
 
-def clickhouse_jdbc_version = "0.3.2-patch10"
+def clickhouse_jdbc_version = "0.6.4"
 
 dependencies {
   javacc "net.java.dev.javacc:javacc:7.0.9"
diff --git 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
index 7ef643488a2..688b08146f3 100644
--- 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
+++ 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.io.clickhouse;
 
-import com.clickhouse.client.ClickHouseFormat;
 import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.data.ClickHouseFormat;
 import com.clickhouse.jdbc.ClickHouseConnection;
 import com.clickhouse.jdbc.ClickHouseDataSource;
 import com.clickhouse.jdbc.ClickHouseStatement;
@@ -439,7 +439,7 @@ public class ClickHouseIO {
                       ClickHouseWriter.writeRow(out, schema(), row);
                     }
                   })
-              .sendAndWait(); // query happens in a separate thread
+              .executeAndWait(); // query happens in a separate thread
           buffer.clear();
           break;
         } catch (SQLException e) {
diff --git 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
index 09a6ced44d3..73735f56864 100644
--- 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
+++ 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.sdk.io.clickhouse;
 
-import com.clickhouse.client.ClickHouseOutputStream;
-import com.clickhouse.client.ClickHousePipedOutputStream;
-import com.clickhouse.client.data.BinaryStreamUtils;
+import com.clickhouse.data.ClickHouseOutputStream;
+import com.clickhouse.data.ClickHousePipedOutputStream;
+import com.clickhouse.data.format.BinaryStreamUtils;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
diff --git 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
index a203e32dae5..8e5dc7ebe38 100644
--- 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
+++ 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
@@ -393,20 +393,21 @@ public class ClickHouseIOTest extends BaseClickHouseTest {
 
       assertEquals("[2030-10-01, 2031-10-01]", rs.getString("f0"));
       assertEquals("[2030-10-09T08:07:06, 2031-10-09T08:07:06]", 
rs.getString("f1"));
-      assertEquals("[2.2, 3.3]", rs.getString("f2"));
-      assertEquals("[3.3, 4.4]", rs.getString("f3"));
-      assertEquals("[4, 5]", rs.getString("f4"));
-      assertEquals("[5, 6]", rs.getString("f5"));
-      assertEquals("[6, 7]", rs.getString("f6"));
-      assertEquals("[7, 8]", rs.getString("f7"));
-      assertEquals("[eight, nine]", rs.getString("f8"));
-      assertEquals("[9, 10]", rs.getString("f9"));
-      assertEquals("[10, 11]", rs.getString("f10"));
-      assertEquals("[11, 12]", rs.getString("f11"));
-      assertEquals("[12, 13]", rs.getString("f12"));
-      assertEquals("[abc, cde]", rs.getString("f13"));
-      assertEquals("[cde, abc]", rs.getString("f14"));
-      assertEquals("[1, 0]", rs.getString("f15"));
+      // Since comparing float/double values is not precise, we compare the 
string representation
+      assertEquals("[2.2,3.3]", rs.getString("f2"));
+      assertEquals("[3.3,4.4]", rs.getString("f3"));
+      assertArrayEquals(new byte[] {4, 5}, (byte[]) 
rs.getArray("f4").getArray());
+      assertArrayEquals(new short[] {5, 6}, (short[]) 
rs.getArray("f5").getArray());
+      assertArrayEquals(new int[] {6, 7}, (int[]) 
rs.getArray("f6").getArray());
+      assertArrayEquals(new long[] {7L, 8L}, (long[]) 
rs.getArray("f7").getArray());
+      assertArrayEquals(new String[] {"eight", "nine"}, (String[]) 
rs.getArray("f8").getArray());
+      assertArrayEquals(new byte[] {9, 10}, (byte[]) 
rs.getArray("f9").getArray());
+      assertArrayEquals(new short[] {10, 11}, (short[]) 
rs.getArray("f10").getArray());
+      assertArrayEquals(new int[] {11, 12}, (int[]) 
rs.getArray("f11").getArray());
+      assertArrayEquals(new long[] {12L, 13L}, (long[]) 
rs.getArray("f12").getArray());
+      assertArrayEquals(new String[] {"abc", "cde"}, (String[]) 
rs.getArray("f13").getArray());
+      assertArrayEquals(new String[] {"cde", "abc"}, (String[]) 
rs.getArray("f14").getArray());
+      assertArrayEquals(new boolean[] {true, false}, (boolean[]) 
rs.getArray("f15").getArray());
     }
   }
 

Reply via email to