This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f69f773f01 [Improve] Add common error for transform (#5815)
f69f773f01 is described below

commit f69f773f01b23153dd684babecd3278b56985791
Author: Jia Fan <[email protected]>
AuthorDate: Sat Nov 11 10:18:51 2023 +0800

    [Improve] Add common error for transform (#5815)
---
 .../transform/copy/CopyFieldTransform.java         |  34 ++-
 .../exception/FieldMapperTransformException.java   |  37 ---
 .../exception/FilterFieldTransformErrorCode.java   |  42 ---
 .../transform/exception/TransformCommonError.java  |  46 ++++
 ...rrorCode.java => TransformCommonErrorCode.java} |  10 +-
 .../transform/exception/TransformException.java    |  11 +-
 .../fieldmapper/FieldMapperTransform.java          |  21 +-
 .../transform/filter/FilterFieldTransform.java     |  21 +-
 .../transform/jsonpath/JsonPathTransform.java      |  15 +-
 .../transform/replace/ReplaceTransform.java        |  15 +-
 .../seatunnel/transform/split/SplitTransform.java  |  12 +-
 .../transform/exception/TransformErrorTest.java    | 291 +++++++++++++++++++++
 12 files changed, 415 insertions(+), 140 deletions(-)

diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
index 3455b8e8b4..1718030db6 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java
@@ -25,8 +25,10 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
 import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
 
 import java.lang.reflect.Array;
 import java.util.ArrayList;
@@ -63,10 +65,11 @@ public class CopyFieldTransform extends 
MultipleFieldOutputTransform {
         List<SeaTunnelDataType<?>> fieldsType = new ArrayList<>();
         for (Map.Entry<String, String> field : fields.entrySet()) {
             String srcField = field.getValue();
-            int srcFieldIndex = inputRowType.indexOf(srcField);
-            if (srcFieldIndex == -1) {
-                throw new IllegalArgumentException(
-                        "Cannot find [" + srcField + "] field in input row 
type");
+            int srcFieldIndex;
+            try {
+                srcFieldIndex = inputRowType.indexOf(srcField);
+            } catch (IllegalArgumentException e) {
+                throw 
TransformCommonError.cannotFindInputFieldError(getPluginName(), srcField);
             }
             fieldNames.add(field.getKey());
             fieldOriginalIndexes.add(srcFieldIndex);
@@ -113,12 +116,15 @@ public class CopyFieldTransform extends 
MultipleFieldOutputTransform {
         Object[] fieldValues = new Object[fieldNames.size()];
         for (int i = 0; i < fieldOriginalIndexes.size(); i++) {
             fieldValues[i] =
-                    clone(fieldTypes.get(i), 
inputRow.getField(fieldOriginalIndexes.get(i)));
+                    clone(
+                            fieldNames.get(i),
+                            fieldTypes.get(i),
+                            inputRow.getField(fieldOriginalIndexes.get(i)));
         }
         return fieldValues;
     }
 
-    private Object clone(SeaTunnelDataType<?> dataType, Object value) {
+    private Object clone(String field, SeaTunnelDataType<?> dataType, Object 
value) {
         if (value == null) {
             return null;
         }
@@ -147,7 +153,7 @@ public class CopyFieldTransform extends 
MultipleFieldOutputTransform {
                 Object newArray =
                         
Array.newInstance(arrayType.getElementType().getTypeClass(), array.length);
                 for (int i = 0; i < array.length; i++) {
-                    Array.set(newArray, i, clone(arrayType.getElementType(), 
array[i]));
+                    Array.set(newArray, i, clone(field, 
arrayType.getElementType(), array[i]));
                 }
                 return newArray;
             case MAP:
@@ -156,8 +162,8 @@ public class CopyFieldTransform extends 
MultipleFieldOutputTransform {
                 Map<Object, Object> newMap = new HashMap<>();
                 for (Object key : map.keySet()) {
                     newMap.put(
-                            clone(mapType.getKeyType(), key),
-                            clone(mapType.getValueType(), map.get(key)));
+                            clone(field, mapType.getKeyType(), key),
+                            clone(field, mapType.getValueType(), 
map.get(key)));
                 }
                 return newMap;
             case ROW:
@@ -166,7 +172,11 @@ public class CopyFieldTransform extends 
MultipleFieldOutputTransform {
 
                 Object[] newFields = new Object[rowType.getTotalFields()];
                 for (int i = 0; i < rowType.getTotalFields(); i++) {
-                    newFields[i] = clone(rowType.getFieldType(i), 
row.getField(i));
+                    newFields[i] =
+                            clone(
+                                    rowType.getFieldName(i),
+                                    rowType.getFieldType(i),
+                                    row.getField(i));
                 }
                 SeaTunnelRow newRow = new SeaTunnelRow(newFields);
                 newRow.setRowKind(row.getRowKind());
@@ -175,8 +185,8 @@ public class CopyFieldTransform extends 
MultipleFieldOutputTransform {
             case NULL:
                 return null;
             default:
-                throw new UnsupportedOperationException(
-                        "Unsupported type: " + dataType.getSqlType());
+                throw CommonError.unsupportedDataType(
+                        getPluginName(), dataType.getSqlType().toString(), 
field);
         }
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java
deleted file mode 100644
index d6d5ba16eb..0000000000
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform.exception;
-
-import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
-import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-
-public class FieldMapperTransformException extends SeaTunnelRuntimeException {
-    public FieldMapperTransformException(
-            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
-        super(seaTunnelErrorCode, errorMessage);
-    }
-
-    public FieldMapperTransformException(
-            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, 
Throwable cause) {
-        super(seaTunnelErrorCode, errorMessage, cause);
-    }
-
-    public FieldMapperTransformException(SeaTunnelErrorCode 
seaTunnelErrorCode, Throwable cause) {
-        super(seaTunnelErrorCode, cause);
-    }
-}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
deleted file mode 100644
index 59d2d07087..0000000000
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform.exception;
-
-import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
-
-public enum FilterFieldTransformErrorCode implements SeaTunnelErrorCode {
-    FILTER_FIELD_NOT_FOUND("FILTER_FIELD_TRANSFORM-01", "filter field not 
found");
-
-    private final String code;
-    private final String description;
-
-    FilterFieldTransformErrorCode(String code, String description) {
-        this.code = code;
-        this.description = description;
-    }
-
-    @Override
-    public String getCode() {
-        return this.code;
-    }
-
-    @Override
-    public String getDescription() {
-        return this.description;
-    }
-}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
new file mode 100644
index 0000000000..b35df6a448
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.common.exception.CommonError;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND;
+import static 
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND;
+
+/** The common error of SeaTunnel transform. Please refer {@link CommonError} 
*/
+public class TransformCommonError {
+
+    public static TransformException cannotFindInputFieldError(String 
transform, String field) {
+        Map<String, String> params = new HashMap<>();
+        params.put("field", field);
+        params.put("transform", transform);
+        return new TransformException(INPUT_FIELD_NOT_FOUND, params);
+    }
+
+    public static TransformException cannotFindInputFieldsError(
+            String transform, List<String> fields) {
+        Map<String, String> params = new HashMap<>();
+        params.put("fields", String.join(",", fields));
+        params.put("transform", transform);
+        return new TransformException(INPUT_FIELDS_NOT_FOUND, params);
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
similarity index 74%
rename from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java
rename to 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
index 5c5de9c444..4a5eea66c7 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
@@ -19,14 +19,18 @@ package org.apache.seatunnel.transform.exception;
 
 import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
 
-public enum FieldMapperTransformErrorCode implements SeaTunnelErrorCode {
+enum TransformCommonErrorCode implements SeaTunnelErrorCode {
     INPUT_FIELD_NOT_FOUND(
-            "FIELD_MAPPER_TRANSFORM-01", "field mapper input field not found 
in inputRowType");
+            "TRANSFORM_COMMON-01",
+            "The input field '<field>' of '<transform>' transform not found in 
upstream schema"),
+    INPUT_FIELDS_NOT_FOUND(
+            "TRANSFORM_COMMON-02",
+            "The input fields '<fields>' of '<transform>' transform not found 
in upstream schema");
 
     private final String code;
     private final String description;
 
-    FieldMapperTransformErrorCode(String code, String description) {
+    TransformCommonErrorCode(String code, String description) {
         this.code = code;
         this.description = description;
     }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
index 8d1838473c..77467a7bd2 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
@@ -20,17 +20,14 @@ package org.apache.seatunnel.transform.exception;
 import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
+import java.util.Map;
+
 public class TransformException extends SeaTunnelRuntimeException {
     public TransformException(SeaTunnelErrorCode seaTunnelErrorCode, String 
errorMessage) {
         super(seaTunnelErrorCode, errorMessage);
     }
 
-    public TransformException(
-            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, 
Throwable cause) {
-        super(seaTunnelErrorCode, errorMessage, cause);
-    }
-
-    public TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable 
cause) {
-        super(seaTunnelErrorCode, cause);
+    TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Map<String, 
String> params) {
+        super(seaTunnelErrorCode, params);
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
index 6d4e312f20..037d4ba742 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
@@ -27,9 +27,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
-import org.apache.seatunnel.transform.exception.FieldMapperTransformErrorCode;
-import org.apache.seatunnel.transform.exception.FieldMapperTransformException;
-import org.apache.seatunnel.transform.exception.TransformException;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
 
 import org.apache.commons.collections4.CollectionUtils;
 
@@ -56,11 +54,18 @@ public class FieldMapperTransform extends 
AbstractCatalogSupportTransform {
         SeaTunnelRowType seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
         List<String> notFoundField =
                 fieldMapper.keySet().stream()
-                        .filter(field -> seaTunnelRowType.indexOf(field) == -1)
+                        .filter(
+                                field -> {
+                                    try {
+                                        seaTunnelRowType.indexOf(field);
+                                        return false;
+                                    } catch (Exception e) {
+                                        return true;
+                                    }
+                                })
                         .collect(Collectors.toList());
         if (!CollectionUtils.isEmpty(notFoundField)) {
-            throw new TransformException(
-                    FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND, 
notFoundField.toString());
+            throw 
TransformCommonError.cannotFindInputFieldsError(getPluginName(), notFoundField);
         }
     }
 
@@ -97,9 +102,7 @@ public class FieldMapperTransform extends 
AbstractCatalogSupportTransform {
                 (key, value) -> {
                     int fieldIndex = inputFieldNames.indexOf(key);
                     if (fieldIndex < 0) {
-                        throw new FieldMapperTransformException(
-                                
FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND,
-                                "Can not found field " + key + " from 
inputRowType");
+                        throw 
TransformCommonError.cannotFindInputFieldError(getPluginName(), key);
                     }
                     Column oldColumn = inputColumns.get(fieldIndex);
                     PhysicalColumn outputColumn =
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
index 0105149036..aaf3168e1b 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
@@ -27,8 +27,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
-import org.apache.seatunnel.transform.exception.FilterFieldTransformErrorCode;
-import org.apache.seatunnel.transform.exception.TransformException;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
 
 import org.apache.commons.collections4.CollectionUtils;
 
@@ -53,13 +52,20 @@ public class FilterFieldTransform extends 
AbstractCatalogSupportTransform {
         fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new 
String[0]);
         List<String> canNotFoundFields =
                 Arrays.stream(fields)
-                        .filter(field -> seaTunnelRowType.indexOf(field) == -1)
+                        .filter(
+                                field -> {
+                                    try {
+                                        seaTunnelRowType.indexOf(field);
+                                        return false;
+                                    } catch (Exception e) {
+                                        return true;
+                                    }
+                                })
                         .collect(Collectors.toList());
 
         if (!CollectionUtils.isEmpty(canNotFoundFields)) {
-            throw new TransformException(
-                    FilterFieldTransformErrorCode.FILTER_FIELD_NOT_FOUND,
-                    canNotFoundFields.toString());
+            throw TransformCommonError.cannotFindInputFieldsError(
+                    getPluginName(), canNotFoundFields);
         }
     }
 
@@ -92,8 +98,7 @@ public class FilterFieldTransform extends 
AbstractCatalogSupportTransform {
             String field = filterFields.get(i);
             int inputFieldIndex = seaTunnelRowType.indexOf(field);
             if (inputFieldIndex == -1) {
-                throw new IllegalArgumentException(
-                        "Cannot find [" + field + "] field in input row type");
+                throw 
TransformCommonError.cannotFindInputFieldError(getPluginName(), field);
             }
             inputValueIndex[i] = inputFieldIndex;
             outputColumns.add(
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
index 874d29da8b..6f5397e887 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
@@ -24,10 +24,12 @@ import 
org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.format.json.JsonToRowConverters;
 import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
 import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
 import org.apache.seatunnel.transform.exception.TransformException;
 
 import com.jayway.jsonpath.JsonPath;
@@ -42,7 +44,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.JSON_PATH_COMPILE_ERROR;
-import static 
org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.SRC_FIELD_NOT_FOUND;
 
 @Slf4j
 public class JsonPathTransform extends MultipleFieldOutputTransform {
@@ -108,10 +109,7 @@ public class JsonPathTransform extends 
MultipleFieldOutputTransform {
             ColumnConfig columnConfig = columnConfigs.get(i);
             String srcField = columnConfig.getSrcField();
             if (!fieldNameSet.contains(srcField)) {
-                throw new TransformException(
-                        SRC_FIELD_NOT_FOUND,
-                        String.format(
-                                "JsonPathTransform config not found 
src_field:[%s]", srcField));
+                throw 
TransformCommonError.cannotFindInputFieldError(getPluginName(), srcField);
             }
             this.srcFieldIndexArr[i] = seaTunnelRowType.indexOf(srcField);
         }
@@ -161,9 +159,10 @@ public class JsonPathTransform extends 
MultipleFieldOutputTransform {
                     jsonString = JsonUtils.toJsonString(row.getFields());
                     break;
                 default:
-                    throw new UnsupportedOperationException(
-                            "JsonPathTransform unsupported sourceDataType: "
-                                    + inputDataType.getSqlType());
+                    throw CommonError.unsupportedDataType(
+                            getPluginName(),
+                            inputDataType.getSqlType().toString(),
+                            columnConfig.getSrcField());
             }
             Object result = 
JSON_PATH_CACHE.get(columnConfig.getPath()).read(jsonString);
             JsonNode jsonNode = JsonUtils.toJsonNode(result);
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
index a99aab49b0..5c5451fce7 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
 import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
 
 import org.apache.commons.collections4.CollectionUtils;
 
@@ -50,10 +51,10 @@ public class ReplaceTransform extends 
SingleFieldOutputTransform {
     }
 
     private void initOutputFields(SeaTunnelRowType inputRowType, String 
replaceField) {
-        inputFieldIndex = inputRowType.indexOf(replaceField);
-        if (inputFieldIndex == -1) {
-            throw new IllegalArgumentException(
-                    "Cannot find [" + replaceField + "] field in input row 
type");
+        try {
+            inputFieldIndex = inputRowType.indexOf(replaceField);
+        } catch (IllegalArgumentException e) {
+            throw 
TransformCommonError.cannotFindInputFieldError(getPluginName(), replaceField);
         }
     }
 
@@ -102,10 +103,8 @@ public class ReplaceTransform extends 
SingleFieldOutputTransform {
                                                                         
.KEY_REPLACE_FIELD)))
                         .collect(Collectors.toList());
         if (CollectionUtils.isEmpty(collect)) {
-            throw new IllegalArgumentException(
-                    "Cannot find ["
-                            + 
config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD)
-                            + "] field in input catalog table");
+            throw TransformCommonError.cannotFindInputFieldError(
+                    getPluginName(), 
config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD));
         }
         return collect.get(0).copy();
     }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
index 46c38639fd..c1ead2dd0b 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
 import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
 
 import lombok.NonNull;
 
@@ -39,12 +40,11 @@ public class SplitTransform extends 
MultipleFieldOutputTransform {
         super(catalogTable);
         this.splitTransformConfig = splitTransformConfig;
         SeaTunnelRowType seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
-        splitFieldIndex = 
seaTunnelRowType.indexOf(splitTransformConfig.getSplitField());
-        if (splitFieldIndex == -1) {
-            throw new IllegalArgumentException(
-                    "Cannot find ["
-                            + splitTransformConfig.getSplitField()
-                            + "] field in input row type");
+        try {
+            splitFieldIndex = 
seaTunnelRowType.indexOf(splitTransformConfig.getSplitField());
+        } catch (IllegalArgumentException e) {
+            throw TransformCommonError.cannotFindInputFieldError(
+                    getPluginName(), splitTransformConfig.getSplitField());
         }
         this.outputCatalogTable = getProducedCatalogTable();
     }
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java
new file mode 100644
index 0000000000..c305511809
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.transform.copy.CopyFieldTransformFactory;
+import org.apache.seatunnel.transform.copy.CopyTransformConfig;
+import org.apache.seatunnel.transform.fieldmapper.FieldMapperTransformConfig;
+import org.apache.seatunnel.transform.fieldmapper.FieldMapperTransformFactory;
+import org.apache.seatunnel.transform.filter.FilterFieldTransformConfig;
+import org.apache.seatunnel.transform.filter.FilterFieldTransformFactory;
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransformConfig;
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransformFactory;
+import org.apache.seatunnel.transform.replace.ReplaceTransformConfig;
+import org.apache.seatunnel.transform.replace.ReplaceTransformFactory;
+import org.apache.seatunnel.transform.split.SplitTransformConfig;
+import org.apache.seatunnel.transform.split.SplitTransformFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Verifies that every transform reports a missing upstream field through a {@link
+ * TransformException} with one of the {@code TRANSFORM_COMMON} error codes.
+ *
+ * <p>Each test configures a transform against a table whose only column is {@code name} and
+ * points it at columns that table does not contain.
+ */
+public class TransformErrorTest {
+
+    /** Upstream table exposing a single STRING column called {@code name}. */
+    private static final CatalogTable table =
+            CatalogTableUtil.getCatalogTable(
+                    "test",
+                    "test",
+                    "test",
+                    "test",
+                    new SeaTunnelRowType(
+                            new String[] {"name"},
+                            new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
+
+    /**
+     * Wraps raw transform options into a factory context bound to {@link #table}.
+     *
+     * @param options transform options keyed by option name
+     * @return the context to hand to a transform factory
+     */
+    private static TableTransformFactoryContext contextOf(Map<String, Object> options) {
+        return new TableTransformFactoryContext(
+                Collections.singletonList(table),
+                ReadonlyConfig.fromMap(options),
+                Thread.currentThread().getContextClassLoader());
+    }
+
+    @Test
+    void testFieldMapperTransformWithError() {
+        Map<String, String> fieldMapper = new HashMap<>();
+        fieldMapper.put("age", "age1");
+        Map<String, Object> options = new HashMap<>();
+        options.put(FieldMapperTransformConfig.FIELD_MAPPER.key(), fieldMapper);
+        TableTransformFactoryContext context = contextOf(options);
+
+        TransformException exception =
+                Assertions.assertThrows(
+                        TransformException.class,
+                        () ->
+                                new FieldMapperTransformFactory()
+                                        .createTransform(context)
+                                        .createTransform());
+        Assertions.assertEquals(
+                "ErrorCode:[TRANSFORM_COMMON-02], ErrorDescription:[The input fields 'age' of 'FieldMapper' transform not found in upstream schema]",
+                exception.getMessage());
+    }
+
+    @Test
+    void testCopyTransformWithError() {
+        // Multi-field form: the expected message names the map value as the missing input.
+        Map<String, String> fields = new HashMap<>();
+        fields.put("ageA", "age1");
+        Map<String, Object> options = new HashMap<>();
+        options.put(CopyTransformConfig.FIELDS.key(), fields);
+        TableTransformFactoryContext context = contextOf(options);
+
+        TransformException exception =
+                Assertions.assertThrows(
+                        TransformException.class,
+                        () ->
+                                new CopyFieldTransformFactory()
+                                        .createTransform(context)
+                                        .createTransform());
+        Assertions.assertEquals(
+                "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input field 'age1' of 'Copy' transform not found in upstream schema]",
+                exception.getMessage());
+
+        // Single-field form configured through the SRC_FIELD/DEST_FIELD options.
+        Map<String, Object> options2 = new HashMap<>();
+        options2.put(CopyTransformConfig.SRC_FIELD.key(), "ageB");
+        options2.put(CopyTransformConfig.DEST_FIELD.key(), "age1");
+        TableTransformFactoryContext context2 = contextOf(options2);
+
+        TransformException exception2 =
+                Assertions.assertThrows(
+                        TransformException.class,
+                        () ->
+                                new CopyFieldTransformFactory()
+                                        .createTransform(context2)
+                                        .createTransform());
+        Assertions.assertEquals(
+                "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input field 'ageB' of 'Copy' transform not found in upstream schema]",
+                exception2.getMessage());
+    }
+
+    @Test
+    void testFilterTransformWithError() {
+        Map<String, Object> options = new HashMap<>();
+        options.put(
+                FilterFieldTransformConfig.KEY_FIELDS.key(),
+                new ArrayList<>(Arrays.asList("age", "gender")));
+        TableTransformFactoryContext context = contextOf(options);
+
+        TransformException exception =
+                Assertions.assertThrows(
+                        TransformException.class,
+                        () ->
+                                new FilterFieldTransformFactory()
+                                        .createTransform(context)
+                                        .createTransform());
+        Assertions.assertEquals(
+                "ErrorCode:[TRANSFORM_COMMON-02], ErrorDescription:[The input fields 'age,gender' of 'Filter' transform not found in upstream schema]",
+                exception.getMessage());
+    }
+
+    @Test
+    void testJsonPathTransformWithError() {
+        Map<String, String> column = new HashMap<>();
+        column.put(JsonPathTransformConfig.PATH.key(), "path");
+        column.put(JsonPathTransformConfig.SRC_FIELD.key(), "age");
+        column.put(JsonPathTransformConfig.DEST_FIELD.key(), "age2");
+        Map<String, Object> options = new HashMap<>();
+        options.put(
+                JsonPathTransformConfig.COLUMNS.key(),
+                new ArrayList<>(Collections.singletonList(column)));
+        TableTransformFactoryContext context = contextOf(options);
+
+        TransformException exception =
+                Assertions.assertThrows(
+                        TransformException.class,
+                        () ->
+                                new JsonPathTransformFactory()
+                                        .createTransform(context)
+                                        .createTransform());
+        Assertions.assertEquals(
+                "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input field 'age' of 'JsonPath' transform not found in upstream schema]",
+                exception.getMessage());
+    }
+
+    @Test
+    void testReplaceTransformWithError() {
+        Map<String, Object> options = new HashMap<>();
+        options.put(ReplaceTransformConfig.KEY_REPLACE_FIELD.key(), "age");
+        options.put(ReplaceTransformConfig.KEY_PATTERN.key(), "1");
+        options.put(ReplaceTransformConfig.KEY_REPLACEMENT.key(), "2");
+        options.put(ReplaceTransformConfig.KEY_IS_REGEX.key(), "false");
+        options.put(ReplaceTransformConfig.KEY_REPLACE_FIRST.key(), "false");
+        TableTransformFactoryContext context = contextOf(options);
+
+        TransformException exception =
+                Assertions.assertThrows(
+                        TransformException.class,
+                        () ->
+                                new ReplaceTransformFactory()
+                                        .createTransform(context)
+                                        .createTransform());
+        Assertions.assertEquals(
+                "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input field 'age' of 'Replace' transform not found in upstream schema]",
+                exception.getMessage());
+    }
+
+    @Test
+    void testSplitTransformWithError() {
+        Map<String, Object> options = new HashMap<>();
+        options.put(SplitTransformConfig.KEY_SPLIT_FIELD.key(), "age");
+        options.put(
+                SplitTransformConfig.KEY_OUTPUT_FIELDS.key(), Arrays.asList("age1", "age2"));
+        options.put(SplitTransformConfig.KEY_SEPARATOR.key(), ",");
+        TableTransformFactoryContext context = contextOf(options);
+
+        TransformException exception =
+                Assertions.assertThrows(
+                        TransformException.class,
+                        () ->
+                                new SplitTransformFactory()
+                                        .createTransform(context)
+                                        .createTransform());
+        Assertions.assertEquals(
+                "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input field 'age' of 'Split' transform not found in upstream schema]",
+                exception.getMessage());
+    }
+}


Reply via email to