This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4830dd5512 [Fix][Transform-V2] Fix multiTable SQL transform (#10263)
4830dd5512 is described below

commit 4830dd55127accb9f627c7ea0c37e681bc0bf7d5
Author: dy102 <[email protected]>
AuthorDate: Wed Feb 4 20:58:13 2026 +0900

    [Fix][Transform-V2] Fix multiTable SQL transform (#10263)
---
 .../apache/seatunnel/e2e/transform/TestSQLIT.java  |  2 +-
 .../DefineSinkTypeMultiCatalogTransform.java       |  6 +++
 .../common/AbstractMultiCatalogTransform.java      | 39 ++--------------
 .../IdentityFlatMapTransform.java}                 | 34 +++++++++-----
 .../IdentityMapTransform.java}                     | 35 ++++++++------
 .../copy/CopyFieldMultiCatalogTransform.java       |  6 +++
 .../DynamicCompileMultiCatalogTransform.java       | 10 +++-
 .../FieldMapperMultiCatalogTransform.java          |  6 +++
 .../filter/FieldFieldMultiCatalogTransform.java    |  6 +++
 .../FieldRowKindMultiCatalogTransform.java         |  6 +++
 .../jsonpath/JsonPathMultiCatalogTransform.java    | 10 +++-
 .../metadata/MetadataMultiCatalogTransform.java    |  6 +++
 .../embedding/EmbeddingMultiCatalogTransform.java  | 10 +++-
 .../nlpmodel/llm/LLMMultiCatalogTransform.java     | 10 +++-
 .../RegexExtractMultiCatalogTransform.java         | 10 +++-
 .../rename/FieldRenameMultiCatalogTransform.java   |  6 +++
 .../rename/TableRenameMultiCatalogTransform.java   |  6 +++
 .../replace/ReplaceMultiCatalogTransform.java      |  6 +++
 .../RowKindExtractorMultiCatalogTransform.java     |  6 +++
 .../split/SplitMultiCatalogTransform.java          |  6 +++
 .../sql/SQLMultiCatalogFlatMapTransform.java       |  7 +++
 .../table/TableFilterMultiCatalogTransform.java    |  6 +++
 .../table/TableMergeMultiCatalogTransform.java     |  8 +++-
 .../FieldRenameMultiCatalogTransformTest.java}     | 41 ++++++++++++-----
 .../sql/SQLMultiCatalogFlatMapTransformTest.java   | 53 ++++++++++++++++++++--
 25 files changed, 252 insertions(+), 89 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 2e0a2a0851..4dc0c776e4 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -105,7 +105,7 @@ public class TestSQLIT extends TestSuiteBase {
     @TestTemplate
     public void testSQLTransformMultiTable(TestContainer container)
             throws IOException, InterruptedException {
-        Container.ExecResult sqlTransform = container.executeJob("/sql_transform.conf");
+        Container.ExecResult sqlTransform = container.executeJob("/sql_transform_multi_table.conf");
         Assertions.assertEquals(0, sqlTransform.getExitCode());
     }
 
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/adaptsink/DefineSinkTypeMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/adaptsink/DefineSinkTypeMultiCatalogTransform.java
index 31d629421d..e51d8386e6 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/adaptsink/DefineSinkTypeMultiCatalogTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/adaptsink/DefineSinkTypeMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -45,4 +46,9 @@ public class DefineSinkTypeMultiCatalogTransform extends AbstractMultiCatalogMap
             CatalogTable table, ReadonlyConfig config) {
         return new DefineSinkTypeTransform(DefineSinkTypeTransformConfig.of(config), table);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogTransform.java
index 49fa561979..e432afb05e 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogTransform.java
@@ -19,8 +19,6 @@ package org.apache.seatunnel.transform.common;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -73,7 +71,7 @@ public abstract class AbstractMultiCatalogTransform implements SeaTunnelTransfor
                     if (tableConfig != null) {
                         transformMap.put(tableId, buildTransform(inputCatalogTable, tableConfig));
                     } else {
-                        transformMap.put(tableId, new IdentityTransform(inputCatalogTable));
+                        transformMap.put(tableId, createIdentityTransform(inputCatalogTable));
                     }
                 });
 
