This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f69f773f01 [Improve] Add common error for transform (#5815)
f69f773f01 is described below
commit f69f773f01b23153dd684babecd3278b56985791
Author: Jia Fan <[email protected]>
AuthorDate: Sat Nov 11 10:18:51 2023 +0800
[Improve] Add common error for transform (#5815)
---
.../transform/copy/CopyFieldTransform.java | 34 ++-
.../exception/FieldMapperTransformException.java | 37 ---
.../exception/FilterFieldTransformErrorCode.java | 42 ---
.../transform/exception/TransformCommonError.java | 46 ++++
...rrorCode.java => TransformCommonErrorCode.java} | 10 +-
.../transform/exception/TransformException.java | 11 +-
.../fieldmapper/FieldMapperTransform.java | 21 +-
.../transform/filter/FilterFieldTransform.java | 21 +-
.../transform/jsonpath/JsonPathTransform.java | 15 +-
.../transform/replace/ReplaceTransform.java | 15 +-
.../seatunnel/transform/split/SplitTransform.java | 12 +-
.../transform/exception/TransformErrorTest.java | 291 +++++++++++++++++++++
12 files changed, 415 insertions(+), 140 deletions(-)
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
index 3455b8e8b4..1718030db6 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
@@ -25,8 +25,10 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
import java.lang.reflect.Array;
import java.util.ArrayList;
@@ -63,10 +65,11 @@ public class CopyFieldTransform extends
MultipleFieldOutputTransform {
List<SeaTunnelDataType<?>> fieldsType = new ArrayList<>();
for (Map.Entry<String, String> field : fields.entrySet()) {
String srcField = field.getValue();
- int srcFieldIndex = inputRowType.indexOf(srcField);
- if (srcFieldIndex == -1) {
- throw new IllegalArgumentException(
- "Cannot find [" + srcField + "] field in input row
type");
+ int srcFieldIndex;
+ try {
+ srcFieldIndex = inputRowType.indexOf(srcField);
+ } catch (IllegalArgumentException e) {
+ throw
TransformCommonError.cannotFindInputFieldError(getPluginName(), srcField);
}
fieldNames.add(field.getKey());
fieldOriginalIndexes.add(srcFieldIndex);
@@ -113,12 +116,15 @@ public class CopyFieldTransform extends
MultipleFieldOutputTransform {
Object[] fieldValues = new Object[fieldNames.size()];
for (int i = 0; i < fieldOriginalIndexes.size(); i++) {
fieldValues[i] =
- clone(fieldTypes.get(i),
inputRow.getField(fieldOriginalIndexes.get(i)));
+ clone(
+ fieldNames.get(i),
+ fieldTypes.get(i),
+ inputRow.getField(fieldOriginalIndexes.get(i)));
}
return fieldValues;
}
- private Object clone(SeaTunnelDataType<?> dataType, Object value) {
+ private Object clone(String field, SeaTunnelDataType<?> dataType, Object
value) {
if (value == null) {
return null;
}
@@ -147,7 +153,7 @@ public class CopyFieldTransform extends
MultipleFieldOutputTransform {
Object newArray =
Array.newInstance(arrayType.getElementType().getTypeClass(), array.length);
for (int i = 0; i < array.length; i++) {
- Array.set(newArray, i, clone(arrayType.getElementType(),
array[i]));
+ Array.set(newArray, i, clone(field,
arrayType.getElementType(), array[i]));
}
return newArray;
case MAP:
@@ -156,8 +162,8 @@ public class CopyFieldTransform extends
MultipleFieldOutputTransform {
Map<Object, Object> newMap = new HashMap<>();
for (Object key : map.keySet()) {
newMap.put(
- clone(mapType.getKeyType(), key),
- clone(mapType.getValueType(), map.get(key)));
+ clone(field, mapType.getKeyType(), key),
+ clone(field, mapType.getValueType(),
map.get(key)));
}
return newMap;
case ROW:
@@ -166,7 +172,11 @@ public class CopyFieldTransform extends
MultipleFieldOutputTransform {
Object[] newFields = new Object[rowType.getTotalFields()];
for (int i = 0; i < rowType.getTotalFields(); i++) {
- newFields[i] = clone(rowType.getFieldType(i),
row.getField(i));
+ newFields[i] =
+ clone(
+ rowType.getFieldName(i),
+ rowType.getFieldType(i),
+ row.getField(i));
}
SeaTunnelRow newRow = new SeaTunnelRow(newFields);
newRow.setRowKind(row.getRowKind());
@@ -175,8 +185,8 @@ public class CopyFieldTransform extends
MultipleFieldOutputTransform {
case NULL:
return null;
default:
- throw new UnsupportedOperationException(
- "Unsupported type: " + dataType.getSqlType());
+ throw CommonError.unsupportedDataType(
+ getPluginName(), dataType.getSqlType().toString(),
field);
}
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java
deleted file mode 100644
index d6d5ba16eb..0000000000
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform.exception;
-
-import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
-import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-
-public class FieldMapperTransformException extends SeaTunnelRuntimeException {
- public FieldMapperTransformException(
- SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
- super(seaTunnelErrorCode, errorMessage);
- }
-
- public FieldMapperTransformException(
- SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage,
Throwable cause) {
- super(seaTunnelErrorCode, errorMessage, cause);
- }
-
- public FieldMapperTransformException(SeaTunnelErrorCode
seaTunnelErrorCode, Throwable cause) {
- super(seaTunnelErrorCode, cause);
- }
-}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
deleted file mode 100644
index 59d2d07087..0000000000
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform.exception;
-
-import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
-
-public enum FilterFieldTransformErrorCode implements SeaTunnelErrorCode {
- FILTER_FIELD_NOT_FOUND("FILTER_FIELD_TRANSFORM-01", "filter field not
found");
-
- private final String code;
- private final String description;
-
- FilterFieldTransformErrorCode(String code, String description) {
- this.code = code;
- this.description = description;
- }
-
- @Override
- public String getCode() {
- return this.code;
- }
-
- @Override
- public String getDescription() {
- return this.description;
- }
-}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
new file mode 100644
index 0000000000..b35df6a448
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.common.exception.CommonError;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND;
+import static
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND;
+
+/** The common error of SeaTunnel transform. Please refer {@link CommonError}
*/
+public class TransformCommonError {
+
+ public static TransformException cannotFindInputFieldError(String
transform, String field) {
+ Map<String, String> params = new HashMap<>();
+ params.put("field", field);
+ params.put("transform", transform);
+ return new TransformException(INPUT_FIELD_NOT_FOUND, params);
+ }
+
+ public static TransformException cannotFindInputFieldsError(
+ String transform, List<String> fields) {
+ Map<String, String> params = new HashMap<>();
+ params.put("fields", String.join(",", fields));
+ params.put("transform", transform);
+ return new TransformException(INPUT_FIELDS_NOT_FOUND, params);
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
similarity index 74%
rename from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java
rename to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
index 5c5de9c444..4a5eea66c7 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
@@ -19,14 +19,18 @@ package org.apache.seatunnel.transform.exception;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
-public enum FieldMapperTransformErrorCode implements SeaTunnelErrorCode {
+enum TransformCommonErrorCode implements SeaTunnelErrorCode {
INPUT_FIELD_NOT_FOUND(
- "FIELD_MAPPER_TRANSFORM-01", "field mapper input field not found
in inputRowType");
+ "TRANSFORM_COMMON-01",
+ "The input field '<field>' of '<transform>' transform not found in
upstream schema"),
+ INPUT_FIELDS_NOT_FOUND(
+ "TRANSFORM_COMMON-02",
+ "The input fields '<fields>' of '<transform>' transform not found
in upstream schema");
private final String code;
private final String description;
- FieldMapperTransformErrorCode(String code, String description) {
+ TransformCommonErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
index 8d1838473c..77467a7bd2 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
@@ -20,17 +20,14 @@ package org.apache.seatunnel.transform.exception;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import java.util.Map;
+
public class TransformException extends SeaTunnelRuntimeException {
public TransformException(SeaTunnelErrorCode seaTunnelErrorCode, String
errorMessage) {
super(seaTunnelErrorCode, errorMessage);
}
- public TransformException(
- SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage,
Throwable cause) {
- super(seaTunnelErrorCode, errorMessage, cause);
- }
-
- public TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable
cause) {
- super(seaTunnelErrorCode, cause);
+ TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Map<String,
String> params) {
+ super(seaTunnelErrorCode, params);
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
index 6d4e312f20..037d4ba742 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
@@ -27,9 +27,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
-import org.apache.seatunnel.transform.exception.FieldMapperTransformErrorCode;
-import org.apache.seatunnel.transform.exception.FieldMapperTransformException;
-import org.apache.seatunnel.transform.exception.TransformException;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
import org.apache.commons.collections4.CollectionUtils;
@@ -56,11 +54,18 @@ public class FieldMapperTransform extends
AbstractCatalogSupportTransform {
SeaTunnelRowType seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
List<String> notFoundField =
fieldMapper.keySet().stream()
- .filter(field -> seaTunnelRowType.indexOf(field) == -1)
+ .filter(
+ field -> {
+ try {
+ seaTunnelRowType.indexOf(field);
+ return false;
+ } catch (Exception e) {
+ return true;
+ }
+ })
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(notFoundField)) {
- throw new TransformException(
- FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND,
notFoundField.toString());
+ throw
TransformCommonError.cannotFindInputFieldsError(getPluginName(), notFoundField);
}
}
@@ -97,9 +102,7 @@ public class FieldMapperTransform extends
AbstractCatalogSupportTransform {
(key, value) -> {
int fieldIndex = inputFieldNames.indexOf(key);
if (fieldIndex < 0) {
- throw new FieldMapperTransformException(
-
FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND,
- "Can not found field " + key + " from
inputRowType");
+ throw
TransformCommonError.cannotFindInputFieldError(getPluginName(), key);
}
Column oldColumn = inputColumns.get(fieldIndex);
PhysicalColumn outputColumn =
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
index 0105149036..aaf3168e1b 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
@@ -27,8 +27,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
-import org.apache.seatunnel.transform.exception.FilterFieldTransformErrorCode;
-import org.apache.seatunnel.transform.exception.TransformException;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
import org.apache.commons.collections4.CollectionUtils;
@@ -53,13 +52,20 @@ public class FilterFieldTransform extends
AbstractCatalogSupportTransform {
fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new
String[0]);
List<String> canNotFoundFields =
Arrays.stream(fields)
- .filter(field -> seaTunnelRowType.indexOf(field) == -1)
+ .filter(
+ field -> {
+ try {
+ seaTunnelRowType.indexOf(field);
+ return false;
+ } catch (Exception e) {
+ return true;
+ }
+ })
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(canNotFoundFields)) {
- throw new TransformException(
- FilterFieldTransformErrorCode.FILTER_FIELD_NOT_FOUND,
- canNotFoundFields.toString());
+ throw TransformCommonError.cannotFindInputFieldsError(
+ getPluginName(), canNotFoundFields);
}
}
@@ -92,8 +98,7 @@ public class FilterFieldTransform extends
AbstractCatalogSupportTransform {
String field = filterFields.get(i);
int inputFieldIndex = seaTunnelRowType.indexOf(field);
if (inputFieldIndex == -1) {
- throw new IllegalArgumentException(
- "Cannot find [" + field + "] field in input row type");
+ throw
TransformCommonError.cannotFindInputFieldError(getPluginName(), field);
}
inputValueIndex[i] = inputFieldIndex;
outputColumns.add(
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
index 874d29da8b..6f5397e887 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
@@ -24,10 +24,12 @@ import
org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.format.json.JsonToRowConverters;
import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
import org.apache.seatunnel.transform.exception.TransformException;
import com.jayway.jsonpath.JsonPath;
@@ -42,7 +44,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.JSON_PATH_COMPILE_ERROR;
-import static
org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.SRC_FIELD_NOT_FOUND;
@Slf4j
public class JsonPathTransform extends MultipleFieldOutputTransform {
@@ -108,10 +109,7 @@ public class JsonPathTransform extends
MultipleFieldOutputTransform {
ColumnConfig columnConfig = columnConfigs.get(i);
String srcField = columnConfig.getSrcField();
if (!fieldNameSet.contains(srcField)) {
- throw new TransformException(
- SRC_FIELD_NOT_FOUND,
- String.format(
- "JsonPathTransform config not found
src_field:[%s]", srcField));
+ throw
TransformCommonError.cannotFindInputFieldError(getPluginName(), srcField);
}
this.srcFieldIndexArr[i] = seaTunnelRowType.indexOf(srcField);
}
@@ -161,9 +159,10 @@ public class JsonPathTransform extends
MultipleFieldOutputTransform {
jsonString = JsonUtils.toJsonString(row.getFields());
break;
default:
- throw new UnsupportedOperationException(
- "JsonPathTransform unsupported sourceDataType: "
- + inputDataType.getSqlType());
+ throw CommonError.unsupportedDataType(
+ getPluginName(),
+ inputDataType.getSqlType().toString(),
+ columnConfig.getSrcField());
}
Object result =
JSON_PATH_CACHE.get(columnConfig.getPath()).read(jsonString);
JsonNode jsonNode = JsonUtils.toJsonNode(result);
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
index a99aab49b0..5c5451fce7 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
import org.apache.commons.collections4.CollectionUtils;
@@ -50,10 +51,10 @@ public class ReplaceTransform extends
SingleFieldOutputTransform {
}
private void initOutputFields(SeaTunnelRowType inputRowType, String
replaceField) {
- inputFieldIndex = inputRowType.indexOf(replaceField);
- if (inputFieldIndex == -1) {
- throw new IllegalArgumentException(
- "Cannot find [" + replaceField + "] field in input row
type");
+ try {
+ inputFieldIndex = inputRowType.indexOf(replaceField);
+ } catch (IllegalArgumentException e) {
+ throw
TransformCommonError.cannotFindInputFieldError(getPluginName(), replaceField);
}
}
@@ -102,10 +103,8 @@ public class ReplaceTransform extends
SingleFieldOutputTransform {
.KEY_REPLACE_FIELD)))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(collect)) {
- throw new IllegalArgumentException(
- "Cannot find ["
- +
config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD)
- + "] field in input catalog table");
+ throw TransformCommonError.cannotFindInputFieldError(
+ getPluginName(),
config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD));
}
return collect.get(0).copy();
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
index 46c38639fd..c1ead2dd0b 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
import lombok.NonNull;
@@ -39,12 +40,11 @@ public class SplitTransform extends
MultipleFieldOutputTransform {
super(catalogTable);
this.splitTransformConfig = splitTransformConfig;
SeaTunnelRowType seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
- splitFieldIndex =
seaTunnelRowType.indexOf(splitTransformConfig.getSplitField());
- if (splitFieldIndex == -1) {
- throw new IllegalArgumentException(
- "Cannot find ["
- + splitTransformConfig.getSplitField()
- + "] field in input row type");
+ try {
+ splitFieldIndex =
seaTunnelRowType.indexOf(splitTransformConfig.getSplitField());
+ } catch (IllegalArgumentException e) {
+ throw TransformCommonError.cannotFindInputFieldError(
+ getPluginName(), splitTransformConfig.getSplitField());
}
this.outputCatalogTable = getProducedCatalogTable();
}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java
new file mode 100644
index 0000000000..c305511809
--- /dev/null
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.transform.copy.CopyFieldTransformFactory;
+import org.apache.seatunnel.transform.copy.CopyTransformConfig;
+import org.apache.seatunnel.transform.fieldmapper.FieldMapperTransformConfig;
+import org.apache.seatunnel.transform.fieldmapper.FieldMapperTransformFactory;
+import org.apache.seatunnel.transform.filter.FilterFieldTransformConfig;
+import org.apache.seatunnel.transform.filter.FilterFieldTransformFactory;
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransformConfig;
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransformFactory;
+import org.apache.seatunnel.transform.replace.ReplaceTransformConfig;
+import org.apache.seatunnel.transform.replace.ReplaceTransformFactory;
+import org.apache.seatunnel.transform.split.SplitTransformConfig;
+import org.apache.seatunnel.transform.split.SplitTransformFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TransformErrorTest {
+
+ private static final CatalogTable table =
+ CatalogTableUtil.getCatalogTable(
+ "test",
+ "test",
+ "test",
+ "test",
+ new SeaTunnelRowType(
+ new String[] {"name"},
+ new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
+
+ @Test
+ void testFieldMapperTransformWithError() {
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+
FieldMapperTransformConfig.FIELD_MAPPER.key(),
+ new HashMap<String, String>() {
+ {
+ put("age", "age1");
+ }
+ });
+ }
+ });
+ TableTransformFactoryContext context =
+ new TableTransformFactoryContext(
+ Collections.singletonList(table),
+ config,
+ Thread.currentThread().getContextClassLoader());
+ TransformException exception =
+ Assertions.assertThrows(
+ TransformException.class,
+ () ->
+ new FieldMapperTransformFactory()
+ .createTransform(context)
+ .createTransform());
+ Assertions.assertEquals(
+ "ErrorCode:[TRANSFORM_COMMON-02], ErrorDescription:[The input
fields 'age' of 'FieldMapper' transform not found in upstream schema]",
+ exception.getMessage());
+ }
+
+ @Test
+ void testCopyTransformWithError() {
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+ CopyTransformConfig.FIELDS.key(),
+ new HashMap<String, String>() {
+ {
+ put("ageA", "age1");
+ }
+ });
+ }
+ });
+ TableTransformFactoryContext context =
+ new TableTransformFactoryContext(
+ Collections.singletonList(table),
+ config,
+ Thread.currentThread().getContextClassLoader());
+ TransformException exception =
+ Assertions.assertThrows(
+ TransformException.class,
+ () ->
+ new CopyFieldTransformFactory()
+ .createTransform(context)
+ .createTransform());
+ Assertions.assertEquals(
+ "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input
field 'age1' of 'Copy' transform not found in upstream schema]",
+ exception.getMessage());
+
+ ReadonlyConfig config2 =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(CopyTransformConfig.SRC_FIELD.key(),
"ageB");
+ put(CopyTransformConfig.DEST_FIELD.key(),
"age1");
+ }
+ });
+ TableTransformFactoryContext context2 =
+ new TableTransformFactoryContext(
+ Collections.singletonList(table),
+ config2,
+ Thread.currentThread().getContextClassLoader());
+ TransformException exception2 =
+ Assertions.assertThrows(
+ TransformException.class,
+ () ->
+ new CopyFieldTransformFactory()
+ .createTransform(context2)
+ .createTransform());
+ Assertions.assertEquals(
+ "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input
field 'ageB' of 'Copy' transform not found in upstream schema]",
+ exception2.getMessage());
+ }
+
+ @Test
+ void testFilterTransformWithError() {
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+
FilterFieldTransformConfig.KEY_FIELDS.key(),
+ new ArrayList<String>() {
+ {
+ add("age");
+ add("gender");
+ }
+ });
+ }
+ });
+ TableTransformFactoryContext context =
+ new TableTransformFactoryContext(
+ Collections.singletonList(table),
+ config,
+ Thread.currentThread().getContextClassLoader());
+ TransformException exception =
+ Assertions.assertThrows(
+ TransformException.class,
+ () ->
+ new FilterFieldTransformFactory()
+ .createTransform(context)
+ .createTransform());
+ Assertions.assertEquals(
+ "ErrorCode:[TRANSFORM_COMMON-02], ErrorDescription:[The input
fields 'age,gender' of 'Filter' transform not found in upstream schema]",
+ exception.getMessage());
+ }
+
+ @Test
+ void testJsonPathTransformWithError() {
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ new ArrayList<Map<String, String>>() {
+ {
+ add(
+ new HashMap<String,
String>() {
+ {
+ put(
+
JsonPathTransformConfig.PATH
+
.key(),
+
"path");
+ put(
+
JsonPathTransformConfig
+
.SRC_FIELD
+
.key(),
+ "age");
+ put(
+
JsonPathTransformConfig
+
.DEST_FIELD
+
.key(),
+
"age2");
+ }
+ });
+ }
+ });
+ }
+ });
+ TableTransformFactoryContext context =
+ new TableTransformFactoryContext(
+ Collections.singletonList(table),
+ config,
+ Thread.currentThread().getContextClassLoader());
+ TransformException exception =
+ Assertions.assertThrows(
+ TransformException.class,
+ () ->
+ new JsonPathTransformFactory()
+ .createTransform(context)
+ .createTransform());
+ Assertions.assertEquals(
+ "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input
field 'age' of 'JsonPath' transform not found in upstream schema]",
+ exception.getMessage());
+ }
+
+ @Test
+ void testReplaceTransformWithError() {
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+
put(ReplaceTransformConfig.KEY_REPLACE_FIELD.key(), "age");
+ put(ReplaceTransformConfig.KEY_PATTERN.key(),
"1");
+
put(ReplaceTransformConfig.KEY_REPLACEMENT.key(), "2");
+ put(ReplaceTransformConfig.KEY_IS_REGEX.key(),
"false");
+
put(ReplaceTransformConfig.KEY_REPLACE_FIRST.key(), "false");
+ }
+ });
+ TableTransformFactoryContext context =
+ new TableTransformFactoryContext(
+ Collections.singletonList(table),
+ config,
+ Thread.currentThread().getContextClassLoader());
+ TransformException exception =
+ Assertions.assertThrows(
+ TransformException.class,
+ () ->
+ new ReplaceTransformFactory()
+ .createTransform(context)
+ .createTransform());
+ Assertions.assertEquals(
+ "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input
field 'age' of 'Replace' transform not found in upstream schema]",
+ exception.getMessage());
+ }
+
+ @Test
+ void testSplitTransformWithError() {
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+
put(SplitTransformConfig.KEY_SPLIT_FIELD.key(), "age");
+ put(
+
SplitTransformConfig.KEY_OUTPUT_FIELDS.key(),
+ Arrays.asList("age1", "age2"));
+ put(SplitTransformConfig.KEY_SEPARATOR.key(),
",");
+ }
+ });
+ TableTransformFactoryContext context =
+ new TableTransformFactoryContext(
+ Collections.singletonList(table),
+ config,
+ Thread.currentThread().getContextClassLoader());
+ TransformException exception =
+ Assertions.assertThrows(
+ TransformException.class,
+ () ->
+ new SplitTransformFactory()
+ .createTransform(context)
+ .createTransform());
+ Assertions.assertEquals(
+ "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input
field 'age' of 'Split' transform not found in upstream schema]",
+ exception.getMessage());
+ }
+}