This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 273418ddc [Improve][Connector-V2][Kudu] Unified exception for kudu
source & sink connector (#3564)
273418ddc is described below
commit 273418ddc9ae38ec107250f5f16ae0febfa7ed37
Author: Huan Liang <[email protected]>
AuthorDate: Sat Nov 26 12:08:21 2022 +0800
[Improve][Connector-V2][Kudu] Unified exception for kudu source & sink
connector (#3564)
* [Doc]Fix typos in QuickStart Doc
* [Improve][Connector-V2][Kudu] Unified exception for kudu source & sink
connector
* [Improve][Connector-V2][Kudu] Add license header
* [Improve][Connector-V2][Kudu] fix code style
---
.../connector-v2/Error-Quick-Reference-Manual.md | 12 +++++
.../seatunnel/kudu/config/KuduSinkConfig.java | 7 ++-
.../kudu/exception/KuduConnectorErrorCode.java | 52 ++++++++++++++++++++++
.../kudu/exception/KuduConnectorException.java | 35 +++++++++++++++
.../seatunnel/kudu/kuduclient/KuduInputFormat.java | 20 ++++-----
.../kudu/kuduclient/KuduOutputFormat.java | 18 ++++----
.../seatunnel/kudu/kuduclient/KuduTypeMapper.java | 4 +-
.../seatunnel/kudu/source/KuduSource.java | 23 +++++++---
8 files changed, 143 insertions(+), 28 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 4fc996f3c..9fb6664b2 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -90,3 +90,15 @@ This document records some common error codes and
corresponding solutions of Sea
|-------------|------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
| INFLUXDB-01 | Connect influxdb failed, due to influxdb version info is
unknown | When the user encounters this error code, it indicates that the
connection to influxdb failed. Please check |
| INFLUXDB-02 | Get column index of query result exception
| When the user encounters this error code, it indicates that obtaining the
column index failed. Please check |
+
+## Kudu Connector Error Codes
+
+| code | description |
solution
|
+|---------|----------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| KUDU-01 | Get the Kuduscan object for each splice failed | When
users encounter this error code, there is usually a problem with
getting the KuduScan object for each splice. Please check whether your configuration
is correct and whether Kudu is working |
+| KUDU-02 | Close Kudu client failed | When
users encounter this error code, there is usually a problem with
closing the Kudu client. Please check whether Kudu is working
|
|
+| KUDU-03 | Value type does not match column type | When
users encounter this error code, there is usually a mismatch
between the value type and the column type. Please check whether the data
type is supported |
+| KUDU-04 | Upsert data to Kudu failed | When
users encounter this error code, it means that Kudu has some problems. Please
check whether it is working
|
+| KUDU-05 | Insert data to Kudu failed | When
users encounter this error code, it means that Kudu has some problems. Please
check whether it is working
|
+| KUDU-06 | Initialize the Kudu client failed | When
users encounter this error code, there is usually a problem with
initializing the Kudu client. Please check whether your configuration is correct
and whether the connector is working |
+| KUDU-07 | Generate Kudu Parameters in the preparation phase failed | When
users encounter this error code, it means that there is a problem generating the Kudu
parameters. Please check your configuration
|
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
index b00dd3232..82d924d19 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
@@ -17,8 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.common.constants.PluginType;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -75,7 +78,9 @@ public class KuduSinkConfig {
this.kuduMaster = pluginConfig.getString(KUDU_MASTER.key());
this.kuduTableName = pluginConfig.getString(KUDU_TABLE_NAME.key());
} else {
- throw new RuntimeException("Missing Sink configuration
parameters");
+ throw new
KuduConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ "Kudu", PluginType.SINK, "Missing Sink
configuration parameters"));
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorErrorCode.java
new file mode 100644
index 000000000..36f1e2658
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorErrorCode.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum KuduConnectorErrorCode implements SeaTunnelErrorCode {
+ GET_KUDUSCAN_OBJECT_FAILED("KUDU-01", "Get the Kuduscan object for each
splice failed"),
+ CLOSE_KUDU_CLIENT_FAILED("KUDU-02", "Close Kudu client failed"),
+ DATA_TYPE_CAST_FILED("KUDU-03", "Value type does not match column type"),
+ KUDU_UPSERT_FAILED("KUDU-04", "Upsert data to Kudu failed"),
+ KUDU_INSERT_FAILED("KUDU-05", "Insert data to Kudu failed"),
+ INIT_KUDU_CLIENT_FAILED("KUDU-06", "Initialize the Kudu client failed"),
+ GENERATE_KUDU_PARAMETERS_FAILED("KUDU-07", "Generate Kudu Parameters in
the preparation phase failed")
+ ;
+
+
+
+ private final String code;
+
+ private final String description;
+
+ KuduConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorException.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorException.java
new file mode 100644
index 000000000..e33e501ef
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class KuduConnectorException extends SeaTunnelRuntimeException {
+ public KuduConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public KuduConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public KuduConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
index a70a19f88..3471e07fb 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
@@ -17,14 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kudu.ColumnSchema;
@@ -79,8 +81,7 @@ public class KuduInputFormat implements Serializable {
keyColumn = schema.getPrimaryKeyColumns().get(0).getName();
columns = schema.getColumns();
} catch (KuduException e) {
- log.warn("get table Columns Schemas Fail.", e);
- throw new RuntimeException("get table Columns Schemas Fail..", e);
+ throw new
KuduConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, "get table
Columns Schemas Failed");
}
return columns;
}
@@ -116,7 +117,8 @@ public class KuduInputFormat implements Serializable {
} else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getString(i);
} else {
- throw new IllegalStateException("Unexpected value: " +
seaTunnelDataType);
+ throw new
KuduConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ "Unsupported data type: " + seaTunnelDataType);
}
fields.add(seatunnelField);
}
@@ -135,8 +137,8 @@ public class KuduInputFormat implements Serializable {
seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
}
} catch (Exception e) {
- log.warn("get row type info exception.", e);
- throw new PrepareFailException("kudu", PluginType.SOURCE,
ExceptionUtils.getMessage(e));
+ throw new
KuduConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
+ "Kudu", PluginType.SOURCE, ExceptionUtils.getMessage(e)));
}
return new SeaTunnelRowType(fieldNames.toArray(new
String[fieldNames.size()]), seaTunnelDataTypes.toArray(new
SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
}
@@ -179,8 +181,7 @@ public class KuduInputFormat implements Serializable {
kuduScanner = kuduScannerBuilder.addPredicate(lowerPred)
.addPredicate(upperPred).build();
} catch (KuduException e) {
- log.warn("get the Kuduscan object for each splice exception", e);
- throw new RuntimeException("get the Kuduscan object for each
splice exception.", e);
+ throw new
KuduConnectorException(KuduConnectorErrorCode.GET_KUDUSCAN_OBJECT_FAILED, e);
}
return kuduScanner;
}
@@ -190,8 +191,7 @@ public class KuduInputFormat implements Serializable {
try {
kuduClient.close();
} catch (KuduException e) {
- log.warn("Kudu Client close failed.", e);
- throw new RuntimeException("Kudu Client close failed.", e);
+ throw new
KuduConnectorException(KuduConnectorErrorCode.CLOSE_KUDU_CLIENT_FAILED, e);
} finally {
kuduClient = null;
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
index c356baabf..c8b5e4ca7 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
@@ -18,7 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kudu.ColumnSchema;
@@ -109,10 +112,10 @@ public class KuduOutputFormat
row.addDecimal(columnIndex, (BigDecimal)
element.getField(columnIndex));
break;
default:
- throw new IllegalArgumentException("Unsupported column
type: " + col.getType());
+ throw new
KuduConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported
column type: " + col.getType());
}
} catch (ClassCastException e) {
- throw new IllegalArgumentException(
+ throw new
KuduConnectorException(KuduConnectorErrorCode.DATA_TYPE_CAST_FILED,
"Value type does not match column type " +
col.getType() +
" for column " + col.getName());
}
@@ -128,8 +131,7 @@ public class KuduOutputFormat
try {
kuduSession.apply(upsert);
} catch (KuduException e) {
- log.error("Failed to upsert.", e);
- throw new RuntimeException("Failed to upsert.", e);
+ throw new
KuduConnectorException(KuduConnectorErrorCode.KUDU_UPSERT_FAILED, e);
}
}
@@ -141,8 +143,7 @@ public class KuduOutputFormat
try {
kuduSession.apply(insert);
} catch (KuduException e) {
- log.error("Failed to insert.", e);
- throw new RuntimeException("Failed to insert.", e);
+ throw new
KuduConnectorException(KuduConnectorErrorCode.KUDU_INSERT_FAILED, e);
}
}
@@ -155,7 +156,7 @@ public class KuduOutputFormat
upsert(element);
break;
default:
- throw new IllegalArgumentException(String.format("Unsupported
saveMode: %s.", saveMode.name()));
+ throw new
KuduConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
String.format("Unsupported saveMode: %s.", saveMode.name()));
}
}
@@ -170,8 +171,7 @@ public class KuduOutputFormat
try {
kuduTable = kuduClient.openTable(kuduTableName);
} catch (KuduException e) {
- log.error("Failed to initialize the Kudu client.", e);
- throw new RuntimeException("Failed to initialize the Kudu
client.", e);
+ throw new
KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e);
}
log.info("The Kudu client for Master: {} is initialized
successfully.", kuduMaster);
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
index cea22dfb2..940556bcc 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import org.apache.kudu.ColumnSchema;
import org.slf4j.Logger;
@@ -94,7 +96,7 @@ public class KuduTypeMapper {
case KUDU_UNKNOWN:
default:
- throw new UnsupportedOperationException(
+ throw new
KuduConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format(
"Doesn't support KUDU type '%s' .",
kuduType));
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
index b4fc5652c..6098e89b1 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.source;
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Boundedness;
@@ -27,8 +27,14 @@ import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
import
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
@@ -103,13 +109,16 @@ public class KuduSource implements
SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
String kudumaster = "";
String tableName = "";
String columnslist = "";
- if (config.hasPath(KuduSourceConfig.KUDU_MASTER.key()) &&
config.hasPath(KuduSourceConfig.TABLE_NAME.key()) &&
config.hasPath(KuduSourceConfig.COLUMNS_LIST.key())) {
+ CheckResult checkResult = CheckConfigUtil.checkAllExists(config,
KuduSourceConfig.KUDU_MASTER.key(), KuduSourceConfig.TABLE_NAME.key(),
KuduSourceConfig.COLUMNS_LIST.key());
+ if (checkResult.isSuccess()) {
kudumaster = config.getString(KuduSourceConfig.KUDU_MASTER.key());
tableName = config.getString(KuduSourceConfig.TABLE_NAME.key());
columnslist =
config.getString(KuduSourceConfig.COLUMNS_LIST.key());
kuduInputFormat = new KuduInputFormat(kudumaster, tableName,
columnslist);
} else {
- throw new RuntimeException("Missing Source configuration
parameters");
+ throw new
KuduConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SINK,
checkResult.getMsg()));
}
try {
KuduClient.KuduClientBuilder kuduClientBuilder = new
@@ -121,7 +130,7 @@ public class KuduSource implements
SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
SeaTunnelRowType seaTunnelRowType =
getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
rowTypeInfo = seaTunnelRowType;
} catch (KuduException e) {
- throw new RuntimeException("Parameters in the preparation phase
fail to be generated", e);
+ throw new
KuduConnectorException(KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED,
e);
}
}
@@ -158,7 +167,7 @@ public class KuduSource implements
SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
}
}
} catch (KuduException e) {
- throw new RuntimeException("Failed to generate upper and lower
limits for each partition", e);
+ throw new
KuduConnectorException(KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED,
"Failed to generate upper and lower limits for each partition");
}
return new PartitionParameter(keyColumn, Long.parseLong(minKey + ""),
Long.parseLong(maxKey + ""));
}
@@ -174,8 +183,8 @@ public class KuduSource implements
SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
}
} catch (Exception e) {
- log.warn("get row type info exception", e);
- throw new PrepareFailException("kudu", PluginType.SOURCE,
e.toString());
+ throw new
KuduConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
+ "Kudu", PluginType.SOURCE, ExceptionUtils.getMessage(e)));
}
return new SeaTunnelRowType(fieldNames.toArray(new
String[fieldNames.size()]), seaTunnelDataTypes.toArray(new
SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
}