This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 81cf10e112 [Improve][Transform] Optimize transform structure (#8071)
81cf10e112 is described below
commit 81cf10e1126bf5a58d15272eba613fcedcf5ca5f
Author: corgy-w <[email protected]>
AuthorDate: Mon Nov 18 18:26:37 2024 +0800
[Improve][Transform] Optimize transform structure (#8071)
---
...tiRowTransform.java => SeaTunnelFlatMapTransform.java} | 6 +-----
...lMultiRowTransform.java => SeaTunnelMapTransform.java} | 11 +++--------
.../seatunnel/api/transform/SeaTunnelTransform.java | 9 ---------
.../flink/execution/TransformExecuteProcessor.java | 9 +++++----
.../spark/execution/TransformExecuteProcessor.java | 13 +++++++------
.../engine/server/task/flow/TransformFlowLifeCycle.java | 15 +++++++++------
...m.java => AbstractCatalogSupportFlatMapTransform.java} | 10 +++++-----
...sform.java => AbstractCatalogSupportMapTransform.java} | 10 +++++-----
.../seatunnel/transform/common/FilterRowTransform.java | 2 +-
.../transform/common/MultipleFieldOutputTransform.java | 2 +-
.../transform/common/SingleFieldOutputTransform.java | 2 +-
.../transform/fieldmapper/FieldMapperTransform.java | 4 ++--
.../seatunnel/transform/filter/FilterFieldTransform.java | 4 ++--
.../org/apache/seatunnel/transform/sql/SQLTransform.java | 4 ++--
14 files changed, 44 insertions(+), 57 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelFlatMapTransform.java
similarity index 84%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelFlatMapTransform.java
index 1f78e8be48..e0e2d85b5d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelFlatMapTransform.java
@@ -18,7 +18,7 @@ package org.apache.seatunnel.api.transform;
import java.util.List;
-public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {
+public interface SeaTunnelFlatMapTransform<T> extends SeaTunnelTransform<T> {
/**
* Transform input data to {@link this#getProducedCatalogTable().getSeaTunnelRowType()} types
@@ -28,8 +28,4 @@ public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {
* @return transformed data.
*/
List<T> flatMap(T row);
-
- default T map(T row) {
- throw new UnsupportedOperationException("Heads-up conversion is not supported");
- }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMapTransform.java
similarity index 80%
rename from
seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMapTransform.java
index 1f78e8be48..165c405baf 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMapTransform.java
@@ -14,11 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.api.transform;
-import java.util.List;
+package org.apache.seatunnel.api.transform;
-public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {
+public interface SeaTunnelMapTransform<T> extends SeaTunnelTransform<T> {
/**
* Transform input data to {@link this#getProducedCatalogTable().getSeaTunnelRowType()} types
@@ -27,9 +26,5 @@ public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {
* @param row the data need be transformed.
* @return transformed data.
*/
- List<T> flatMap(T row);
-
- default T map(T row) {
- throw new UnsupportedOperationException("Heads-up conversion is not supported");
- }
+ T map(T row);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
index a64e1b7c7d..0aa45d012e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
@@ -44,15 +44,6 @@ public interface SeaTunnelTransform<T>
/** Get the catalog table output by this transform */
CatalogTable getProducedCatalogTable();
- /**
- * Transform input data to {@link this#getProducedCatalogTable().getSeaTunnelRowType()} types
- * data.
- *
- * @param row the data need be transformed.
- * @return transformed data.
- */
- T map(T row);
-
/** call it when Transformer completed */
default void close() {}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 982179dd2e..1612c9822f 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -25,7 +25,8 @@ import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -144,7 +145,7 @@ public class TransformExecuteProcessor
protected DataStream<SeaTunnelRow> flinkTransform(
SeaTunnelTransform transform, DataStream<SeaTunnelRow> stream) {
- if (transform instanceof SeaTunnelMultiRowTransform) {
+ if (transform instanceof SeaTunnelFlatMapTransform) {
return stream.flatMap(
new ArrayFlatMap(transform),
TypeInformation.of(SeaTunnelRow.class));
}
@@ -157,7 +158,7 @@ public class TransformExecuteProcessor
.getStreamExecutionEnvironment()
.clean(
row ->
- ((SeaTunnelTransform<SeaTunnelRow>) transform)
+ ((SeaTunnelMapTransform<SeaTunnelRow>) transform)
.map(row))));
}
@@ -172,7 +173,7 @@ public class TransformExecuteProcessor
@Override
public void flatMap(SeaTunnelRow row, Collector<SeaTunnelRow> collector) {
List<SeaTunnelRow> rows =
- ((SeaTunnelMultiRowTransform<SeaTunnelRow>) transform).flatMap(row);
+ ((SeaTunnelFlatMapTransform<SeaTunnelRow>) transform).flatMap(row);
if (CollectionUtils.isNotEmpty(rows)) {
for (SeaTunnelRow rowResult : rows) {
collector.collect(rowResult);
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 44a60195ee..dbd5ed2307 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -26,7 +26,8 @@ import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -192,17 +193,17 @@ public class TransformExecuteProcessor
List<Row> rows = new ArrayList<>();
SeaTunnelRow seaTunnelRow =
inputRowConverter.unpack((GenericRowWithSchema) row);
- if (transform instanceof SeaTunnelMultiRowTransform) {
+ if (transform instanceof SeaTunnelFlatMapTransform) {
List<SeaTunnelRow> seaTunnelRows =
- ((SeaTunnelMultiRowTransform<SeaTunnelRow>) transform)
- .flatMap(seaTunnelRow);
+ ((SeaTunnelFlatMapTransform<SeaTunnelRow>) transform).flatMap(seaTunnelRow);
if (CollectionUtils.isNotEmpty(seaTunnelRows)) {
for (SeaTunnelRow seaTunnelRowTransform : seaTunnelRows) {
rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
}
}
- } else {
- SeaTunnelRow seaTunnelRowTransform = transform.map(seaTunnelRow);
+ } else if (transform instanceof SeaTunnelMapTransform) {
+ SeaTunnelRow seaTunnelRowTransform =
+ ((SeaTunnelMapTransform<SeaTunnelRow>) transform).map(seaTunnelRow);
if (seaTunnelRowTransform != null) {
rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index 093ac1cca6..66f658fa4f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -19,7 +19,8 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
-import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -113,9 +114,9 @@ public class TransformFlowLifeCycle<T> extends ActionFlowLifeCycle
for (SeaTunnelTransform<T> transformer : transform) {
List<T> nextInputDataList = new ArrayList<>();
- if (transformer instanceof SeaTunnelMultiRowTransform) {
- SeaTunnelMultiRowTransform<T> transformDecorator =
- (SeaTunnelMultiRowTransform<T>) transformer;
+ if (transformer instanceof SeaTunnelFlatMapTransform) {
+ SeaTunnelFlatMapTransform<T> transformDecorator =
+ (SeaTunnelFlatMapTransform<T>) transformer;
for (T data : dataList) {
List<T> outputDataArray = transformDecorator.flatMap(data);
log.debug(
@@ -127,9 +128,11 @@ public class TransformFlowLifeCycle<T> extends ActionFlowLifeCycle
nextInputDataList.addAll(outputDataArray);
}
}
- } else {
+ } else if (transformer instanceof SeaTunnelMapTransform) {
for (T data : dataList) {
- T outputData = transformer.map(data);
+ SeaTunnelMapTransform<T> transformDecorator =
+ (SeaTunnelMapTransform<T>) transformer;
+ T outputData = transformDecorator.map(data);
log.debug(
"Transform[{}] input row {} and output row {}",
transformer,
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportFlatMapTransform.java
similarity index 81%
rename from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java
rename to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportFlatMapTransform.java
index bf7dc2ab96..71b7b536be 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportFlatMapTransform.java
@@ -18,7 +18,7 @@ package org.apache.seatunnel.transform.common;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -26,15 +26,15 @@ import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
-public abstract class AbstractCatalogMultiRowTransform
+public abstract class AbstractCatalogSupportFlatMapTransform
extends AbstractSeaTunnelTransform<SeaTunnelRow, List<SeaTunnelRow>>
- implements SeaTunnelMultiRowTransform<SeaTunnelRow> {
+ implements SeaTunnelFlatMapTransform<SeaTunnelRow> {
- public AbstractCatalogMultiRowTransform(@NonNull CatalogTable inputCatalogTable) {
+ public AbstractCatalogSupportFlatMapTransform(@NonNull CatalogTable inputCatalogTable) {
super(inputCatalogTable);
}
- public AbstractCatalogMultiRowTransform(
+ public AbstractCatalogSupportFlatMapTransform(
@NonNull CatalogTable inputCatalogTable, ErrorHandleWay rowErrorHandleWay) {
super(inputCatalogTable, rowErrorHandleWay);
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportMapTransform.java
similarity index 81%
rename from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
rename to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportMapTransform.java
index 358bcd4298..6380d88539 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportMapTransform.java
@@ -19,20 +19,20 @@ package org.apache.seatunnel.transform.common;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public abstract class AbstractCatalogSupportTransform
+public abstract class AbstractCatalogSupportMapTransform
extends AbstractSeaTunnelTransform<SeaTunnelRow, SeaTunnelRow>
- implements SeaTunnelTransform<SeaTunnelRow> {
- public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable) {
+ implements SeaTunnelMapTransform<SeaTunnelRow> {
+ public AbstractCatalogSupportMapTransform(@NonNull CatalogTable inputCatalogTable) {
super(inputCatalogTable);
}
- public AbstractCatalogSupportTransform(
+ public AbstractCatalogSupportMapTransform(
@NonNull CatalogTable inputCatalogTable, ErrorHandleWay rowErrorHandleWay) {
super(inputCatalogTable, rowErrorHandleWay);
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
index d5eabf678e..855085a98b 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import lombok.NonNull;
-public abstract class FilterRowTransform extends AbstractCatalogSupportTransform {
+public abstract class FilterRowTransform extends AbstractCatalogSupportMapTransform {
public FilterRowTransform(@NonNull CatalogTable inputCatalogTable) {
super(inputCatalogTable);
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index 7f6ac1fba7..84e3a9348d 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -34,7 +34,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
-public abstract class MultipleFieldOutputTransform extends AbstractCatalogSupportTransform {
+public abstract class MultipleFieldOutputTransform extends AbstractCatalogSupportMapTransform {
private static final String[] TYPE_ARRAY_STRING = new String[0];
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
index 394242a41b..8768069ab8 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
@@ -33,7 +33,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
-public abstract class SingleFieldOutputTransform extends AbstractCatalogSupportTransform {
+public abstract class SingleFieldOutputTransform extends AbstractCatalogSupportMapTransform {
private int fieldIndex;
private SeaTunnelRowContainerGenerator rowContainerGenerator;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
index c5c6fa4f9d..5c9709c18a 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
import org.apache.seatunnel.transform.exception.TransformCommonError;
import org.apache.commons.collections4.CollectionUtils;
@@ -41,7 +41,7 @@ import java.util.Map;
import java.util.stream.Collectors;
@Slf4j
-public class FieldMapperTransform extends AbstractCatalogSupportTransform {
+public class FieldMapperTransform extends AbstractCatalogSupportMapTransform {
public static String PLUGIN_NAME = "FieldMapper";
private final FieldMapperTransformConfig config;
private List<Integer> needReaderColIndex;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
index e6797c2cf9..53b66af3d8 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java
@@ -28,7 +28,7 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
import org.apache.seatunnel.transform.exception.TransformCommonError;
import org.apache.commons.collections4.CollectionUtils;
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
@Slf4j
-public class FilterFieldTransform extends AbstractCatalogSupportTransform {
+public class FilterFieldTransform extends AbstractCatalogSupportMapTransform {
public static final String PLUGIN_NAME = "Filter";
private int[] inputValueIndexList;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index 71f448500c..6c2be79f34 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -30,7 +30,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.transform.common.AbstractCatalogMultiRowTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportFlatMapTransform;
import org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType;
import lombok.NonNull;
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType.ZETA;
@Slf4j
-public class SQLTransform extends AbstractCatalogMultiRowTransform {
+public class SQLTransform extends AbstractCatalogSupportFlatMapTransform {
public static final String PLUGIN_NAME = "Sql";
public static final Option<String> KEY_QUERY =