This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 04e1743d9 [Improve][Connector-V2][Clickhouse] Unified exception for
Clickhouse source & sink connector (#3563)
04e1743d9 is described below
commit 04e1743d9e9f9675bed5f0813b4c50ab9cba9d15
Author: FWLamb <[email protected]>
AuthorDate: Tue Nov 29 10:51:29 2022 +0800
[Improve][Connector-V2][Clickhouse] Unified exception for Clickhouse source
& sink connector (#3563)
* unified exception
* unified exception
* update
* resolve conflicts
* resolve conflicts
* resolve conflicts
* resolve conflicts
* resolve conflicts
Co-authored-by: yangbinbin <[email protected]>
---
.../connector-v2/Error-Quick-Reference-Manual.md | 13 +++-
.../config/ClickhouseFileCopyMethod.java | 5 +-
.../exception/ClickhouseConnectorErrorCode.java | 49 +++++++++++++++
.../exception/ClickhouseConnectorException.java | 35 +++++++++++
.../clickhouse/sink/client/ClickhouseProxy.java | 54 ++++++++--------
.../clickhouse/sink/client/ClickhouseSink.java | 15 +++--
.../sink/client/ClickhouseSinkWriter.java | 10 +--
.../clickhouse/sink/client/ShardRouter.java | 10 +--
.../clickhouse/sink/file/ClickhouseFileSink.java | 38 ++++++-----
.../sink/file/ClickhouseFileSinkWriter.java | 73 +++++++++++-----------
.../clickhouse/sink/file/FileTransferFactory.java | 4 +-
.../clickhouse/sink/file/RsyncFileTransfer.java | 19 +++---
.../clickhouse/sink/file/ScpFileTransfer.java | 18 +++---
.../sink/inject/ArrayInjectFunction.java | 5 +-
.../sink/inject/StringInjectFunction.java | 5 +-
.../clickhouse/source/ClickhouseSource.java | 14 +++--
.../seatunnel/clickhouse/util/TypeConvertUtil.java | 8 ++-
17 files changed, 260 insertions(+), 115 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 707609d7a..4ed61a0df 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -46,7 +46,7 @@ problems encountered by users.
| CASSANDRA-02 | Add batch SeaTunnelRow data into a batch failed | When users
encounter this error code, it means that cassandra has some problems, please
check it whether is work |
| CASSANDRA-03 | Close cql session of cassandra failed | When users
encounter this error code, it means that cassandra has some problems, please
check it whether is work |
| CASSANDRA-04 | No data in source table | When users
encounter this error code, it means that source cassandra table has no data,
please check it |
-| CASSANDRA-05 | Parse ip address from string field field | When users
encounter this error code, it means that upstream data does not match ip
address format, please check it
|
+| CASSANDRA-05 | Parse ip address from string failed | When users
encounter this error code, it means that upstream data does not match ip
address format, please check it
|
## Slack Connector Error Codes
@@ -158,3 +158,14 @@ problems encountered by users.
|---------|-------------------------------|-----------------------------------------------------------------------------------------------------------|
| HUDI-01 | Create ParquetMetadata failed | When the user encounters this
error code, it indicates that ParquetMetadata creation failed. Please check |
| HUDI-02 | Kerberos Authorized failed | When the user encounters this
error code, it indicates that Kerberos authorization failed. Please check |
+
+## Clickhouse Connector Error Codes
+
+| code | description
| solution
|
+|---------------|---------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| CLICKHOUSE-01 | Field is not existed in target table
| When users encounter this error code, it means that the fields
of upstream data don't meet with target clickhouse table, please check target
clickhouse table structure |
+| CLICKHOUSE-02 | Can’t find password of shard node
| When users encounter this error code, it means that no password
is configured for each node, please check
|
+| CLICKHOUSE-03 | Can’t delete directory
| When users encounter this error code, it means that the
directory does not exist or does not have permission, please check
|
+| CLICKHOUSE-04 | Ssh operation failed, such as
(login,connect,authentication,close) etc... | When users encounter this error
code, it means that the ssh request failed, please check your network
environment |
+| CLICKHOUSE-05 | Get cluster list from clickhouse failed
| When users encounter this error code, it means that the
clickhouse cluster is not configured correctly, please check
|
+| CLICKHOUSE-06 | Shard key not found in table
| When users encounter this error code, it means that the shard
key of the distributed table is not configured, please check
|
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
index cec1f48bb..251dd31de 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+
public enum ClickhouseFileCopyMethod {
SCP("scp"),
RSYNC("rsync"),
@@ -37,6 +40,6 @@ public enum ClickhouseFileCopyMethod {
return clickhouseFileCopyMethod;
}
}
- throw new IllegalArgumentException("Unknown ClickhouseFileCopyMethod:
" + name);
+ throw new
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Unknown
ClickhouseFileCopyMethod: " + name);
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorErrorCode.java
new file mode 100644
index 000000000..3fd0f9f41
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorErrorCode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum ClickhouseConnectorErrorCode implements SeaTunnelErrorCode {
+
+ FIELD_NOT_IN_TABLE("CLICKHOUSE-01", "Field is not existed in target
table"),
+ PASSWORD_NOT_FOUND_IN_SHARD_NODE("CLICKHOUSE-02", "Can’t find password of
shard node"),
+ DELETE_DIRECTORY_FIELD("CLICKHOUSE-03", "Can’t delete directory"),
+ SSH_OPERATION_FAILED("CLICKHOUSE-04", "Ssh operation failed, such as
(login,connect,authentication,close) etc..."),
+ CLUSTER_LIST_GET_FAILED("CLICKHOUSE-05", "Get cluster list from clickhouse
failed"),
+ SHARD_KEY_NOT_FOUND("CLICKHOUSE-06", "Shard key not found in table"),
+ FILE_NOT_EXISTS("CLICKHOUSE-07", "Clickhouse local file not exists");
+
+ private final String code;
+ private final String description;
+
+ ClickhouseConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorException.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorException.java
new file mode 100644
index 000000000..421a83e72
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class ClickhouseConnectorException extends SeaTunnelRuntimeException {
+ public ClickhouseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public ClickhouseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public ClickhouseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
index 3a3aa082c..67b4db7aa 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
@@ -48,7 +52,7 @@ public class ClickhouseProxy {
public ClickhouseProxy(ClickHouseNode node) {
this.client = ClickHouseClient.newInstance(node.getProtocol());
this.clickhouseRequest =
-
client.connect(node).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
+
client.connect(node).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
}
@@ -58,7 +62,7 @@ public class ClickhouseProxy {
public ClickHouseRequest<?> getClickhouseConnection(Shard shard) {
ClickHouseClient c = shardToDataSource
- .computeIfAbsent(shard, s ->
ClickHouseClient.newInstance(s.getNode().getProtocol()));
+ .computeIfAbsent(shard, s ->
ClickHouseClient.newInstance(s.getNode().getProtocol()));
return
c.connect(shard.getNode()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
}
@@ -77,12 +81,12 @@ public class ClickhouseProxy {
// engineFull field will be like : Distributed(cluster,
database, table[, sharding_key[, policy_name]])
String engineFull = record.getValue(0).asString();
List<String> infos =
Arrays.stream(engineFull.substring(12).split(","))
- .map(s -> s.replace("'",
"").trim()).collect(Collectors.toList());
+ .map(s -> s.replace("'",
"").trim()).collect(Collectors.toList());
return new DistributedEngine(infos.get(0), infos.get(1),
infos.get(2).replace("\\)", "").trim());
}
- throw new RuntimeException("Cannot get distributed table from
clickhouse, resultSet is empty");
+ throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot
get distributed table from clickhouse, resultSet is empty");
} catch (ClickHouseException e) {
- throw new RuntimeException("Cannot get distributed table from
clickhouse", e);
+ throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot
get distributed table from clickhouse", e);
}
}
@@ -103,7 +107,7 @@ public class ClickhouseProxy {
try (ClickHouseResponse response =
request.query(sql).executeAndWait()) {
response.records().forEach(r ->
schema.put(r.getValue(0).asString(), r.getValue(1).asString()));
} catch (ClickHouseException e) {
- throw new RuntimeException("Cannot get table schema from
clickhouse", e);
+ throw new
ClickhouseConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, "Cannot
get table schema from clickhouse", e);
}
return schema;
}
@@ -124,16 +128,16 @@ public class ClickhouseProxy {
try (ClickHouseResponse response =
connection.query(sql).executeAndWait()) {
response.records().forEach(r -> {
shardList.add(new Shard(
- r.getValue(0).asInteger(),
- r.getValue(1).asInteger(),
- r.getValue(2).asInteger(),
- r.getValue(3).asString(),
- r.getValue(4).asString(),
- port, database, username, password));
+ r.getValue(0).asInteger(),
+ r.getValue(1).asInteger(),
+ r.getValue(2).asInteger(),
+ r.getValue(3).asString(),
+ r.getValue(4).asString(),
+ port, database, username, password));
});
return shardList;
} catch (ClickHouseException e) {
- throw new RuntimeException("Cannot get cluster shard list from
clickhouse", e);
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.CLUSTER_LIST_GET_FAILED,
"Cannot get cluster shard list from clickhouse", e);
}
}
@@ -149,7 +153,7 @@ public class ClickhouseProxy {
try (ClickHouseResponse response =
clickhouseRequest.query(sql).executeAndWait()) {
List<ClickHouseRecord> records =
response.stream().collect(Collectors.toList());
if (records.isEmpty()) {
- throw new RuntimeException("Cannot get table from clickhouse,
resultSet is empty");
+ throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot
get table from clickhouse, resultSet is empty");
}
ClickHouseRecord record = records.get(0);
String engine = record.getValue(0).asString();
@@ -160,11 +164,11 @@ public class ClickhouseProxy {
if ("Distributed".equals(engine)) {
distributedEngine =
getClickhouseDistributedTable(clickhouseRequest, database, table);
String localTableSQL = String.format("select
engine,create_table_query from system.tables where database = '%s' and name =
'%s'",
- distributedEngine.getDatabase(),
distributedEngine.getTable());
+ distributedEngine.getDatabase(),
distributedEngine.getTable());
try (ClickHouseResponse rs =
clickhouseRequest.query(localTableSQL).executeAndWait()) {
List<ClickHouseRecord> localTableRecords =
rs.stream().collect(Collectors.toList());
if (localTableRecords.isEmpty()) {
- throw new RuntimeException("Cannot get table from
clickhouse, resultSet is empty");
+ throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot
get table from clickhouse, resultSet is empty");
}
String localEngine =
localTableRecords.get(0).getValue(0).asString();
String createLocalTableDDL =
localTableRecords.get(0).getValue(1).asString();
@@ -172,16 +176,16 @@ public class ClickhouseProxy {
}
}
return new ClickhouseTable(
- database,
- table,
- distributedEngine,
- engine,
- createTableDDL,
- engineFull,
- dataPaths,
- getClickhouseTableSchema(clickhouseRequest, table));
+ database,
+ table,
+ distributedEngine,
+ engine,
+ createTableDDL,
+ engineFull,
+ dataPaths,
+ getClickhouseTableSchema(clickhouseRequest, table));
} catch (ClickHouseException e) {
- throw new RuntimeException("Cannot get clickhouse table", e);
+ throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot
get clickhouse table", e);
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 028fe4a5c..b87c8307e 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -29,6 +29,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -40,7 +41,10 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
@@ -85,12 +89,15 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
}
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
+ throw new ClickhouseConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SINK, result.getMsg()));
}
Map<String, Object> defaultConfig = ImmutableMap.<String,
Object>builder()
.put(BULK_SIZE.key(), BULK_SIZE.defaultValue())
.put(SPLIT_MODE.key(), SPLIT_MODE.defaultValue())
- .build();
+ .build();
config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
@@ -123,7 +130,7 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
ClickhouseTable table =
proxy.getClickhouseTable(config.getString(DATABASE.key()),
config.getString(TABLE.key()));
if (!"Distributed".equals(table.getEngine())) {
- throw new IllegalArgumentException("split mode only support
table which engine is " +
+ throw new
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "split mode only
support table which engine is " +
"'Distributed' engine at now");
}
if (config.hasPath(SHARDING_KEY.key())) {
@@ -157,7 +164,7 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
// check if the fields exist in schema
for (String field : fields) {
if (!tableSchema.containsKey(field)) {
- throw new RuntimeException("Field " + field + " does not
exist in table " + config.getString(TABLE.key()));
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE,
"Field " + field + " does not exist in table " + config.getString(TABLE.key()));
}
}
} else {
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index 79411ed5f..b103ecccc 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -20,7 +20,9 @@ package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ArrayInjectFunction;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.BigDecimalInjectFunction;
@@ -122,7 +124,7 @@ public class ClickhouseSinkWriter implements
SinkWriter<SeaTunnelRow, CKCommitIn
intHolder.setValue(0);
}
} catch (SQLException e) {
- throw new RuntimeException("Failed to close prepared
statement.", e);
+ throw new
ClickhouseConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "Failed to
close prepared statement.", e);
}
}
}
@@ -145,7 +147,7 @@ public class ClickhouseSinkWriter implements
SinkWriter<SeaTunnelRow, CKCommitIn
}
clickHouseStatement.addBatch();
} catch (SQLException e) {
- throw new RuntimeException("Add row data into batch error", e);
+ throw new
ClickhouseConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "Add row
data into batch error", e);
}
}
@@ -153,7 +155,7 @@ public class ClickhouseSinkWriter implements
SinkWriter<SeaTunnelRow, CKCommitIn
try {
clickHouseStatement.executeBatch();
} catch (Exception e) {
- throw new RuntimeException("Clickhouse execute batch statement
error", e);
+ throw new
ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Clickhouse
execute batch statement error", e);
}
}
@@ -169,7 +171,7 @@ public class ClickhouseSinkWriter implements
SinkWriter<SeaTunnelRow, CKCommitIn
new ClickhouseBatchStatement(clickhouseConnection,
preparedStatement, intHolder);
result.put(s, batchStatement);
} catch (SQLException e) {
- throw new RuntimeException("Clickhouse prepare statement
error: " + e.getMessage(), e);
+ throw new
ClickhouseConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "Clickhouse
prepare statement error: " + e.getMessage(), e);
}
});
return result;
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
index 4471f8157..71e6430fc 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
@@ -55,15 +57,15 @@ public class ShardRouter implements Serializable {
this.splitMode = shardMetadata.getSplitMode();
this.table = shardMetadata.getTable();
if (StringUtils.isNotEmpty(shardKey) &&
StringUtils.isEmpty(shardKeyType)) {
- throw new IllegalArgumentException("Shard key " + shardKey + " not
found in table " + table);
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SHARD_KEY_NOT_FOUND,
"Shard key " + shardKey + " not found in table " + table);
}
ClickHouseRequest<?> connection = proxy.getClickhouseConnection();
if (splitMode) {
DistributedEngine localTable =
proxy.getClickhouseDistributedTable(connection, shardMetadata.getDatabase(),
table);
this.shardTable = localTable.getTable();
List<Shard> shardList = proxy.getClusterShardList(connection,
localTable.getClusterName(),
- localTable.getDatabase(),
shardMetadata.getDefaultShard().getNode().getPort(),
- shardMetadata.getUsername(), shardMetadata.getPassword());
+ localTable.getDatabase(),
shardMetadata.getDefaultShard().getNode().getPort(),
+ shardMetadata.getUsername(), shardMetadata.getPassword());
int weight = 0;
for (Shard shard : shardList) {
shards.put(weight, shard);
@@ -87,7 +89,7 @@ public class ShardRouter implements Serializable {
return
shards.lowerEntry(threadLocalRandom.nextInt(shardWeightCount + 1)).getValue();
}
int offset = (int)
(HASH_INSTANCE.hash(ByteBuffer.wrap(shardValue.toString().getBytes(StandardCharsets.UTF_8)),
- 0) & Long.MAX_VALUE % shardWeightCount);
+ 0) & Long.MAX_VALUE % shardWeightCount);
return shards.lowerEntry(offset + 1).getValue();
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index e32ecd2ef..973478a8b 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -30,6 +30,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -40,6 +41,8 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
@@ -75,15 +78,18 @@ public class ClickhouseFileSink implements
SeaTunnelSink<SeaTunnelRow, Clickhous
public void prepare(Config config) throws PrepareFailException {
CheckResult checkResult = CheckConfigUtil.checkAllExists(config,
HOST.key(), TABLE.key(), DATABASE.key(), USERNAME.key(), PASSWORD.key(),
CLICKHOUSE_LOCAL_PATH.key());
if (!checkResult.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SINK,
checkResult.getMsg());
+ throw new ClickhouseConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SINK, checkResult.getMsg()));
}
Map<String, Object> defaultConfigs = ImmutableMap.<String,
Object>builder()
- .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName())
- .build();
+ .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName())
+ .build();
config = config.withFallback(ConfigFactory.parseMap(defaultConfigs));
List<ClickHouseNode> nodes =
ClickhouseUtil.createNodes(config.getString(HOST.key()),
- config.getString(DATABASE.key()),
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
+ config.getString(DATABASE.key()),
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
Map<String, String> tableSchema =
proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
@@ -94,34 +100,34 @@ public class ClickhouseFileSink implements
SeaTunnelSink<SeaTunnelRow, Clickhous
shardKeyType = tableSchema.get(shardKey);
}
ShardMetadata shardMetadata = new ShardMetadata(
- shardKey,
- shardKeyType,
- config.getString(DATABASE.key()),
- config.getString(TABLE.key()),
- false, // we don't need to set splitMode in clickhouse file
mode.
- new Shard(1, 1, nodes.get(0)),
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
+ shardKey,
+ shardKeyType,
+ config.getString(DATABASE.key()),
+ config.getString(TABLE.key()),
+ false, // we don't need to set splitMode in clickhouse file mode.
+ new Shard(1, 1, nodes.get(0)), config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
List<String> fields;
if (config.hasPath(FIELDS.key())) {
fields = config.getStringList(FIELDS.key());
// check if the fields exist in schema
for (String field : fields) {
if (!tableSchema.containsKey(field)) {
- throw new RuntimeException("Field " + field + " does not
exist in table " + config.getString(TABLE.key()));
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE,
"Field " + field + " does not exist in table " + config.getString(TABLE.key()));
}
}
} else {
fields = new ArrayList<>(tableSchema.keySet());
}
Map<String, String> nodeUser =
config.getObjectList(NODE_PASS.key()).stream()
- .collect(Collectors.toMap(configObject ->
configObject.toConfig().getString(NODE_ADDRESS),
- configObject ->
configObject.toConfig().hasPath(USERNAME.key()) ?
configObject.toConfig().getString(USERNAME.key()) : "root"));
+ .collect(Collectors.toMap(configObject ->
configObject.toConfig().getString(NODE_ADDRESS),
+ configObject ->
configObject.toConfig().hasPath(USERNAME.key()) ?
configObject.toConfig().getString(USERNAME.key()) : "root"));
Map<String, String> nodePassword =
config.getObjectList(NODE_PASS.key()).stream()
- .collect(Collectors.toMap(configObject ->
configObject.toConfig().getString(NODE_ADDRESS),
- configObject ->
configObject.toConfig().getString(PASSWORD.key())));
+ .collect(Collectors.toMap(configObject ->
configObject.toConfig().getString(NODE_ADDRESS),
+ configObject ->
configObject.toConfig().getString(PASSWORD.key())));
proxy.close();
this.readerOption = new FileReaderOption(shardMetadata, tableSchema,
fields, config.getString(CLICKHOUSE_LOCAL_PATH.key()),
-
ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser,
nodePassword);
+
ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser,
nodePassword);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index ac5900915..6f24e2c79 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -20,7 +20,10 @@ package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
@@ -70,18 +73,18 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
proxy = new
ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode());
shardRouter = new ShardRouter(proxy,
this.readerOption.getShardMetadata());
clickhouseTable =
proxy.getClickhouseTable(this.readerOption.getShardMetadata().getDatabase(),
- this.readerOption.getShardMetadata().getTable());
+ this.readerOption.getShardMetadata().getTable());
rowCache = new HashMap<>(Common.COLLECTION_SIZE);
nodePasswordCheck();
// find file local save path of each node
shardLocalDataPaths = shardRouter.getShards().values().stream()
- .collect(Collectors.toMap(Function.identity(), shard -> {
- ClickhouseTable shardTable =
proxy.getClickhouseTable(shard.getNode().getDatabase().get(),
- clickhouseTable.getLocalTableName());
- return shardTable.getDataPaths();
- }));
+ .collect(Collectors.toMap(Function.identity(), shard -> {
+ ClickhouseTable shardTable =
proxy.getClickhouseTable(shard.getNode().getDatabase().get(),
+ clickhouseTable.getLocalTableName());
+ return shardTable.getDataPaths();
+ }));
}
@Override
@@ -94,8 +97,8 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
if (!this.readerOption.isNodeFreePass()) {
shardRouter.getShards().values().forEach(shard -> {
if
(!this.readerOption.getNodePassword().containsKey(shard.getNode().getAddress().getHostName())
- &&
!this.readerOption.getNodePassword().containsKey(shard.getNode().getHost())) {
- throw new RuntimeException("Cannot find password of shard
" + shard.getNode().getAddress().getHostName());
+ &&
!this.readerOption.getNodePassword().containsKey(shard.getNode().getHost())) {
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.PASSWORD_NOT_FOUND_IN_SHARD_NODE,
"Cannot find password of shard " + shard.getNode().getAddress().getHostName());
}
});
}
@@ -126,12 +129,12 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
// clear local file
clearLocalFileDirectory(clickhouseLocalFiles);
} catch (Exception e) {
- throw new RuntimeException("Flush data into clickhouse file
error", e);
+ throw new
ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data
into clickhouse file error", e);
}
}
private List<String> generateClickhouseLocalFiles(List<SeaTunnelRow> rows)
throws IOException,
- InterruptedException {
+ InterruptedException {
if (rows.isEmpty()) {
return Collections.emptyList();
}
@@ -140,18 +143,18 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
FileUtils.forceMkdir(new File(clickhouseLocalFile));
String clickhouseLocalFileTmpFile = clickhouseLocalFile +
"/local_data.log";
try (FileChannel fileChannel =
FileChannel.open(Paths.get(clickhouseLocalFileTmpFile),
StandardOpenOption.WRITE,
- StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) {
+ StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) {
String data = rows.stream()
- .map(row ->
this.readerOption.getFields().stream().map(field ->
row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
- .collect(Collectors.joining("\t")))
- .collect(Collectors.joining("\n"));
+ .map(row -> this.readerOption.getFields().stream().map(field
->
row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
+ .collect(Collectors.joining("\t")))
+ .collect(Collectors.joining("\n"));
MappedByteBuffer buffer =
fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
- data.getBytes(StandardCharsets.UTF_8).length);
+ data.getBytes(StandardCharsets.UTF_8).length);
buffer.put(data.getBytes(StandardCharsets.UTF_8));
}
List<String> localPaths =
Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(" "))
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
List<String> command = new ArrayList<>(localPaths);
if (localPaths.size() == 1) {
command.add("local");
@@ -164,17 +167,17 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
command.add("\"" + "temp_table" + uuid + "\"");
command.add("-q");
command.add(String.format(
- "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"",
-
clickhouseTable.getCreateTableDDL().replace(clickhouseTable.getDatabase() +
".", "").replaceAll("`", ""),
- clickhouseTable.getLocalTableName(),
- readerOption.getTableSchema().keySet().stream().map(s -> {
- if (readerOption.getFields().contains(s)) {
- return s;
- } else {
- return "NULL";
- }
- }).collect(Collectors.joining(",")),
- uuid));
+ "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"",
+
clickhouseTable.getCreateTableDDL().replace(clickhouseTable.getDatabase() +
".", "").replaceAll("`", ""),
+ clickhouseTable.getLocalTableName(),
+ readerOption.getTableSchema().keySet().stream().map(s -> {
+ if (readerOption.getFields().contains(s)) {
+ return s;
+ } else {
+ return "NULL";
+ }
+ }).collect(Collectors.joining(",")),
+ uuid));
command.add("--path");
command.add("\"" + clickhouseLocalFile + "\"");
log.info("Generate clickhouse local file command: {}", String.join("
", command));
@@ -192,16 +195,16 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
start.waitFor();
File file = new File(clickhouseLocalFile + "/data/_local/" +
clickhouseTable.getLocalTableName());
if (!file.exists()) {
- throw new RuntimeException("clickhouse local file not exists");
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.FILE_NOT_EXISTS,
"clickhouse local file not exists");
}
File[] files = file.listFiles();
if (files == null) {
- throw new RuntimeException("clickhouse local file not exists");
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.FILE_NOT_EXISTS,
"clickhouse local file not exists");
}
return Arrays.stream(files)
- .filter(File::isDirectory)
- .filter(f -> !"detached".equals(f.getName()))
- .map(File::getAbsolutePath).collect(Collectors.toList());
+ .filter(File::isDirectory)
+ .filter(f -> !"detached".equals(f.getName()))
+ .map(File::getAbsolutePath).collect(Collectors.toList());
}
private void attachClickhouseLocalFileToServer(Shard shard, List<String>
clickhouseLocalFiles) throws ClickHouseException {
@@ -215,8 +218,8 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
ClickHouseRequest<?> request = proxy.getClickhouseConnection(shard);
for (String clickhouseLocalFile : clickhouseLocalFiles) {
ClickHouseResponse response = request.query(String.format("ALTER
TABLE %s ATTACH PART '%s'",
- clickhouseTable.getLocalTableName(),
-
clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") +
1))).executeAndWait();
+ clickhouseTable.getLocalTableName(),
+
clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") +
1))).executeAndWait();
response.close();
}
}
@@ -230,7 +233,7 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
FileUtils.deleteDirectory(new File(localFileDir));
}
} catch (IOException e) {
- throw new RuntimeException("Unable to delete directory " +
localFileDir, e);
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.DELETE_DIRECTORY_FIELD,
"Unable to delete directory " + localFileDir, e);
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransferFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransferFactory.java
index 14455b45f..3af640c50 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransferFactory.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransferFactory.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
public class FileTransferFactory {
public static FileTransfer createFileTransfer(ClickhouseFileCopyMethod
type, String host, String user, String password) {
@@ -27,7 +29,7 @@ public class FileTransferFactory {
case RSYNC:
return new RsyncFileTransfer(host, user, password);
default:
- throw new RuntimeException("unsupported clickhouse file copy
method:" + type);
+ throw new
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "unsupported
clickhouse file copy method:" + type);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
index 4dd8bc766..9cbf176ef 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.sshd.client.SshClient;
@@ -58,10 +62,11 @@ public class RsyncFileTransfer implements FileTransfer {
}
// TODO support add publicKey to identity
if (!clientSession.auth().verify().isSuccess()) {
- throw new IOException("ssh host " + host + "authentication
failed");
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
"ssh host " + host + "authentication failed");
}
} catch (IOException e) {
- throw new RuntimeException("Failed to connect to host: " + host +
" by user: " + user + " on port 22", e);
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
+ "Failed to connect to host: " + host + " by user: " + user + "
on port 22", e);
}
}
@@ -96,7 +101,7 @@ public class RsyncFileTransfer implements FileTransfer {
}
start.waitFor();
} catch (IOException | InterruptedException ex) {
- throw new RuntimeException("Rsync failed to transfer file: " +
sourcePath + " to: " + targetPath, ex);
+ throw new
ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "Rsync
failed to transfer file: " + sourcePath + " to: " + targetPath, ex);
}
// remote exec command to change file owner. Only file owner equal
with server's clickhouse user can
// make ATTACH command work.
@@ -104,7 +109,7 @@ public class RsyncFileTransfer implements FileTransfer {
command.add("ls");
command.add("-l");
command.add(targetPath.substring(0,
- StringUtils.stripEnd(targetPath, "/").lastIndexOf("/")) + "/");
+ StringUtils.stripEnd(targetPath, "/").lastIndexOf("/")) + "/");
command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R
{}:{} " + targetPath);
try {
String finalCommand = String.join(" ", command);
@@ -118,7 +123,7 @@ public class RsyncFileTransfer implements FileTransfer {
@Override
public void transferAndChown(List<String> sourcePaths, String targetPath) {
if (sourcePaths == null) {
- throw new IllegalArgumentException("sourcePath is null");
+ throw new
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "sourcePath is
null");
}
sourcePaths.forEach(sourcePath -> transferAndChown(sourcePath,
targetPath));
}
@@ -129,7 +134,7 @@ public class RsyncFileTransfer implements FileTransfer {
try {
clientSession.close();
} catch (IOException e) {
- throw new RuntimeException("Failed to close ssh session", e);
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
"Failed to close ssh session", e);
}
}
if (sshClient != null && sshClient.isOpen()) {
@@ -137,7 +142,7 @@ public class RsyncFileTransfer implements FileTransfer {
try {
sshClient.close();
} catch (IOException e) {
- throw new RuntimeException("Failed to close ssh client", e);
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
"Failed to close ssh client", e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
index 1c207276a..1c39815d8 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.sshd.client.SshClient;
@@ -58,11 +62,11 @@ public class ScpFileTransfer implements FileTransfer {
}
// TODO support add publicKey to identity
if (!clientSession.auth().verify().isSuccess()) {
- throw new IOException("ssh host " + host + "authentication
failed");
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
"ssh host " + host + "authentication failed");
}
scpClient =
ScpClientCreator.instance().createScpClient(clientSession);
} catch (IOException e) {
- throw new RuntimeException("Failed to connect to host: " + host +
" by user: " + user + " on port 22", e);
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
"Failed to connect to host: " + host + " by user: " + user + " on port 22", e);
}
}
@@ -76,7 +80,7 @@ public class ScpFileTransfer implements FileTransfer {
ScpClient.Option.TargetIsDirectory,
ScpClient.Option.PreserveAttributes);
} catch (IOException e) {
- throw new RuntimeException("Scp failed to transfer file: " +
sourcePath + " to: " + targetPath, e);
+ throw new
ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "Scp failed
to transfer file: " + sourcePath + " to: " + targetPath, e);
}
// remote exec command to change file owner. Only file owner equal
with server's clickhouse user can
// make ATTACH command work.
@@ -84,7 +88,7 @@ public class ScpFileTransfer implements FileTransfer {
command.add("ls");
command.add("-l");
command.add(targetPath.substring(0,
- StringUtils.stripEnd(targetPath, "/").lastIndexOf("/")) + "/");
+ StringUtils.stripEnd(targetPath, "/").lastIndexOf("/")) + "/");
command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R
{}:{} " + targetPath);
try {
String finalCommand = String.join(" ", command);
@@ -98,7 +102,7 @@ public class ScpFileTransfer implements FileTransfer {
@Override
public void transferAndChown(List<String> sourcePaths, String targetPath) {
if (sourcePaths == null) {
- throw new IllegalArgumentException("sourcePath is null");
+ throw new
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "sourcePath is
null");
}
sourcePaths.forEach(sourcePath -> transferAndChown(sourcePath,
targetPath));
}
@@ -109,7 +113,7 @@ public class ScpFileTransfer implements FileTransfer {
try {
clientSession.close();
} catch (IOException e) {
- throw new RuntimeException("Failed to close ssh session", e);
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
"Failed to close ssh session", e);
}
}
if (sshClient != null && sshClient.isOpen()) {
@@ -117,7 +121,7 @@ public class ScpFileTransfer implements FileTransfer {
try {
sshClient.close();
} catch (IOException e) {
- throw new RuntimeException("Failed to close ssh client", e);
+ throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
"Failed to close ssh client", e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
index 644e61994..606e7f80d 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
@@ -74,7 +77,7 @@ public class ArrayInjectFunction implements
ClickhouseFieldInjectFunction {
elements = Arrays.copyOf(elements, elements.length,
Boolean[].class);
break;
default:
- throw new IllegalArgumentException("array inject error, not
supported data type: " + type);
+ throw new
ClickhouseConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "array
inject error, unsupported data type: " + type);
}
statement.setArray(index,
statement.getConnection().createArrayOf(sqlType, elements));
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
index 96d1f5193..a8ae28eba 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,7 +46,7 @@ public class StringInjectFunction implements
ClickhouseFieldInjectFunction {
statement.setString(index, value.toString());
}
} catch (JsonProcessingException e) {
- throw new RuntimeException(e);
+ throw new
ClickhouseConnectorException(CommonErrorCode.JSON_OPERATION_FAILED,
e.getMessage());
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index 59a24cac9..f9d857b9c 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -24,6 +24,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
@@ -34,6 +35,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
@@ -66,17 +68,18 @@ public class ClickhouseSource implements
SeaTunnelSource<SeaTunnelRow, Clickhous
public void prepare(Config config) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(config,
HOST.key(), DATABASE.key(), SQL.key(), USERNAME.key(), PASSWORD.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
servers = ClickhouseUtil.createNodes(config.getString(HOST.key()),
config.getString(DATABASE.key()),
- config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
+ config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
sql = config.getString(SQL.key());
ClickHouseNode currentServer =
servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
try (ClickHouseClient client =
ClickHouseClient.newInstance(currentServer.getProtocol());
ClickHouseResponse response =
-
client.connect(currentServer).format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
-
.query(modifySQLToLimit1(config.getString(SQL.key()))).executeAndWait()) {
+
client.connect(currentServer).format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+
.query(modifySQLToLimit1(config.getString(SQL.key()))).executeAndWait()) {
int columnSize = response.getColumns().size();
String[] fieldNames = new String[columnSize];
@@ -90,7 +93,8 @@ public class ClickhouseSource implements
SeaTunnelSource<SeaTunnelRow, Clickhous
this.rowTypeInfo = new SeaTunnelRowType(fieldNames,
seaTunnelDataTypes);
} catch (ClickHouseException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
e.getMessage());
+ throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, e.getMessage()));
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
index abfe96809..4fedc083e 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import com.clickhouse.client.ClickHouseColumn;
import com.clickhouse.client.ClickHouseValue;
@@ -59,7 +61,7 @@ public class TypeConvertUtil {
} else if (BasicType.BYTE_TYPE.equals(dataType)) {
return ArrayType.BYTE_ARRAY_TYPE;
} else {
- throw new IllegalArgumentException("data type in array is not
supported: " + subArrayDataType.getDataType());
+ throw new
ClickhouseConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "data type
in array is not supported: " + subArrayDataType.getDataType());
}
}
Class<?> type = column.getDataType().getObjectClass();
@@ -99,7 +101,7 @@ public class TypeConvertUtil {
return BasicType.STRING_TYPE;
} else {
// TODO support pojo
- throw new IllegalArgumentException("not supported data type: " +
column.getDataType());
+ throw new
ClickhouseConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"unsupported data type: " + column.getDataType());
}
}
@@ -151,7 +153,7 @@ public class TypeConvertUtil {
}
} else {
// TODO support pojo
- throw new IllegalArgumentException("not supported data type: " +
dataType);
+ throw new
ClickhouseConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"unsupported data type: " + dataType);
}
}