This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 81cf10e112 [Improve][Transform] Optimize transform structure (#8071)
81cf10e112 is described below

commit 81cf10e1126bf5a58d15272eba613fcedcf5ca5f
Author: corgy-w <[email protected]>
AuthorDate: Mon Nov 18 18:26:37 2024 +0800

    [Improve][Transform] Optimize transform structure (#8071)
---
 ...tiRowTransform.java => SeaTunnelFlatMapTransform.java} |  6 +-----
 ...lMultiRowTransform.java => SeaTunnelMapTransform.java} | 11 +++--------
 .../seatunnel/api/transform/SeaTunnelTransform.java       |  9 ---------
 .../flink/execution/TransformExecuteProcessor.java        |  9 +++++----
 .../spark/execution/TransformExecuteProcessor.java        | 13 +++++++------
 .../engine/server/task/flow/TransformFlowLifeCycle.java   | 15 +++++++++------
 ...m.java => AbstractCatalogSupportFlatMapTransform.java} | 10 +++++-----
 ...sform.java => AbstractCatalogSupportMapTransform.java} | 10 +++++-----
 .../seatunnel/transform/common/FilterRowTransform.java    |  2 +-
 .../transform/common/MultipleFieldOutputTransform.java    |  2 +-
 .../transform/common/SingleFieldOutputTransform.java      |  2 +-
 .../transform/fieldmapper/FieldMapperTransform.java       |  4 ++--
 .../seatunnel/transform/filter/FilterFieldTransform.java  |  4 ++--
 .../org/apache/seatunnel/transform/sql/SQLTransform.java  |  4 ++--
 14 files changed, 44 insertions(+), 57 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelFlatMapTransform.java
similarity index 84%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelFlatMapTransform.java
index 1f78e8be48..e0e2d85b5d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelFlatMapTransform.java
@@ -18,7 +18,7 @@ package org.apache.seatunnel.api.transform;
 
 import java.util.List;
 