@@ -91,6 +89,9 @@ public abstract class AbstractMultiCatalogTransform implements SeaTunnelTransfor
     protected abstract SeaTunnelTransform<SeaTunnelRow> buildTransform(
             CatalogTable inputCatalogTable, ReadonlyConfig config);
 
+    protected abstract SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(
+            CatalogTable catalogTable);
+
     @Override
     public List<CatalogTable> getProducedCatalogTables() {
         return outputCatalogTables;
@@ -103,36 +104,4 @@ public abstract class AbstractMultiCatalogTransform implements SeaTunnelTransfor
 
     @Override
     public void setTypeInfo(SeaTunnelDataType<SeaTunnelRow> inputDataType) {}
-
-    public static class IdentityTransform extends AbstractCatalogSupportMapTransform {
-        private final CatalogTable catalogTable;
-
-        @Override
-        public String getPluginName() {
-            return "Identity";
-        }
-
-        public IdentityTransform(CatalogTable catalogTable) {
-            super(catalogTable);
-            this.catalogTable = catalogTable;
-        }
-
-        @Override
-        protected SeaTunnelRow transformRow(SeaTunnelRow row) {
-            return row;
-        }
-
-        @Override
-        protected TableSchema transformTableSchema() {
-            return catalogTable.getTableSchema();
-        }
-
-        @Override
-        protected TableIdentifier transformTableIdentifier() {
-            return catalogTable.getTableId();
-        }
-
-        @Override
-        public void setTypeInfo(SeaTunnelDataType<SeaTunnelRow> inputDataType) {}
-    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityFlatMapTransform.java
similarity index 54%
copy from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
copy to 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityFlatMapTransform.java
index c12709e633..d1045b59e6 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityFlatMapTransform.java
@@ -15,31 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform.metadata;
+package org.apache.seatunnel.transform.common;
 
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
 
+import java.util.Collections;
 import java.util.List;
 
-public class MetadataMultiCatalogTransform extends 
AbstractMultiCatalogMapTransform {
+public class IdentityFlatMapTransform extends 
AbstractCatalogSupportFlatMapTransform {
+    private final CatalogTable catalogTable;
 
-    public MetadataMultiCatalogTransform(
-            List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
-        super(inputCatalogTables, config);
+    public IdentityFlatMapTransform(CatalogTable catalogTable) {
+        super(catalogTable);
+        this.catalogTable = catalogTable;
     }
 
     @Override
     public String getPluginName() {
-        return MetadataTransformConfig.PLUGIN_NAME;
+        return "IdentityFlatMap";
     }
 
     @Override
-    protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
-            CatalogTable inputCatalogTable, ReadonlyConfig config) {
-        return new MetadataTransform(config, inputCatalogTable);
+    protected List<SeaTunnelRow> transformRow(SeaTunnelRow row) {
+        return Collections.singletonList(row);
+    }
+
+    @Override
+    protected TableSchema transformTableSchema() {
+        return catalogTable.getTableSchema();
+    }
+
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        return catalogTable.getTableId();
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityMapTransform.java
similarity index 56%
copy from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
copy to 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityMapTransform.java
index c12709e633..82f2c0a9c5 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityMapTransform.java
@@ -15,31 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform.metadata;
+package org.apache.seatunnel.transform.common;
 
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
 
-import java.util.List;
+public class IdentityMapTransform extends AbstractCatalogSupportMapTransform {
+    private final CatalogTable catalogTable;
 
-public class MetadataMultiCatalogTransform extends 
AbstractMultiCatalogMapTransform {
-
-    public MetadataMultiCatalogTransform(
-            List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
-        super(inputCatalogTables, config);
+    public IdentityMapTransform(CatalogTable catalogTable) {
+        super(catalogTable);
+        this.catalogTable = catalogTable;
     }
 
     @Override
     public String getPluginName() {
-        return MetadataTransformConfig.PLUGIN_NAME;
+        return "IdentityMap";
+    }
+
+    @Override
+    protected SeaTunnelRow transformRow(SeaTunnelRow row) {
+        return row;
+    }
+
+    @Override
+    protected TableSchema transformTableSchema() {
+        return catalogTable.getTableSchema();
     }
 
     @Override
-    protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
-            CatalogTable inputCatalogTable, ReadonlyConfig config) {
-        return new MetadataTransform(config, inputCatalogTable);
+    protected TableIdentifier transformTableIdentifier() {
+        return catalogTable.getTableId();
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java
index 6ed09e211f..542c90cc6e 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -42,4 +43,9 @@ public class CopyFieldMultiCatalogTransform extends 
AbstractMultiCatalogMapTrans
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
         return new CopyFieldTransform(CopyTransformConfig.of(config), 
inputCatalogTable);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java
index aa699b52ea..8c3fa4432e 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -32,6 +33,11 @@ public class DynamicCompileMultiCatalogTransform extends 
AbstractMultiCatalogMap
         super(inputCatalogTables, config);
     }
 
+    @Override
+    public String getPluginName() {
+        return DynamicCompileTransform.PLUGIN_NAME;
+    }
+
     @Override
     protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
@@ -39,7 +45,7 @@ public class DynamicCompileMultiCatalogTransform extends 
AbstractMultiCatalogMap
     }
 
     @Override
-    public String getPluginName() {
-        return DynamicCompileTransform.PLUGIN_NAME;
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java
index 259f1b3163..96317f4656 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -42,4 +43,9 @@ public class FieldMapperMultiCatalogTransform extends 
AbstractMultiCatalogMapTra
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
         return new FieldMapperTransform(FieldMapperTransformConfig.of(config), 
inputCatalogTable);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
index f1a3b86c98..f281b3d855 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -42,4 +43,9 @@ public class FieldFieldMultiCatalogTransform extends 
AbstractMultiCatalogMapTran
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
         return new FilterFieldTransform(config, inputCatalogTable);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java
index e17411933d..e3b492f5c2 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -42,4 +43,9 @@ public class FieldRowKindMultiCatalogTransform extends 
AbstractMultiCatalogMapTr
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
         return new FilterRowKindTransform(config, inputCatalogTable);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java
index f8e096d34f..17b3614a5b 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -31,6 +32,11 @@ public class JsonPathMultiCatalogTransform extends 
AbstractMultiCatalogMapTransf
         super(inputCatalogTables, config);
     }
 
+    @Override
+    public String getPluginName() {
+        return "JsonPath";
+    }
+
     @Override
     protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
@@ -39,7 +45,7 @@ public class JsonPathMultiCatalogTransform extends 
AbstractMultiCatalogMapTransf
     }
 
     @Override
-    public String getPluginName() {
-        return "JsonPath";
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
index c12709e633..6f1c970bc1 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -42,4 +43,9 @@ public class MetadataMultiCatalogTransform extends 
AbstractMultiCatalogMapTransf
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
         return new MetadataTransform(config, inputCatalogTable);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingMultiCatalogTransform.java
index b32181d067..21f7e1d145 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -31,6 +32,11 @@ public class EmbeddingMultiCatalogTransform extends 
AbstractMultiCatalogMapTrans
         super(inputCatalogTables, config);
     }
 
+    @Override
+    public String getPluginName() {
+        return "Embedding";
+    }
+
     @Override
     protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
@@ -38,7 +44,7 @@ public class EmbeddingMultiCatalogTransform extends 
AbstractMultiCatalogMapTrans
     }
 
     @Override
-    public String getPluginName() {
-        return "Embedding";
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMMultiCatalogTransform.java
index 2331aae805..987771c1d3 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -30,6 +31,11 @@ public class LLMMultiCatalogTransform extends 
AbstractMultiCatalogMapTransform {
         super(inputCatalogTables, config);
     }
 
+    @Override
+    public String getPluginName() {
+        return "LLM";
+    }
+
     @Override
     protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
@@ -37,7 +43,7 @@ public class LLMMultiCatalogTransform extends 
AbstractMultiCatalogMapTransform {
     }
 
     @Override
-    public String getPluginName() {
-        return "LLM";
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractMultiCatalogTransform.java
index 937f69f725..1694c88fd8 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -32,6 +33,11 @@ public class RegexExtractMultiCatalogTransform extends 
AbstractMultiCatalogMapTr
         super(inputCatalogTables, config);
     }
 
+    @Override
+    public String getPluginName() {
+        return RegexExtractTransform.PLUGIN_NAME;
+    }
+
     @Override
     protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
@@ -39,7 +45,7 @@ public class RegexExtractMultiCatalogTransform extends 
AbstractMultiCatalogMapTr
     }
 
     @Override
-    public String getPluginName() {
-        return RegexExtractTransform.PLUGIN_NAME;
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
index dd0f61e4e4..d670a788a5 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -42,4 +43,9 @@ public class FieldRenameMultiCatalogTransform extends 
AbstractMultiCatalogMapTra
             CatalogTable table, ReadonlyConfig config) {
         return new FieldRenameTransform(FieldRenameConfig.of(config), table);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
index 67cff881da..e8eae6355e 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -42,4 +43,9 @@ public class TableRenameMultiCatalogTransform extends 
AbstractMultiCatalogMapTra
             CatalogTable table, ReadonlyConfig config) {
         return new TableRenameTransform(TableRenameConfig.of(config), table);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> 
createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceMultiCatalogTransform.java
index 54d6077541..19f3bd4677 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceMultiCatalogTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -42,4 +43,9 @@ public class ReplaceMultiCatalogTransform extends AbstractMultiCatalogMapTransfo
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
         return new ReplaceTransform(config, inputCatalogTable);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorMultiCatalogTransform.java
index 0c7815c584..5eb82c1691 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorMultiCatalogTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -42,4 +43,9 @@ public class RowKindExtractorMultiCatalogTransform extends AbstractMultiCatalogM
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
         return new RowKindExtractorTransform(config, inputCatalogTable);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitMultiCatalogTransform.java
index 52f033b10a..6dde024f44 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitMultiCatalogTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import java.util.List;
 
@@ -42,4 +43,9 @@ public class SplitMultiCatalogTransform extends AbstractMultiCatalogMapTransform
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
         return new SplitTransform(SplitTransformConfig.of(config), inputCatalogTable);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransform.java
index 0bf14feaec..732717dbdf 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransform.java
@@ -21,7 +21,9 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogFlatMapTransform;
+import org.apache.seatunnel.transform.common.IdentityFlatMapTransform;
 
 import java.util.List;
 
@@ -42,4 +44,9 @@ public class SQLMultiCatalogFlatMapTransform extends AbstractMultiCatalogFlatMap
             CatalogTable inputCatalogTable, ReadonlyConfig config) {
         return new SQLTransform(config, inputCatalogTable);
     }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityFlatMapTransform(catalogTable);
+    }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
index 8c85b7c4c6..064c62efb1 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -61,6 +62,11 @@ public class TableFilterMultiCatalogTransform extends AbstractMultiCatalogMapTra
         return new TableFilterTransform(include, table);
     }
 
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
+
     @Override
     public List<CatalogTable> getProducedCatalogTables() {
         List<CatalogTable> outputTables = new ArrayList<>();
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableMergeMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableMergeMultiCatalogTransform.java
index 406ba03edb..c7dcd7219c 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableMergeMultiCatalogTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableMergeMultiCatalogTransform.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -51,6 +52,11 @@ public class TableMergeMultiCatalogTransform extends AbstractMultiCatalogMapTran
         return new TableMergeTransform(TableMergeConfig.of(config), table);
     }
 
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
+        return new IdentityMapTransform(catalogTable);
+    }
+
     @Override
     public List<CatalogTable> getProducedCatalogTables() {
         List<CatalogTable> outputTables = new ArrayList<>();
@@ -62,7 +68,7 @@ public class TableMergeMultiCatalogTransform extends AbstractMultiCatalogMapTran
 
             String tableId = outputTable.getTablePath().getFullName();
             SeaTunnelTransform<SeaTunnelRow> transform = transformMap.get(tableId);
-            if (transform instanceof IdentityTransform) {
+            if (transform instanceof IdentityMapTransform) {
                 outputTables.add(outputTable);
             } else {
                 if (!mergeTables.containsKey(tableId)) {
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransformTest.java
similarity index 55%
copy from seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java
copy to seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransformTest.java
index bf9d9d42f8..52c824b955 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransformTest.java
@@ -15,45 +15,62 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform.sql;
+package org.apache.seatunnel.transform.rename;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
-public class SQLMultiCatalogFlatMapTransformTest {
+class FieldRenameMultiCatalogTransformTest {
 
     @Test
-    public void testGetPluginNameAndBuildTransform() {
+    void testCreateIdentityTransform() {
         SeaTunnelRowType rowType =
                 new SeaTunnelRowType(
                         new String[] {"id", "name"},
                         new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
                             BasicType.INT_TYPE, BasicType.STRING_TYPE
                         });
-        CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("test", rowType);
+        CatalogTable catalogTable =
+                CatalogTableUtil.getCatalogTable("test", "test", "test", "test", rowType);
         List<CatalogTable> tables = Collections.singletonList(catalogTable);
-
         ReadonlyConfig config =
                 ReadonlyConfig.fromMap(
                         Collections.singletonMap(
-                                SQLTransform.KEY_QUERY.key(), "select * from dual"));
+                                TransformCommonOptions.TABLE_MATCH_REGEX.key(), ".exclude"));
+
+        TestRenameMultiCatalogTransform transform =
+                new TestRenameMultiCatalogTransform(tables, config);
+
+        Assertions.assertInstanceOf(
+                IdentityMapTransform.class,
+                transform
+                        .getTransformMap()
+                        .get(tables.get(0).getTableId().toTablePath().toString()));
+    }
 
-        SQLMultiCatalogFlatMapTransform transform =
-                new SQLMultiCatalogFlatMapTransform(tables, config);
+    private static class TestRenameMultiCatalogTransform extends FieldRenameMultiCatalogTransform {
 
-        Assertions.assertEquals(SQLTransform.PLUGIN_NAME, transform.getPluginName());
+        private TestRenameMultiCatalogTransform(
+                List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
+            super(inputCatalogTables, config);
+        }
 
-        SeaTunnelFlatMapTransform<?> inner = transform.buildTransform(catalogTable, config);
-        Assertions.assertTrue(inner instanceof SQLTransform);
+        private Map<String, SeaTunnelTransform<SeaTunnelRow>> getTransformMap() {
+            return this.transformMap;
+        }
     }
 }
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java
index bf9d9d42f8..a190acd524 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java
@@ -21,26 +21,32 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.IdentityFlatMapTransform;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
-public class SQLMultiCatalogFlatMapTransformTest {
+class SQLMultiCatalogFlatMapTransformTest {
 
     @Test
-    public void testGetPluginNameAndBuildTransform() {
+    void testGetPluginNameAndBuildTransform() {
         SeaTunnelRowType rowType =
                 new SeaTunnelRowType(
                         new String[] {"id", "name"},
                         new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
                             BasicType.INT_TYPE, BasicType.STRING_TYPE
                         });
-        CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("test", rowType);
+        CatalogTable catalogTable =
+                CatalogTableUtil.getCatalogTable("test", "test", "test", "test", rowType);
         List<CatalogTable> tables = Collections.singletonList(catalogTable);
 
         ReadonlyConfig config =
@@ -54,6 +60,45 @@ public class SQLMultiCatalogFlatMapTransformTest {
         Assertions.assertEquals(SQLTransform.PLUGIN_NAME, transform.getPluginName());
 
         SeaTunnelFlatMapTransform<?> inner = transform.buildTransform(catalogTable, config);
-        Assertions.assertTrue(inner instanceof SQLTransform);
+        Assertions.assertInstanceOf(SQLTransform.class, inner);
+    }
+
+    @Test
+    void testCreateIdentityTransform() {
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name"},
+                        new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE, BasicType.STRING_TYPE
+                        });
+        CatalogTable catalogTable =
+                CatalogTableUtil.getCatalogTable("test", "test", "test", "test", rowType);
+        List<CatalogTable> tables = Collections.singletonList(catalogTable);
+        ReadonlyConfig config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                TransformCommonOptions.TABLE_MATCH_REGEX.key(), ".exclude"));
+
+        TestSQLMultiCatalogFlatMapTransform transform =
+                new TestSQLMultiCatalogFlatMapTransform(tables, config);
+
+        Assertions.assertInstanceOf(
+                IdentityFlatMapTransform.class,
+                transform
+                        .getTransformMap()
+                        .get(tables.get(0).getTableId().toTablePath().toString()));
+    }
+
+    private static class TestSQLMultiCatalogFlatMapTransform
+            extends SQLMultiCatalogFlatMapTransform {
+
+        private TestSQLMultiCatalogFlatMapTransform(
+                List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
+            super(inputCatalogTables, config);
+        }
+
+        private Map<String, SeaTunnelTransform<SeaTunnelRow>> getTransformMap() {
+            return this.transformMap;
+        }
     }
 }


Reply via email to