This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1b26192cb [Bug] [Connector-V2] Fix ClickhouseFile Committer Serializable Problems (#3803)
1b26192cb is described below
commit 1b26192cb3e1df0699c1ced4f9ae35a8796a5bd1
Author: Hisoka <[email protected]>
AuthorDate: Mon Dec 26 17:27:57 2022 +0800
[Bug] [Connector-V2] Fix ClickhouseFile Committer Serializable Problems (#3803)
---
.../sink/file/ClickhouseFileSinkAggCommitter.java | 24 +++++++++++++++++++---
.../clickhouse/sink/file/ClickhouseTable.java | 3 ++-
2 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
index 80ce83aa5..5290ff0bd 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
@@ -37,10 +37,13 @@ import java.util.Map;
public class ClickhouseFileSinkAggCommitter implements SinkAggregatedCommitter<CKFileCommitInfo, CKFileAggCommitInfo> {
- private final ClickhouseProxy proxy;
+ private transient ClickhouseProxy proxy;
private final ClickhouseTable clickhouseTable;
+ private final FileReaderOption fileReaderOption;
+
public ClickhouseFileSinkAggCommitter(FileReaderOption readerOption) {
+ fileReaderOption = readerOption;
proxy = new ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode());
clickhouseTable = proxy.getClickhouseTable(readerOption.getShardMetadata().getDatabase(), readerOption.getShardMetadata().getTable());
@@ -76,13 +79,28 @@ public class ClickhouseFileSinkAggCommitter implements SinkAggregatedCommitter<C
}
+ private ClickhouseProxy getProxy() {
+ if (proxy != null) {
+ return proxy;
+ }
+ synchronized (this) {
+ if (proxy != null) {
+ return proxy;
+ }
+ proxy = new ClickhouseProxy(fileReaderOption.getShardMetadata().getDefaultShard().getNode());
+ return proxy;
+ }
+ }
+
@Override
public void close() throws IOException {
- proxy.close();
+ if (proxy != null) {
+ proxy.close();
+ }
}
private void attachFileToClickhouse(Shard shard, List<String> clickhouseLocalFiles) throws ClickHouseException {
- ClickHouseRequest<?> request = proxy.getClickhouseConnection(shard);
+ ClickHouseRequest<?> request = getProxy().getClickhouseConnection(shard);
for (String clickhouseLocalFile : clickhouseLocalFiles) {
ClickHouseResponse response = request.query(String.format("ALTER TABLE %s ATTACH PART '%s'",
clickhouseTable.getLocalTableName(),
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
index a6c8e0fe0..b26a2264f 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
@@ -19,10 +19,11 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
-public class ClickhouseTable {
+public class ClickhouseTable implements Serializable {
private String database;
private String tableName;