nlu90 commented on a change in pull request #12999:
URL: https://github.com/apache/pulsar/pull/12999#discussion_r761586418
##########
File path:
pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
##########
@@ -18,9 +18,23 @@
*/
package org.apache.pulsar.io.jdbc;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.ClickHouseConnection;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+import java.sql.DriverManager;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+@Slf4j
Review comment:
Please use proper formatting here.
##########
File path:
pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
##########
@@ -29,4 +43,48 @@
)
public class ClickHouseJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
+ ClickHouseProperties properties = new ClickHouseProperties();
+ String username = jdbcSinkConfig.getUserName();
+ String password = jdbcSinkConfig.getPassword();
+
+ if (username != null) {
+ properties.setUser(jdbcSinkConfig.getUserName());
+ }
+ if (password != null) {
+ properties.setPassword(jdbcSinkConfig.getPassword());
+ }
+
+ jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+ if (jdbcUrl == null) {
+ throw new IllegalArgumentException("Required jdbc Url
not set.");
+ }
+ // keep connection stable
+ properties.setConnectionTimeout(300000);
+ properties.setSocketTimeout(300000);
+ // load balance strategy
+ BalancedClickhouseDataSource ckDataSource = new
BalancedClickhouseDataSource(jdbcUrl, properties);
+ // to check clickhouse node health
+ ckDataSource.scheduleActualization(60, TimeUnit.SECONDS);
+ final ClickHouseConnection ckConnection =
ckDataSource.getConnection();
+ super.connection = (ckConnection);
+ ckConnection.setAutoCommit(false);
+
+ log.info("Opened jdbc ckConnection: {}, autoCommit: {}",
jdbcUrl, ckConnection.getAutoCommit());
+
+ tableName = jdbcSinkConfig.getTableName();
+ tableId = JdbcUtils.getTableId(ckConnection, tableName);
+ // Init PreparedStatement include insert, delete, update
+ initStatement();
+
+ int timeoutMs = jdbcSinkConfig.getTimeoutMs();
+ batchSize = jdbcSinkConfig.getBatchSize();
+ incomingList = Lists.newArrayList();
+ swapList = Lists.newArrayList();
+ isFlushing = new AtomicBoolean(false);
+
+ flushExecutor = Executors.newScheduledThreadPool(1);
+ flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs,
timeoutMs, TimeUnit.MILLISECONDS);
Review comment:
This code is duplicated in `JdbcAbstractSink`; try to reuse it
instead of copying it.
##########
File path:
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
##########
@@ -43,13 +44,13 @@
@Slf4j
public abstract class JdbcAbstractSink<T> implements Sink<T> {
// ----- Runtime fields
- private JdbcSinkConfig jdbcSinkConfig;
+ protected JdbcSinkConfig jdbcSinkConfig;
Review comment:
Most of the fields can stay `private` if the code is reused properly.
##########
File path:
pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
##########
@@ -29,4 +43,48 @@
)
public class ClickHouseJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
+ ClickHouseProperties properties = new ClickHouseProperties();
+ String username = jdbcSinkConfig.getUserName();
+ String password = jdbcSinkConfig.getPassword();
+
+ if (username != null) {
+ properties.setUser(jdbcSinkConfig.getUserName());
+ }
+ if (password != null) {
+ properties.setPassword(jdbcSinkConfig.getPassword());
+ }
+
+ jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+ if (jdbcUrl == null) {
+ throw new IllegalArgumentException("Required jdbc Url
not set.");
+ }
Review comment:
Move this to the beginning of the method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]