mzitnik commented on code in PR #37611:
URL: https://github.com/apache/beam/pull/37611#discussion_r2811220769
##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java:
##########
@@ -491,57 +633,106 @@ private static String tuplePreprocessing(String payload) {
String.join(",", l).trim().replaceAll("Tuple\\(", "Tuple('").replaceAll(",", ",'");
return content;
}
+
/**
- * Returns {@link TableSchema} for a given table.
+ * Returns {@link TableSchema} for a given table using JDBC URL format.
+ *
+ * <p><b>Deprecated:</b> Use {@link #getTableSchema(String, String, String, Properties)} instead
+ * with separate URL, database, table, and properties parameters.
+ *
+ * <p>This method parses the JDBC URL to extract connection details and properties. For new code,
+ * use the explicit parameter version for better clarity and control.
+ *
+ * <p>Example migration:
*
- * @param jdbcUrl jdbc connection url
+ * <pre>{@code
+ * // Old way (deprecated):
+ * TableSchema schema = ClickHouseIO.getTableSchema(
+ * "jdbc:clickhouse://localhost:8123/mydb?user=admin", "my_table");
+ *
+ * // New way:
+ * Properties props = new Properties();
+ * props.setProperty("user", "admin");
+ * TableSchema schema = ClickHouseIO.getTableSchema(
+ * "http://localhost:8123", "mydb", "my_table", props);
+ * }</pre>
+ *
+ * @param jdbcUrl JDBC connection URL (e.g., jdbc:clickhouse://host:port/database?param=value)
* @param table table name
* @return table schema
+ * @deprecated Use {@link #getTableSchema(String, String, String, Properties)} with explicit
+ * parameters
*/
+ @Deprecated
public static TableSchema getTableSchema(String jdbcUrl, String table) {
- List<TableSchema.Column> columns = new ArrayList<>();
-
- try (ClickHouseConnection connection = new ClickHouseDataSource(jdbcUrl).getConnection();
- Statement statement = connection.createStatement()) {
-
- ResultSet rs = null; // try-finally is used because findbugs doesn't like try-with-resource
- try {
- rs = statement.executeQuery("DESCRIBE TABLE " + quoteIdentifier(table));
-
- while (rs.next()) {
- String name = rs.getString("name");
- String type = rs.getString("type");
- String defaultTypeStr = rs.getString("default_type");
- String defaultExpression = rs.getString("default_expression");
+ ClickHouseJdbcUrlParser.ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+ return getTableSchema(
+ parsed.getClickHouseUrl(), parsed.getDatabase(), table, parsed.getProperties());
+ }
- ColumnType columnType = null;
- if (type.toLowerCase().trim().startsWith("tuple(")) {
- String content = tuplePreprocessing(type);
- columnType = ColumnType.parse(content);
- } else {
- columnType = ColumnType.parse(type);
- }
- DefaultType defaultType = DefaultType.parse(defaultTypeStr).orElse(null);
+ /**
+ * Returns {@link TableSchema} for a given table using ClickHouse Java Client.
+ *
+ * @param clickHouseUrl ClickHouse connection url
+ * @param database ClickHouse database
+ * @param table table name
+ * @param properties connection properties
+ * @return table schema
+ * @since 2.72.0
+ */
+ public static TableSchema getTableSchema(
+ String clickHouseUrl, String database, String table, Properties properties) {
+ List<TableSchema.Column> columns = new ArrayList<>();
- Object defaultValue;
- if (DefaultType.DEFAULT.equals(defaultType)
- && !Strings.isNullOrEmpty(defaultExpression)) {
- defaultValue = ColumnType.parseDefaultExpression(columnType, defaultExpression);
- } else {
- defaultValue = null;
+ try {
+ String user = properties.getProperty("user", "default");
+ String password = properties.getProperty("password", "");
+
+ // Create ClickHouse Java Client
+ Client.Builder clientBuilder =
+ new Client.Builder()
+ .addEndpoint(clickHouseUrl)
+ .setUsername(user)
+ .setPassword(password)
+ .setDefaultDatabase(database)
+ .setClientName(
+ String.format("Apache Beam/%s", ReleaseInfo.getReleaseInfo().getSdkVersion()));
+
+ try (Client client = clientBuilder.build()) {
Review Comment:
The new client already has a `getTableSchema` method that returns the
schema for a specific table. I recommend using it here.
##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java:
##########
@@ -80,12 +91,14 @@
*
* <h4>Deduplication</h4>
*
- * Deduplication is performed by ClickHouse if inserting to <a
- * href="https://clickhouse.yandex/docs/en/single/#data-replication">ReplicatedMergeTree</a> or <a
- * href="https://clickhouse.yandex/docs/en/single/#distributed">Distributed</a> table on top of
- * ReplicatedMergeTree. Without replication, inserting into regular MergeTree can produce
- * duplicates, if insert fails, and then successfully retries. However, each block is inserted
- * atomically, and you can configure block size with {@link Write#withMaxInsertBlockSize(long)}.
+ * <p>Deduplication is performed by ClickHouse if inserting to <a
Review Comment:
Should we also add a line for
[SharedMergeTree](https://clickhouse.com/docs/cloud/reference/shared-merge-tree)?
##########
sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java:
##########
@@ -72,45 +77,130 @@ public static void setup() throws IOException, InterruptedException {
;
clickHouse.start();
LOG.info("Start Clickhouse");
+ clickHouseUrl = "http://" + clickHouse.getHost() + ":" + clickHouse.getMappedPort(HTTP_PORT);
+ database = "default";
}
@AfterClass
public static void tearDown() {
- clickHouse.close();
- zookeeper.close();
+ if (clickHouse != null) {
+ clickHouse.close();
+ }
+ if (zookeeper != null) {
+ zookeeper.close();
+ }
}
@Before
- public void setUp() throws SQLException {
- connection = clickHouse.createConnection("");
+ public void setUp() throws Exception {
+ // Create ClickHouse Java Client
+ Client.Builder clientBuilder =
+ new Client.Builder()
+ .addEndpoint(clickHouseUrl)
+ .setUsername(clickHouse.getUsername())
+ .setPassword(clickHouse.getPassword())
+ .setDefaultDatabase(database);
+
+ client = clientBuilder.build();
}
@After
public void after() {
- if (connection != null) {
+ if (client != null) {
try {
- connection.close();
- } catch (SQLException e) {
- // failed to close connection, ignore
+ client.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close ClickHouse client", e);
} finally {
- connection = null;
+ client = null;
}
}
}
- boolean executeSql(String sql) throws SQLException {
- Statement statement = connection.createStatement();
- return statement.execute(sql);
+ /**
+ * Executes a SQL statement (DDL, DML, etc.).
+ *
+ * @param sql SQL statement to execute
+ * @return true if execution was successful
+ * @throws Exception if execution fails
+ */
+ boolean executeSql(String sql) throws Exception {
+ try {
+ client.query(sql).get(30, TimeUnit.SECONDS);
Review Comment:
Same here: let's extract the 30 into a named constant.
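
A minimal sketch of the idea, using only the JDK so it can be read in isolation. The class name, the constant name `QUERY_TIMEOUT_SECONDS`, and the `await` helper are hypothetical and not part of the PR. The `CompletableFuture` argument stands in for whatever `client.query(sql)` returns in the test base class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class QueryTimeoutSketch {
  // Hypothetical constant name; 30 is the literal currently inlined in the diff.
  static final long QUERY_TIMEOUT_SECONDS = 30L;

  // Waits for a pending result, bounded by the shared timeout constant.
  // The future is a stand-in for the value returned by client.query(sql).
  static <T> T await(CompletableFuture<T> pending) {
    try {
      return pending.get(QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt flag before surfacing the failure.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for query", e);
    } catch (ExecutionException | TimeoutException e) {
      throw new IllegalStateException("Query failed or timed out", e);
    }
  }

  public static void main(String[] args) {
    // An already-completed future returns immediately.
    System.out.println(await(CompletableFuture.completedFuture("ok")));
  }
}
```

With the timeout in one place, every helper that waits on a query picks up the same bound, and changing it later is a one-line edit.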
##########
sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java:
##########
@@ -55,7 +54,7 @@ private static boolean shouldAttempt(int i, long count) {
/** With sufficient block size, ClickHouse will atomically insert all or nothing. */
@Test
- public void testAtomicInsert() throws SQLException {
+ public void testAtomicInsert() throws Exception {
int size = 100000;
Review Comment:
Let's make this a named constant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.