This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 612d0297a [Improve][Connector-V2][StarRocks] Unified exception for
StarRocks source and sink (#3593)
612d0297a is described below
commit 612d0297a09bc46e3fbd5f6cb2ee2ab43ea8c586
Author: john <[email protected]>
AuthorDate: Mon Dec 5 18:32:31 2022 +0800
[Improve][Connector-V2][StarRocks] Unified exception for StarRocks source
and sink (#3593)
---
.../connector-v2/Error-Quick-Reference-Manual.md | 6 ++++
.../starrocks/client/StarRocksSinkManager.java | 11 +++---
.../client/StarRocksStreamLoadVisitor.java | 25 +++++++------
.../exception/StarRocksConnectorErrorCode.java | 42 ++++++++++++++++++++++
.../StarRocksConnectorException.java} | 30 ++++++++--------
.../serialize/StarRocksBaseSerializer.java | 4 ++-
.../serialize/StarRocksDelimiterParser.java | 9 +++--
.../seatunnel/starrocks/sink/StarRocksSink.java | 6 +++-
.../starrocks/sink/StarRocksSinkWriter.java | 6 ++--
9 files changed, 100 insertions(+), 39 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 4b5ef6b10..efca58358 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -181,3 +181,9 @@ problems encountered by users.
| PULSAR-05 | Get last cursor of pulsar topic failed
| When users encounter this error code, it means that get last cursor of pulsar
topic failed, please check it |
| PULSAR-06 | Get partition information of pulsar topic failed
| When users encounter this error code, it means that Get partition information
of pulsar topic failed, please check it |
+## StarRocks Connector Error Codes
+
+| code | description | solution
|
+|---------|-----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| STARROCKS-01 | Flush batch data to sink connector failed | When users
encounter this error code, it means that flush batch data to sink connector
failed, please check it |
+| STARROCKS-02 | Writing records to StarRocks failed. | When users encounter
this error code, it means that writing records to StarRocks failed, please
check whether the data from files is correct |
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
index ab05da5b5..f51feff7f 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -116,10 +118,10 @@ public class StarRocksSinkManager {
} catch (Exception e) {
log.warn("Writing records to StarRocks failed, retry times =
{}", i, e);
if (i >= sinkConfig.getMaxRetries()) {
- throw new IOException("Writing records to StarRocks
failed.", e);
+ throw new
StarRocksConnectorException(StarRocksConnectorErrorCode.WRITE_RECORDS_FAILED,
"The number of retries was exceeded, writing records to StarRocks failed.", e);
}
- if (e instanceof StarRocksStreamLoadFailedException &&
((StarRocksStreamLoadFailedException) e).needReCreateLabel()) {
+ if (e instanceof StarRocksConnectorException &&
((StarRocksConnectorException) e).needReCreateLabel()) {
String newLabel = createBatchLabel();
log.warn(String.format("Batch label changed from [%s] to
[%s]", tuple.getLabel(), newLabel));
tuple.setLabel(newLabel);
@@ -131,8 +133,7 @@ public class StarRocksSinkManager {
Thread.sleep(backoff);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
- throw new IOException(
- "Unable to flush; interrupted while doing another
attempt.", e);
+ throw new
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, e);
}
}
}
@@ -143,7 +144,7 @@ public class StarRocksSinkManager {
private void checkFlushException() {
if (flushException != null) {
- throw new RuntimeException("Writing records to StarRocks failed.",
flushException);
+ throw new
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED,
flushException);
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
index 7b277b3a9..7d5c1d71b 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
@@ -17,8 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser;
import org.apache.commons.codec.binary.Base64;
@@ -62,7 +65,7 @@ public class StarRocksStreamLoadVisitor {
public Boolean doStreamLoad(StarRocksFlushTuple flushData) throws
IOException {
String host = getAvailableHost();
if (null == host) {
- throw new IOException("None of the host in `load_url` could be
connected.");
+ throw new
StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "None of the host
in `load_url` could be connected.");
}
String loadUrl = new StringBuilder(host)
.append("/api/")
@@ -78,7 +81,7 @@ public class StarRocksStreamLoadVisitor {
final String keyStatus = "Status";
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
LOG.error("unknown result status. {}", loadResult);
- throw new IOException("Unable to flush data to StarRocks: unknown
result status. " + loadResult);
+ throw new
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED,
"Unable to flush data to StarRocks: unknown result status. " + loadResult);
}
if (LOG.isDebugEnabled()) {
LOG.debug(new StringBuilder("StreamLoad
response:\n").append(JsonUtils.toJsonString(loadResult)).toString());
@@ -101,7 +104,7 @@ public class StarRocksStreamLoadVisitor {
errorBuilder.append(JsonUtils.toJsonString(loadResult));
errorBuilder.append('\n');
}
- throw new IOException(errorBuilder.toString());
+ throw new
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED,
errorBuilder.toString());
} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
LOG.debug(new StringBuilder("StreamLoad
response:\n").append(JsonUtils.toJsonString(loadResult)).toString());
// has to block-checking the state to get the final result
@@ -149,7 +152,7 @@ public class StarRocksStreamLoadVisitor {
bos.put("]".getBytes(StandardCharsets.UTF_8));
return bos.array();
}
- throw new RuntimeException("Failed to join rows data, unsupported
`format` from stream load properties:");
+ throw new
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED,
"Failed to join rows data, unsupported `format` from stream load properties:");
}
@SuppressWarnings("unchecked")
@@ -165,12 +168,12 @@ public class StarRocksStreamLoadVisitor {
String queryLoadStateUrl = new
StringBuilder(host).append("/api/").append(sinkConfig.getDatabase()).append("/get_load_state?label=").append(label).toString();
Map<String, Object> result =
httpHelper.doHttpGet(queryLoadStateUrl, getLoadStateHttpHeader(label));
if (result == null) {
- throw new IOException(String.format("Failed to flush data
to StarRocks, Error " +
+ throw new
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED,
String.format("Failed to flush data to StarRocks, Error " +
"could not get the final state of label[%s].\n",
label), null);
}
String labelState = (String) result.get("state");
if (null == labelState) {
- throw new IOException(String.format("Failed to flush data
to StarRocks, Error " +
+ throw new
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED,
String.format("Failed to flush data to StarRocks, Error " +
"could not get the final state of label[%s].
response[%s]\n", label, JsonUtils.toJsonString(result)), null);
}
LOG.info(String.format("Checking label[%s] state[%s]\n",
label, labelState));
@@ -181,15 +184,15 @@ public class StarRocksStreamLoadVisitor {
case RESULT_LABEL_PREPARE:
continue;
case RESULT_LABEL_ABORTED:
- throw new
StarRocksStreamLoadFailedException(String.format("Failed to flush data to
StarRocks, Error " +
- "label[%s] state[%s]\n", label, labelState),
null, true);
+ throw new
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED,
String.format("Failed to flush data to StarRocks, Error " +
+ "label[%s] state[%s]\n", label, labelState),
true);
case RESULT_LABEL_UNKNOWN:
default:
- throw new
StarRocksStreamLoadFailedException(String.format("Failed to flush data to
StarRocks, Error " +
- "label[%s] state[%s]\n", label, labelState),
null);
+ throw new
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED,
String.format("Failed to flush data to StarRocks, Error " +
+ "label[%s] state[%s]\n", label, labelState));
}
} catch (IOException e) {
- throw new IOException(e);
+ throw new
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorErrorCode.java
new file mode 100644
index 000000000..4a6a362d9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorErrorCode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+/**
+ * Error codes raised by the StarRocks connector.
+ *
+ * <p>Each constant carries a code string (for example {@code STARROCKS-01}) and a
+ * short human-readable description, exposed through {@link #getCode()} and
+ * {@link #getDescription()}.
+ */
+public enum StarRocksConnectorErrorCode implements SeaTunnelErrorCode {
+    /** Flushing a batch of data to the sink failed. */
+    FLUSH_DATA_FAILED("STARROCKS-01", "Flush batch data to sink connector failed"),
+    /** Writing records to StarRocks failed. */
+    WRITE_RECORDS_FAILED("STARROCKS-02", "Writing records to StarRocks failed.");
+
+    /** Code string of this error, e.g. {@code STARROCKS-01}. */
+    private final String code;
+    /** Short description of the failure this code stands for. */
+    private final String description;
+
+    /**
+     * Creates an error-code constant.
+     *
+     * @param code the code string for this error
+     * @param description a short description of the failure
+     */
+    StarRocksConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    /**
+     * Returns the code string of this error.
+     *
+     * @return the code passed to the constructor
+     */
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    /**
+     * Returns the description of this error.
+     *
+     * @return the description passed to the constructor
+     */
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorException.java
similarity index 51%
rename from
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java
rename to
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorException.java
index 626b38d3f..60713e088 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorException.java
@@ -15,35 +15,33 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
+package org.apache.seatunnel.connectors.seatunnel.starrocks.exception;
-import java.io.IOException;
-import java.util.Map;
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-public class StarRocksStreamLoadFailedException extends IOException {
+public class StarRocksConnectorException extends SeaTunnelRuntimeException {
- static final long serialVersionUID = 1L;
-
- private final Map<String, Object> response;
private boolean reCreateLabel;
- public StarRocksStreamLoadFailedException(String message, Map<String,
Object> response) {
- super(message);
- this.response = response;
+ public StarRocksConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
}
- public StarRocksStreamLoadFailedException(String message, Map<String,
Object> response, boolean reCreateLabel) {
- super(message);
- this.response = response;
+ public StarRocksConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage, boolean reCreateLabel) {
+ super(seaTunnelErrorCode, errorMessage);
this.reCreateLabel = reCreateLabel;
}
- public Map<String, Object> getFailedResponse() {
- return response;
+ public StarRocksConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public StarRocksConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
}
public boolean needReCreateLabel() {
return reCreateLabel;
}
-
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
index 190b18b95..7f9cc43ee 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
@@ -18,10 +18,12 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import lombok.Builder;
@@ -70,7 +72,7 @@ public class StarRocksBaseSerializer {
case BYTES:
return new String((byte[]) val);
default:
- throw new UnsupportedOperationException("Unsupported dataType:
" + dataType);
+ throw new
StarRocksConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, dataType + "
is not supported ");
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java
index 1b7ea726f..52b7b73b0 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
+
import com.google.common.base.Strings;
import java.io.StringWriter;
@@ -36,14 +39,14 @@ public class StarRocksDelimiterParser {
String hexStr = sp.substring(2);
// check hex str
if (hexStr.isEmpty()) {
- throw new RuntimeException("Failed to parse delimiter: `Hex str is
empty`");
+ throw new
StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to parse
delimiter: `Hex str is empty`");
}
if (hexStr.length() % 2 != 0) {
- throw new RuntimeException("Failed to parse delimiter: `Hex str
length error`");
+ throw new
StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to parse
delimiter: `Hex str length error`");
}
for (char hexChar : hexStr.toUpperCase().toCharArray()) {
if (HEX_STRING.indexOf(hexChar) == -1) {
- throw new RuntimeException("Failed to parse delimiter: `Hex
str format error`");
+ throw new
StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to parse
delimiter: `Hex str format error`");
}
}
// transform to separator
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index a1748c6ea..16f303742 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -24,6 +24,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkCon
import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -34,6 +35,7 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -55,7 +57,9 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.pluginConfig = pluginConfig;
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
NODE_URLS.key(), DATABASE.key(), TABLE.key(), USERNAME.key(), PASSWORD.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
StarRocksConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SINK, result.getMsg()));
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
index 441af0296..6d0d44dee 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
@@ -19,9 +19,11 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksSinkManager;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksCsvSerializer;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksISerializer;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksJsonSerializer;
@@ -73,7 +75,7 @@ public class StarRocksSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
}
} catch (IOException e) {
log.error("Close starRocks manager failed.", e);
- throw new IOException("Close starRocks manager failed.", e);
+ throw new
StarRocksConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
}
}
@@ -84,6 +86,6 @@ public class StarRocksSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
if
(SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) {
return new StarRocksJsonSerializer(seaTunnelRowType);
}
- throw new RuntimeException("Failed to create row serializer,
unsupported `format` from stream load properties.");
+ throw new
StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to create
row serializer, unsupported `format` from stream load properties.");
}
}