-public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {
+public interface SeaTunnelFlatMapTransform<T> extends SeaTunnelTransform<T> {
 
     /**
      * Transform input data to {@link this#getProducedCatalogTable().getSeaTunnelRowType()} types
@@ -28,8 +28,4 @@ public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {
      * @return transformed data.
      */
     List<T> flatMap(T row);
-
-    default T map(T row) {
-        throw new UnsupportedOperationException("Heads-up conversion is not supported");
-    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMapTransform.java
similarity index 80%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMapTransform.java
index 1f78e8be48..165c405baf 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMapTransform.java
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.api.transform;
 
-import java.util.List;
+package org.apache.seatunnel.api.transform;
 
-public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {
+public interface SeaTunnelMapTransform<T> extends SeaTunnelTransform<T> {
 
     /**
      * Transform input data to {@link this#getProducedCatalogTable().getSeaTunnelRowType()} types
@@ -27,9 +26,5 @@ public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {
      * @param row the data need be transformed.
      * @return transformed data.
      */
-    List<T> flatMap(T row);
-
-    default T map(T row) {
-        throw new UnsupportedOperationException("Heads-up conversion is not supported");
-    }
+    T map(T row);
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
index a64e1b7c7d..0aa45d012e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
@@ -44,15 +44,6 @@ public interface SeaTunnelTransform<T>
     /** Get the catalog table output by this transform */
     CatalogTable getProducedCatalogTable();
 
-    /**
-     * Transform input data to {@link this#getProducedCatalogTable().getSeaTunnelRowType()} types
-     * data.
-     *
-     * @param row the data need be transformed.
-     * @return transformed data.
-     */
-    T map(T row);
-
     /** call it when Transformer completed */
     default void close() {}
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 982179dd2e..1612c9822f 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -25,7 +25,8 @@ import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -144,7 +145,7 @@ public class TransformExecuteProcessor
 
     protected DataStream<SeaTunnelRow> flinkTransform(
             SeaTunnelTransform transform, DataStream<SeaTunnelRow> stream) {
-        if (transform instanceof SeaTunnelMultiRowTransform) {
+        if (transform instanceof SeaTunnelFlatMapTransform) {
             return stream.flatMap(
                     new ArrayFlatMap(transform), TypeInformation.of(SeaTunnelRow.class));
         }
@@ -157,7 +158,7 @@ public class TransformExecuteProcessor
                                 .getStreamExecutionEnvironment()
                                 .clean(
                                         row ->
-                                                ((SeaTunnelTransform<SeaTunnelRow>) transform)
+                                                ((SeaTunnelMapTransform<SeaTunnelRow>) transform)
                                                         .map(row))));
     }
 
@@ -172,7 +173,7 @@ public class TransformExecuteProcessor
         @Override
         public void flatMap(SeaTunnelRow row, Collector<SeaTunnelRow> collector) {
             List<SeaTunnelRow> rows =
-                    ((SeaTunnelMultiRowTransform<SeaTunnelRow>) transform).flatMap(row);
+                    ((SeaTunnelFlatMapTransform<SeaTunnelRow>) transform).flatMap(row);
             if (CollectionUtils.isNotEmpty(rows)) {
                 for (SeaTunnelRow rowResult : rows) {
                     collector.collect(rowResult);
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 44a60195ee..dbd5ed2307 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -26,7 +26,8 @@ import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -192,17 +193,17 @@ public class TransformExecuteProcessor
             List<Row> rows = new ArrayList<>();
 
             SeaTunnelRow seaTunnelRow = inputRowConverter.unpack((GenericRowWithSchema) row);
-            if (transform instanceof SeaTunnelMultiRowTransform) {
+            if (transform instanceof SeaTunnelFlatMapTransform) {
                 List<SeaTunnelRow> seaTunnelRows =
-                        ((SeaTunnelMultiRowTransform<SeaTunnelRow>) transform)
-                                .flatMap(seaTunnelRow);
+                        ((SeaTunnelFlatMapTransform<SeaTunnelRow>) transform).flatMap(seaTunnelRow);
                 if (CollectionUtils.isNotEmpty(seaTunnelRows)) {
                     for (SeaTunnelRow seaTunnelRowTransform : seaTunnelRows) {
                         rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
                     }
                 }
-            } else {
-                SeaTunnelRow seaTunnelRowTransform = transform.map(seaTunnelRow);
+            } else if (transform instanceof SeaTunnelMapTransform) {
+                SeaTunnelRow seaTunnelRowTransform =
+                        ((SeaTunnelMapTransform<SeaTunnelRow>) transform).map(seaTunnelRow);
                 if (seaTunnelRowTransform != null) {
                     rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
                 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index 093ac1cca6..66f658fa4f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -19,7 +19,8 @@ package org.apache.seatunnel.engine.server.task.flow;
 
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
-import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -113,9 +114,9 @@ public class TransformFlowLifeCycle<T> extends ActionFlowLifeCycle
 
         for (SeaTunnelTransform<T> transformer : transform) {
             List<T> nextInputDataList = new ArrayList<>();
-            if (transformer instanceof SeaTunnelMultiRowTransform) {
-                SeaTunnelMultiRowTransform<T> transformDecorator =
-                        (SeaTunnelMultiRowTransform<T>) transformer;
+            if (transformer instanceof SeaTunnelFlatMapTransform) {
+                SeaTunnelFlatMapTransform<T> transformDecorator =
+                        (SeaTunnelFlatMapTransform<T>) transformer;
                 for (T data : dataList) {
                     List<T> outputDataArray = transformDecorator.flatMap(data);
                     log.debug(
@@ -127,9 +128,11 @@ public class TransformFlowLifeCycle<T> extends ActionFlowLifeCycle
                         nextInputDataList.addAll(outputDataArray);
                     }
                 }
-            } else {
+            } else if (transformer instanceof SeaTunnelMapTransform) {
                 for (T data : dataList) {
-                    T outputData = transformer.map(data);
+                    SeaTunnelMapTransform<T> transformDecorator =
+                            (SeaTunnelMapTransform<T>) transformer;
+                    T outputData = transformDecorator.map(data);
                     log.debug(
                             "Transform[{}] input row {} and output row {}",
                             transformer,
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportFlatMapTransform.java
similarity index 81%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportFlatMapTransform.java
index bf7dc2ab96..71b7b536be 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportFlatMapTransform.java
@@ -18,7 +18,7 @@ package org.apache.seatunnel.transform.common;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -26,15 +26,15 @@ import lombok.extern.slf4j.Slf4j;
 import java.util.List;
 
 @Slf4j
-public abstract class AbstractCatalogMultiRowTransform
+public abstract class AbstractCatalogSupportFlatMapTransform
         extends AbstractSeaTunnelTransform<SeaTunnelRow, List<SeaTunnelRow>>
-        implements SeaTunnelMultiRowTransform<SeaTunnelRow> {
+        implements SeaTunnelFlatMapTransform<SeaTunnelRow> {
 
-    public AbstractCatalogMultiRowTransform(@NonNull CatalogTable inputCatalogTable) {
+    public AbstractCatalogSupportFlatMapTransform(@NonNull CatalogTable inputCatalogTable) {
         super(inputCatalogTable);
     }
 
-    public AbstractCatalogMultiRowTransform(
+    public AbstractCatalogSupportFlatMapTransform(
             @NonNull CatalogTable inputCatalogTable, ErrorHandleWay rowErrorHandleWay) {
         super(inputCatalogTable, rowErrorHandleWay);
     }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportMapTransform.java
similarity index 81%
rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportMapTransform.java
index 358bcd4298..6380d88539 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportMapTransform.java
@@ -19,20 +19,20 @@ package org.apache.seatunnel.transform.common;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public abstract class AbstractCatalogSupportTransform
+public abstract class AbstractCatalogSupportMapTransform
         extends AbstractSeaTunnelTransform<SeaTunnelRow, SeaTunnelRow>
-        implements SeaTunnelTransform<SeaTunnelRow> {
-    public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable) {
+        implements SeaTunnelMapTransform<SeaTunnelRow> {
+    public AbstractCatalogSupportMapTransform(@NonNull CatalogTable inputCatalogTable) {
         super(inputCatalogTable);
     }
 
-    public AbstractCatalogSupportTransform(
+    public AbstractCatalogSupportMapTransform(
             @NonNull CatalogTable inputCatalogTable, ErrorHandleWay rowErrorHandleWay) {
         super(inputCatalogTable, rowErrorHandleWay);
     }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
index d5eabf678e..855085a98b 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 
 import lombok.NonNull;
 
-public abstract class FilterRowTransform extends AbstractCatalogSupportTransform {
+public abstract class FilterRowTransform extends AbstractCatalogSupportMapTransform {
 
     public FilterRowTransform(@NonNull CatalogTable inputCatalogTable) {
         super(inputCatalogTable);
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 7f6ac1fba7..84e3a9348d 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -34,7 +34,7 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 @Slf4j
-public abstract class MultipleFieldOutputTransform extends AbstractCatalogSupportTransform {
+public abstract class MultipleFieldOutputTransform extends AbstractCatalogSupportMapTransform {
 
     private static final String[] TYPE_ARRAY_STRING = new String[0];
 
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
index 394242a41b..8768069ab8 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
@@ -33,7 +33,7 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 @Slf4j
-public abstract class SingleFieldOutputTransform extends AbstractCatalogSupportTransform {
+public abstract class SingleFieldOutputTransform extends AbstractCatalogSupportMapTransform {
 
     private int fieldIndex;
     private SeaTunnelRowContainerGenerator rowContainerGenerator;
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
index c5c6fa4f9d..5c9709c18a 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
 import org.apache.seatunnel.transform.exception.TransformCommonError;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -41,7 +41,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 @Slf4j
-public class FieldMapperTransform extends AbstractCatalogSupportTransform {
+public class FieldMapperTransform extends AbstractCatalogSupportMapTransform {
     public static String PLUGIN_NAME = "FieldMapper";
     private final FieldMapperTransformConfig config;
     private List<Integer> needReaderColIndex;
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
index e6797c2cf9..53b66af3d8 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
@@ -28,7 +28,7 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
 import org.apache.seatunnel.transform.exception.TransformCommonError;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 @Slf4j
-public class FilterFieldTransform extends AbstractCatalogSupportTransform {
+public class FilterFieldTransform extends AbstractCatalogSupportMapTransform {
     public static final String PLUGIN_NAME = "Filter";
 
     private int[] inputValueIndexList;
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index 71f448500c..6c2be79f34 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -30,7 +30,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.transform.common.AbstractCatalogMultiRowTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportFlatMapTransform;
 import org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType;
 
 import lombok.NonNull;
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
 import static org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType.ZETA;
 
 @Slf4j
-public class SQLTransform extends AbstractCatalogMultiRowTransform {
+public class SQLTransform extends AbstractCatalogSupportFlatMapTransform {
     public static final String PLUGIN_NAME = "Sql";
 
     public static final Option<String> KEY_QUERY =

Reply via email to