This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4830dd5512 [Fix][Transform-V2] Fix multiTable SQL transform (#10263)
4830dd5512 is described below
commit 4830dd55127accb9f627c7ea0c37e681bc0bf7d5
Author: dy102 <[email protected]>
AuthorDate: Wed Feb 4 20:58:13 2026 +0900
[Fix][Transform-V2] Fix multiTable SQL transform (#10263)
---
.../apache/seatunnel/e2e/transform/TestSQLIT.java | 2 +-
.../DefineSinkTypeMultiCatalogTransform.java | 6 +++
.../common/AbstractMultiCatalogTransform.java | 39 ++--------------
.../IdentityFlatMapTransform.java} | 34 +++++++++-----
.../IdentityMapTransform.java} | 35 ++++++++------
.../copy/CopyFieldMultiCatalogTransform.java | 6 +++
.../DynamicCompileMultiCatalogTransform.java | 10 +++-
.../FieldMapperMultiCatalogTransform.java | 6 +++
.../filter/FieldFieldMultiCatalogTransform.java | 6 +++
.../FieldRowKindMultiCatalogTransform.java | 6 +++
.../jsonpath/JsonPathMultiCatalogTransform.java | 10 +++-
.../metadata/MetadataMultiCatalogTransform.java | 6 +++
.../embedding/EmbeddingMultiCatalogTransform.java | 10 +++-
.../nlpmodel/llm/LLMMultiCatalogTransform.java | 10 +++-
.../RegexExtractMultiCatalogTransform.java | 10 +++-
.../rename/FieldRenameMultiCatalogTransform.java | 6 +++
.../rename/TableRenameMultiCatalogTransform.java | 6 +++
.../replace/ReplaceMultiCatalogTransform.java | 6 +++
.../RowKindExtractorMultiCatalogTransform.java | 6 +++
.../split/SplitMultiCatalogTransform.java | 6 +++
.../sql/SQLMultiCatalogFlatMapTransform.java | 7 +++
.../table/TableFilterMultiCatalogTransform.java | 6 +++
.../table/TableMergeMultiCatalogTransform.java | 8 +++-
.../FieldRenameMultiCatalogTransformTest.java} | 41 ++++++++++++-----
.../sql/SQLMultiCatalogFlatMapTransformTest.java | 53 ++++++++++++++++++++--
25 files changed, 252 insertions(+), 89 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 2e0a2a0851..4dc0c776e4 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -105,7 +105,7 @@ public class TestSQLIT extends TestSuiteBase {
@TestTemplate
public void testSQLTransformMultiTable(TestContainer container)
throws IOException, InterruptedException {
- Container.ExecResult sqlTransform =
container.executeJob("/sql_transform.conf");
+ Container.ExecResult sqlTransform =
container.executeJob("/sql_transform_multi_table.conf");
Assertions.assertEquals(0, sqlTransform.getExitCode());
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/adaptsink/DefineSinkTypeMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/adaptsink/DefineSinkTypeMultiCatalogTransform.java
index 31d629421d..e51d8386e6 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/adaptsink/DefineSinkTypeMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/adaptsink/DefineSinkTypeMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -45,4 +46,9 @@ public class DefineSinkTypeMultiCatalogTransform extends
AbstractMultiCatalogMap
CatalogTable table, ReadonlyConfig config) {
return new
DefineSinkTypeTransform(DefineSinkTypeTransformConfig.of(config), table);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogTransform.java
index 49fa561979..e432afb05e 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogTransform.java
@@ -19,8 +19,6 @@ package org.apache.seatunnel.transform.common;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -73,7 +71,7 @@ public abstract class AbstractMultiCatalogTransform
implements SeaTunnelTransfor
if (tableConfig != null) {
transformMap.put(tableId,
buildTransform(inputCatalogTable, tableConfig));
} else {
- transformMap.put(tableId, new
IdentityTransform(inputCatalogTable));
+ transformMap.put(tableId,
createIdentityTransform(inputCatalogTable));
}
});
@@ -91,6 +89,9 @@ public abstract class AbstractMultiCatalogTransform
implements SeaTunnelTransfor
protected abstract SeaTunnelTransform<SeaTunnelRow> buildTransform(
CatalogTable inputCatalogTable, ReadonlyConfig config);
+ protected abstract SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(
+ CatalogTable catalogTable);
+
@Override
public List<CatalogTable> getProducedCatalogTables() {
return outputCatalogTables;
@@ -103,36 +104,4 @@ public abstract class AbstractMultiCatalogTransform
implements SeaTunnelTransfor
@Override
public void setTypeInfo(SeaTunnelDataType<SeaTunnelRow> inputDataType) {}
-
- public static class IdentityTransform extends
AbstractCatalogSupportMapTransform {
- private final CatalogTable catalogTable;
-
- @Override
- public String getPluginName() {
- return "Identity";
- }
-
- public IdentityTransform(CatalogTable catalogTable) {
- super(catalogTable);
- this.catalogTable = catalogTable;
- }
-
- @Override
- protected SeaTunnelRow transformRow(SeaTunnelRow row) {
- return row;
- }
-
- @Override
- protected TableSchema transformTableSchema() {
- return catalogTable.getTableSchema();
- }
-
- @Override
- protected TableIdentifier transformTableIdentifier() {
- return catalogTable.getTableId();
- }
-
- @Override
- public void setTypeInfo(SeaTunnelDataType<SeaTunnelRow> inputDataType)
{}
- }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityFlatMapTransform.java
similarity index 54%
copy from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
copy to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityFlatMapTransform.java
index c12709e633..d1045b59e6 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityFlatMapTransform.java
@@ -15,31 +15,41 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform.metadata;
+package org.apache.seatunnel.transform.common;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import java.util.Collections;
import java.util.List;
-public class MetadataMultiCatalogTransform extends
AbstractMultiCatalogMapTransform {
+public class IdentityFlatMapTransform extends
AbstractCatalogSupportFlatMapTransform {
+ private final CatalogTable catalogTable;
- public MetadataMultiCatalogTransform(
- List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
- super(inputCatalogTables, config);
+ public IdentityFlatMapTransform(CatalogTable catalogTable) {
+ super(catalogTable);
+ this.catalogTable = catalogTable;
}
@Override
public String getPluginName() {
- return MetadataTransformConfig.PLUGIN_NAME;
+ return "IdentityFlatMap";
}
@Override
- protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
- CatalogTable inputCatalogTable, ReadonlyConfig config) {
- return new MetadataTransform(config, inputCatalogTable);
+ protected List<SeaTunnelRow> transformRow(SeaTunnelRow row) {
+ return Collections.singletonList(row);
+ }
+
+ @Override
+ protected TableSchema transformTableSchema() {
+ return catalogTable.getTableSchema();
+ }
+
+ @Override
+ protected TableIdentifier transformTableIdentifier() {
+ return catalogTable.getTableId();
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityMapTransform.java
similarity index 56%
copy from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
copy to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityMapTransform.java
index c12709e633..82f2c0a9c5 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/IdentityMapTransform.java
@@ -15,31 +15,38 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform.metadata;
+package org.apache.seatunnel.transform.common;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
-import java.util.List;
+public class IdentityMapTransform extends AbstractCatalogSupportMapTransform {
+ private final CatalogTable catalogTable;
-public class MetadataMultiCatalogTransform extends
AbstractMultiCatalogMapTransform {
-
- public MetadataMultiCatalogTransform(
- List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
- super(inputCatalogTables, config);
+ public IdentityMapTransform(CatalogTable catalogTable) {
+ super(catalogTable);
+ this.catalogTable = catalogTable;
}
@Override
public String getPluginName() {
- return MetadataTransformConfig.PLUGIN_NAME;
+ return "IdentityMap";
+ }
+
+ @Override
+ protected SeaTunnelRow transformRow(SeaTunnelRow row) {
+ return row;
+ }
+
+ @Override
+ protected TableSchema transformTableSchema() {
+ return catalogTable.getTableSchema();
}
@Override
- protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
- CatalogTable inputCatalogTable, ReadonlyConfig config) {
- return new MetadataTransform(config, inputCatalogTable);
+ protected TableIdentifier transformTableIdentifier() {
+ return catalogTable.getTableId();
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java
index 6ed09e211f..542c90cc6e 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -42,4 +43,9 @@ public class CopyFieldMultiCatalogTransform extends
AbstractMultiCatalogMapTrans
CatalogTable inputCatalogTable, ReadonlyConfig config) {
return new CopyFieldTransform(CopyTransformConfig.of(config),
inputCatalogTable);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java
index aa699b52ea..8c3fa4432e 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -32,6 +33,11 @@ public class DynamicCompileMultiCatalogTransform extends
AbstractMultiCatalogMap
super(inputCatalogTables, config);
}
+ @Override
+ public String getPluginName() {
+ return DynamicCompileTransform.PLUGIN_NAME;
+ }
+
@Override
protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
CatalogTable inputCatalogTable, ReadonlyConfig config) {
@@ -39,7 +45,7 @@ public class DynamicCompileMultiCatalogTransform extends
AbstractMultiCatalogMap
}
@Override
- public String getPluginName() {
- return DynamicCompileTransform.PLUGIN_NAME;
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java
index 259f1b3163..96317f4656 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -42,4 +43,9 @@ public class FieldMapperMultiCatalogTransform extends
AbstractMultiCatalogMapTra
CatalogTable inputCatalogTable, ReadonlyConfig config) {
return new FieldMapperTransform(FieldMapperTransformConfig.of(config),
inputCatalogTable);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
index f1a3b86c98..f281b3d855 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -42,4 +43,9 @@ public class FieldFieldMultiCatalogTransform extends
AbstractMultiCatalogMapTran
CatalogTable inputCatalogTable, ReadonlyConfig config) {
return new FilterFieldTransform(config, inputCatalogTable);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java
index e17411933d..e3b492f5c2 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -42,4 +43,9 @@ public class FieldRowKindMultiCatalogTransform extends
AbstractMultiCatalogMapTr
CatalogTable inputCatalogTable, ReadonlyConfig config) {
return new FilterRowKindTransform(config, inputCatalogTable);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java
index f8e096d34f..17b3614a5b 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -31,6 +32,11 @@ public class JsonPathMultiCatalogTransform extends
AbstractMultiCatalogMapTransf
super(inputCatalogTables, config);
}
+ @Override
+ public String getPluginName() {
+ return "JsonPath";
+ }
+
@Override
protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
CatalogTable inputCatalogTable, ReadonlyConfig config) {
@@ -39,7 +45,7 @@ public class JsonPathMultiCatalogTransform extends
AbstractMultiCatalogMapTransf
}
@Override
- public String getPluginName() {
- return "JsonPath";
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
index c12709e633..6f1c970bc1 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -42,4 +43,9 @@ public class MetadataMultiCatalogTransform extends
AbstractMultiCatalogMapTransf
CatalogTable inputCatalogTable, ReadonlyConfig config) {
return new MetadataTransform(config, inputCatalogTable);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingMultiCatalogTransform.java
index b32181d067..21f7e1d145 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -31,6 +32,11 @@ public class EmbeddingMultiCatalogTransform extends
AbstractMultiCatalogMapTrans
super(inputCatalogTables, config);
}
+ @Override
+ public String getPluginName() {
+ return "Embedding";
+ }
+
@Override
protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
CatalogTable inputCatalogTable, ReadonlyConfig config) {
@@ -38,7 +44,7 @@ public class EmbeddingMultiCatalogTransform extends
AbstractMultiCatalogMapTrans
}
@Override
- public String getPluginName() {
- return "Embedding";
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMMultiCatalogTransform.java
index 2331aae805..987771c1d3 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -30,6 +31,11 @@ public class LLMMultiCatalogTransform extends
AbstractMultiCatalogMapTransform {
super(inputCatalogTables, config);
}
+ @Override
+ public String getPluginName() {
+ return "LLM";
+ }
+
@Override
protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
CatalogTable inputCatalogTable, ReadonlyConfig config) {
@@ -37,7 +43,7 @@ public class LLMMultiCatalogTransform extends
AbstractMultiCatalogMapTransform {
}
@Override
- public String getPluginName() {
- return "LLM";
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractMultiCatalogTransform.java
index 937f69f725..1694c88fd8 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -32,6 +33,11 @@ public class RegexExtractMultiCatalogTransform extends
AbstractMultiCatalogMapTr
super(inputCatalogTables, config);
}
+ @Override
+ public String getPluginName() {
+ return RegexExtractTransform.PLUGIN_NAME;
+ }
+
@Override
protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
CatalogTable inputCatalogTable, ReadonlyConfig config) {
@@ -39,7 +45,7 @@ public class RegexExtractMultiCatalogTransform extends
AbstractMultiCatalogMapTr
}
@Override
- public String getPluginName() {
- return RegexExtractTransform.PLUGIN_NAME;
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
index dd0f61e4e4..d670a788a5 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -42,4 +43,9 @@ public class FieldRenameMultiCatalogTransform extends
AbstractMultiCatalogMapTra
CatalogTable table, ReadonlyConfig config) {
return new FieldRenameTransform(FieldRenameConfig.of(config), table);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
index 67cff881da..e8eae6355e 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -42,4 +43,9 @@ public class TableRenameMultiCatalogTransform extends
AbstractMultiCatalogMapTra
CatalogTable table, ReadonlyConfig config) {
return new TableRenameTransform(TableRenameConfig.of(config), table);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceMultiCatalogTransform.java
index 54d6077541..19f3bd4677 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -42,4 +43,9 @@ public class ReplaceMultiCatalogTransform extends
AbstractMultiCatalogMapTransfo
CatalogTable inputCatalogTable, ReadonlyConfig config) {
return new ReplaceTransform(config, inputCatalogTable);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorMultiCatalogTransform.java
index 0c7815c584..5eb82c1691 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -42,4 +43,9 @@ public class RowKindExtractorMultiCatalogTransform extends
AbstractMultiCatalogM
CatalogTable inputCatalogTable, ReadonlyConfig config) {
return new RowKindExtractorTransform(config, inputCatalogTable);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow>
createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitMultiCatalogTransform.java
index 52f033b10a..6dde024f44 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitMultiCatalogTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import java.util.List;
@@ -42,4 +43,9 @@ public class SplitMultiCatalogTransform extends AbstractMultiCatalogMapTransform
CatalogTable inputCatalogTable, ReadonlyConfig config) {
return new SplitTransform(SplitTransformConfig.of(config), inputCatalogTable);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransform.java
index 0bf14feaec..732717dbdf 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransform.java
@@ -21,7 +21,9 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogFlatMapTransform;
+import org.apache.seatunnel.transform.common.IdentityFlatMapTransform;
import java.util.List;
@@ -42,4 +44,9 @@ public class SQLMultiCatalogFlatMapTransform extends AbstractMultiCatalogFlatMap
CatalogTable inputCatalogTable, ReadonlyConfig config) {
return new SQLTransform(config, inputCatalogTable);
}
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityFlatMapTransform(catalogTable);
+ }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
index 8c85b7c4c6..064c62efb1 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import lombok.extern.slf4j.Slf4j;
@@ -61,6 +62,11 @@ public class TableFilterMultiCatalogTransform extends AbstractMultiCatalogMapTra
return new TableFilterTransform(include, table);
}
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
+
@Override
public List<CatalogTable> getProducedCatalogTables() {
List<CatalogTable> outputTables = new ArrayList<>();
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableMergeMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableMergeMultiCatalogTransform.java
index 406ba03edb..c7dcd7219c 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableMergeMultiCatalogTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableMergeMultiCatalogTransform.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
import lombok.extern.slf4j.Slf4j;
@@ -51,6 +52,11 @@ public class TableMergeMultiCatalogTransform extends AbstractMultiCatalogMapTran
return new TableMergeTransform(TableMergeConfig.of(config), table);
}
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
+ return new IdentityMapTransform(catalogTable);
+ }
+
@Override
public List<CatalogTable> getProducedCatalogTables() {
List<CatalogTable> outputTables = new ArrayList<>();
@@ -62,7 +68,7 @@ public class TableMergeMultiCatalogTransform extends AbstractMultiCatalogMapTran
String tableId = outputTable.getTablePath().getFullName();
SeaTunnelTransform<SeaTunnelRow> transform =
transformMap.get(tableId);
- if (transform instanceof IdentityTransform) {
+ if (transform instanceof IdentityMapTransform) {
outputTables.add(outputTable);
} else {
if (!mergeTables.containsKey(tableId)) {
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransformTest.java
similarity index 55%
copy from seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java
copy to seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransformTest.java
index bf9d9d42f8..52c824b955 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransformTest.java
@@ -15,45 +15,62 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform.sql;
+package org.apache.seatunnel.transform.rename;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.IdentityMapTransform;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
-public class SQLMultiCatalogFlatMapTransformTest {
+class FieldRenameMultiCatalogTransformTest {
@Test
- public void testGetPluginNameAndBuildTransform() {
+ void testCreateIdentityTransform() {
SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {"id", "name"},
new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
BasicType.INT_TYPE, BasicType.STRING_TYPE
});
- CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("test", rowType);
+ CatalogTable catalogTable =
+ CatalogTableUtil.getCatalogTable("test", "test", "test", "test", rowType);
List<CatalogTable> tables = Collections.singletonList(catalogTable);
-
ReadonlyConfig config =
ReadonlyConfig.fromMap(
Collections.singletonMap(
- SQLTransform.KEY_QUERY.key(), "select * from dual"));
+ TransformCommonOptions.TABLE_MATCH_REGEX.key(), ".exclude"));
+
+ TestRenameMultiCatalogTransform transform =
+ new TestRenameMultiCatalogTransform(tables, config);
+
+ Assertions.assertInstanceOf(
+ IdentityMapTransform.class,
+ transform
+ .getTransformMap()
+ .get(tables.get(0).getTableId().toTablePath().toString()));
+ }
- SQLMultiCatalogFlatMapTransform transform =
- new SQLMultiCatalogFlatMapTransform(tables, config);
+ private static class TestRenameMultiCatalogTransform extends FieldRenameMultiCatalogTransform {
- Assertions.assertEquals(SQLTransform.PLUGIN_NAME, transform.getPluginName());
+ private TestRenameMultiCatalogTransform(
+ List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
+ super(inputCatalogTables, config);
+ }
- SeaTunnelFlatMapTransform<?> inner = transform.buildTransform(catalogTable, config);
- Assertions.assertTrue(inner instanceof SQLTransform);
+ private Map<String, SeaTunnelTransform<SeaTunnelRow>> getTransformMap() {
+ return this.transformMap;
+ }
}
}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java
index bf9d9d42f8..a190acd524 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogFlatMapTransformTest.java
@@ -21,26 +21,32 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.IdentityFlatMapTransform;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
-public class SQLMultiCatalogFlatMapTransformTest {
+class SQLMultiCatalogFlatMapTransformTest {
@Test
- public void testGetPluginNameAndBuildTransform() {
+ void testGetPluginNameAndBuildTransform() {
SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {"id", "name"},
new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
BasicType.INT_TYPE, BasicType.STRING_TYPE
});
- CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("test", rowType);
+ CatalogTable catalogTable =
+ CatalogTableUtil.getCatalogTable("test", "test", "test", "test", rowType);
List<CatalogTable> tables = Collections.singletonList(catalogTable);
ReadonlyConfig config =
@@ -54,6 +60,45 @@ public class SQLMultiCatalogFlatMapTransformTest {
Assertions.assertEquals(SQLTransform.PLUGIN_NAME, transform.getPluginName());
SeaTunnelFlatMapTransform<?> inner = transform.buildTransform(catalogTable, config);
- Assertions.assertTrue(inner instanceof SQLTransform);
+ Assertions.assertInstanceOf(SQLTransform.class, inner);
+ }
+
+ @Test
+ void testCreateIdentityTransform() {
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name"},
+ new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+ BasicType.INT_TYPE, BasicType.STRING_TYPE
+ });
+ CatalogTable catalogTable =
+ CatalogTableUtil.getCatalogTable("test", "test", "test", "test", rowType);
+ List<CatalogTable> tables = Collections.singletonList(catalogTable);
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ Collections.singletonMap(
+ TransformCommonOptions.TABLE_MATCH_REGEX.key(), ".exclude"));
+
+ TestSQLMultiCatalogFlatMapTransform transform =
+ new TestSQLMultiCatalogFlatMapTransform(tables, config);
+
+ Assertions.assertInstanceOf(
+ IdentityFlatMapTransform.class,
+ transform
+ .getTransformMap()
+ .get(tables.get(0).getTableId().toTablePath().toString()));
+ }
+
+ private static class TestSQLMultiCatalogFlatMapTransform
+ extends SQLMultiCatalogFlatMapTransform {
+
+ private TestSQLMultiCatalogFlatMapTransform(
+ List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
+ super(inputCatalogTables, config);
+ }
+
+ private Map<String, SeaTunnelTransform<SeaTunnelRow>> getTransformMap() {
+ return this.transformMap;
+ }
}
}