This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b2fda11dd [Improve][Connector-V2][Hudi] Unified exception for hudi source connector (#3581)
b2fda11dd is described below
commit b2fda11ddc4154f91ccbe5318478dce093cb6767
Author: ChunFuWu <[email protected]>
AuthorDate: Sun Nov 27 13:29:26 2022 +0800
[Improve][Connector-V2][Hudi] Unified exception for hudi source connector (#3581)
* [Improve][Connector-V2][Hudi] Unified exception for hudi source connector
* [Improve][Connector-V2][Hudi] Unified exception for hudi source connector
---
.../connector-v2/Error-Quick-Reference-Manual.md | 9 ++++--
.../common/exception/CommonErrorCode.java | 3 +-
...nException.java => HudiConnectorException.java} | 19 +++++++++----
.../seatunnel/hudi/source/HudiSource.java | 32 +++++++++++++++-------
.../seatunnel/hudi/source/HudiSourceReader.java | 13 +++++----
.../seatunnel/hudi/source/HudiSourceSplit.java | 4 +--
.../hudi/source/HudiSourceSplitEnumerator.java | 15 +++++-----
.../seatunnel/hudi/source/HudiSourceState.java | 3 +-
.../connectors/seatunnel/hudi/util/HudiUtil.java | 14 ++++++----
9 files changed, 69 insertions(+), 43 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 502905018..ac72f52f1 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -88,7 +88,6 @@ This document records some common error codes and
corresponding solutions of Sea
|----------------|---------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------|
| TABLESTORE-01 | Failed to send these rows of data | When users
encounter this error code, it means that failed to write these rows of data,
please check the rows that failed to import |
-
## Hive Connector Error Codes
| code | description |
solution
|
@@ -135,7 +134,6 @@ This document records some common error codes and
corresponding solutions of Sea
| IOTDB-02 | Initialize IoTDB client failed | When
the user encounters this error code, it indicates that the client
initialization failed. Please check |
| IOTDB-03 | Close IoTDB client failed | When
the user encounters this error code, it indicates that closing the client
failed. Please check |
-
## File Connector Error Codes
| code | description | solution
|
@@ -143,3 +141,10 @@ This document records some common error codes and
corresponding solutions of Sea
| FILE-01 | File type is invalid | When users encounter this error
code, it means that the this file is not the format that user assigned, please
check it |
| FILE-02 | Data deserialization failed | When users encounter this error
code, it means that data from files not satisfied the schema that user
assigned, please check data from files whether is correct |
| FILE-03 | Get file list failed | When users encounter this error
code, it means that connector try to traverse the path and get file list
failed, please check file system whether is work |
+
+## Hudi Connector Error Codes
+
+| code | description | solution
|
+|---------|-------------------------------|-----------------------------------------------------------------------------------------------------------|
+| HUDI-01 | Create ParquetMetadata failed | When the user encounters this
error code, it indicates that ParquetMetadata creation failed. Please check |
+| HUDI-02 | Kerberos Authorized failed | When the user encounters this
error code, it indicates that Kerberos authorization failed. Please check |
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 89b382fe8..924437e11 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -30,7 +30,8 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
FLUSH_DATA_FAILED("COMMON-10", "Flush data operation that in sink
connector failed"),
WRITER_OPERATION_FAILED("COMMON-11", "Sink writer operation failed, such
as (open, close) etc..."),
READER_OPERATION_FAILED("COMMON-12", "Source reader operation failed, such
as (open, close) etc..."),
- HTTP_OPERATION_FAILED("COMMON-13", "Http operation failed, such as (open,
close, response) etc...");
+ HTTP_OPERATION_FAILED("COMMON-13", "Http operation failed, such as (open,
close, response) etc..."),
+ KERBEROS_AUTHORIZED_FAILED("COMMON-14", "Kerberos authorized failed");
private final String code;
private final String description;
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/exception/HudiPluginException.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/exception/HudiConnectorException.java
similarity index 53%
rename from seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/exception/HudiPluginException.java
rename to seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/exception/HudiConnectorException.java
index 6971e029e..7fcea3ae8 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/exception/HudiPluginException.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/exception/HudiConnectorException.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,20 @@
package org.apache.seatunnel.connectors.seatunnel.hudi.exception;
-public class HudiPluginException extends Exception{
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
- public HudiPluginException(String message) {
- super(message);
+public class HudiConnectorException extends SeaTunnelRuntimeException {
+
+ public HudiConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public HudiConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
}
- public HudiPluginException(String message, Throwable cause) {
- super(message, cause);
+ public HudiConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
}
}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
index f1cbb619a..4b2f11e75 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
@@ -24,7 +24,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceCo
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.TABLE_TYPE;
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.USE_KERBEROS;
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
@@ -35,7 +35,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
-import
org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiPluginException;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -66,13 +67,19 @@ public class HudiSource implements
SeaTunnelSource<SeaTunnelRow, HudiSourceSplit
public void prepare(Config pluginConfig) {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
TABLE_PATH, CONF_FILES);
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
HudiConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SOURCE, result.getMsg())
+ );
}
// default hudi table tupe is cow
// TODO: support hudi mor table
// TODO: support Incremental Query and Read Optimized Query
if (!"cow".equalsIgnoreCase(pluginConfig.getString(TABLE_TYPE))) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Do not support hudi mor table yet!");
+ throw new
HudiConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SOURCE, "Do not support hudi
mor table yet!")
+ );
}
try {
this.confFiles = pluginConfig.getString(CONF_FILES);
@@ -82,22 +89,27 @@ public class HudiSource implements
SeaTunnelSource<SeaTunnelRow, HudiSourceSplit
if (this.useKerberos) {
CheckResult kerberosCheckResult =
CheckConfigUtil.checkAllExists(pluginConfig, KERBEROS_PRINCIPAL,
KERBEROS_PRINCIPAL_FILE);
if (!kerberosCheckResult.isSuccess()) {
- throw new PrepareFailException(getPluginName(),
PluginType.SOURCE, result.getMsg());
+ throw new
HudiConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s,
Message: %s",
+ getPluginName(), PluginType.SOURCE,
result.getMsg())
+ );
}
HudiUtil.initKerberosAuthentication(HudiUtil.getConfiguration(this.confFiles),
pluginConfig.getString(KERBEROS_PRINCIPAL),
pluginConfig.getString(KERBEROS_PRINCIPAL_FILE));
}
}
this.filePath = HudiUtil.getParquetFileByPath(this.confFiles,
tablePath);
if (this.filePath == null) {
- throw new HudiPluginException(String.format("%s has no parquet
file, please check!", tablePath));
+ throw new
HudiConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+ String.format("%s has no parquet file, please check!",
tablePath));
}
// should read from config or read from hudi metadata( wait catlog
done)
this.typeInfo = HudiUtil.getSeaTunnelRowTypeInfo(this.confFiles,
this.filePath);
-
- } catch (HudiPluginException | IOException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Prepare HudiSource error.", e);
+ } catch (HudiConnectorException | IOException e) {
+ throw new
HudiConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SOURCE, result.getMsg())
+ );
}
-
}
@Override
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java
index 15127ae5b..52ff5059d 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
import org.apache.commons.lang3.StringUtils;
@@ -47,13 +49,13 @@ public class HudiSourceReader implements
SourceReader<SeaTunnelRow, HudiSourceSp
private static final long THREAD_WAIT_TIME = 500L;
- private String confPaths;
+ private final String confPaths;
- private Set<HudiSourceSplit> sourceSplits;
+ private final Set<HudiSourceSplit> sourceSplits;
private final SourceReader.Context context;
- private SeaTunnelRowType seaTunnelRowType;
+ private final SeaTunnelRowType seaTunnelRowType;
public HudiSourceReader(String confPaths, SourceReader.Context context,
SeaTunnelRowType seaTunnelRowType) {
this.confPaths = confPaths;
@@ -86,7 +88,7 @@ public class HudiSourceReader implements
SourceReader<SeaTunnelRow, HudiSourceSp
ParquetHiveSerDe serde = new ParquetHiveSerDe();
Properties properties = new Properties();
List<String> types = new ArrayList<>();
- for (SeaTunnelDataType<?> type:
seaTunnelRowType.getFieldTypes()) {
+ for (SeaTunnelDataType<?> type :
seaTunnelRowType.getFieldTypes()) {
types.add(type.getSqlType().name());
}
String columns =
StringUtils.join(seaTunnelRowType.getFieldNames(), ",");
@@ -112,9 +114,8 @@ public class HudiSourceReader implements
SourceReader<SeaTunnelRow, HudiSourceSp
}
reader.close();
} catch (Exception e) {
- throw new RuntimeException("Hudi source read error", e);
+ throw new
HudiConnectorException(CommonErrorCode.READER_OPERATION_FAILED, e);
}
-
});
context.signalNoMoreElement();
}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java
index b08f6f68e..d3c8bbb4f 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java
@@ -25,9 +25,9 @@ public class HudiSourceSplit implements SourceSplit {
private static final long serialVersionUID = -1L;
- private String splitId;
+ private final String splitId;
- private InputSplit inputSplit;
+ private final InputSplit inputSplit;
public HudiSourceSplit(String splitId, InputSplit inputSplit) {
this.splitId = splitId;
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java
index 0b2f998a1..f9f372e5c 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java
@@ -43,8 +43,8 @@ public class HudiSourceSplitEnumerator implements
SourceSplitEnumerator<HudiSour
private final Context<HudiSourceSplit> context;
private Set<HudiSourceSplit> pendingSplit;
private Set<HudiSourceSplit> assignedSplit;
- private String tablePath;
- private String confPaths;
+ private final String tablePath;
+ private final String confPaths;
public
HudiSourceSplitEnumerator(SourceSplitEnumerator.Context<HudiSourceSplit>
context, String tablePath, String confPaths) {
this.context = context;
@@ -79,7 +79,7 @@ public class HudiSourceSplitEnumerator implements
SourceSplitEnumerator<HudiSour
FileInputFormat.setInputPaths(jobConf, path);
HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
inputFormat.setConf(jobConf);
- for (InputSplit split: inputFormat.getSplits(jobConf, 0)) {
+ for (InputSplit split : inputFormat.getSplits(jobConf, 0)) {
hudiSourceSplits.add(new HudiSourceSplit(split.toString(), split));
}
return hudiSourceSplits;
@@ -98,14 +98,13 @@ public class HudiSourceSplitEnumerator implements
SourceSplitEnumerator<HudiSour
}
}
- private void assignSplit(Collection<Integer> taskIDList) {
+ private void assignSplit(Collection<Integer> taskIdList) {
Map<Integer, List<HudiSourceSplit>> readySplit = new
HashMap<>(Common.COLLECTION_SIZE);
- for (int taskID : taskIDList) {
- readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
+ for (int taskId : taskIdList) {
+ readySplit.computeIfAbsent(taskId, id -> new ArrayList<>());
}
- pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.splitId(),
taskIDList.size()))
- .add(s));
+ pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.splitId(),
taskIdList.size())).add(s));
readySplit.forEach(context::assignSplit);
assignedSplit.addAll(pendingSplit);
pendingSplit.clear();
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java
index 6235ca694..2dbb0172a 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java
@@ -22,8 +22,7 @@ import java.util.Set;
public class HudiSourceState implements Serializable {
-
- private Set<HudiSourceSplit> assignedSplit;
+ private final Set<HudiSourceSplit> assignedSplit;
public HudiSourceState(Set<HudiSourceSplit> assignedSplit) {
this.assignedSplit = assignedSplit;
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java
index 346232b95..a33aea79b 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java
@@ -22,7 +22,8 @@ import static
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FI
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiPluginException;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -68,14 +69,15 @@ public class HudiUtil {
return null;
}
- public static SeaTunnelRowType getSeaTunnelRowTypeInfo(String confPaths,
String path) throws HudiPluginException {
+ public static SeaTunnelRowType getSeaTunnelRowTypeInfo(String confPaths,
String path) throws HudiConnectorException {
Configuration configuration = getConfiguration(confPaths);
Path dstDir = new Path(path);
ParquetMetadata footer;
try {
footer = ParquetFileReader.readFooter(configuration, dstDir,
NO_FILTER);
} catch (IOException e) {
- throw new HudiPluginException("Create ParquetMetadata Fail!", e);
+ throw new
HudiConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
+ "Create ParquetMetadata Fail!", e);
}
MessageType schema = footer.getFileMetaData().getSchema();
String[] fields = new String[schema.getFields().size()];
@@ -95,14 +97,14 @@ public class HudiUtil {
return new JobConf(conf);
}
- public static void initKerberosAuthentication(Configuration conf, String
principal, String principalFile) throws HudiPluginException {
+ public static void initKerberosAuthentication(Configuration conf, String
principal, String principalFile) throws HudiConnectorException {
try {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(principal, principalFile);
} catch (IOException e) {
- throw new HudiPluginException("Kerberos Authorized Fail!", e);
+ throw new
HudiConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
+ "Kerberos Authorized Fail!", e);
}
-
}
}