This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new abf8fcd67 [Feature][SQL Transform]Add catalog support for SQL Transform plugin (#4819)
abf8fcd67 is described below

commit abf8fcd67b3025b42fc09015527e7a4c87a55a9a
Author: Marvin <[email protected]>
AuthorDate: Thu Jun 1 11:46:14 2023 +0800

    [Feature][SQL Transform]Add catalog support for SQL Transform plugin (#4819)
---
 .../src/test/resources/mysqlcdc_to_mysql.conf      |  14 +-
 .../apache/seatunnel/transform/sql/SQLEngine.java  |   4 +-
 .../seatunnel/transform/sql/SQLTransform.java      | 161 +++++++++++++++++++--
 .../transform/sql/SQLTransformFactory.java         |  11 +-
 .../transform/sql/zeta/ZetaSQLEngine.java          |  25 +++-
 5 files changed, 196 insertions(+), 19 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
index 7b92bd986..e8d85aecc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
@@ -33,15 +33,27 @@ source {
     password = "seatunnel"
     table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+    catalog {
+      factory = MySQL
+    }
   }
 }
 
 transform {
+  sql {
+    source_table_name = "customers_mysql_cdc"
+    query = """ select id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint,
+                f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal,
+                f_float, f_double, f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp,
+                f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned, f_json, f_year
+                from customers_mysql_cdc """
+    result_table_name = "trans_mysql_cdc"
+  }
 }
 
 sink {
   jdbc {
-    source_table_name = "customers_mysql_cdc"
+    source_table_name = "trans_mysql_cdc"
     url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
     driver = "com.mysql.cj.jdbc.Driver"
     user = "st_user"
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
index f52858fef..b1e734c31 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
@@ -20,10 +20,12 @@ package org.apache.seatunnel.transform.sql;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
+import java.util.List;
+
 public interface SQLEngine {
     void init(String inputTableName, SeaTunnelRowType inputRowType, String sql);
 
-    SeaTunnelRowType typeMapping();
+    SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);
 
     SeaTunnelRow transformBySQL(SeaTunnelRow inputRow);
 
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index a60640b7d..20a07dcee 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -19,22 +19,40 @@ package org.apache.seatunnel.transform.sql;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
 import org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType;
 
 import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType.ZETA;
 
+@Slf4j
+@NoArgsConstructor
 @AutoService(SeaTunnelTransform.class)
-public class SQLTransform extends AbstractSeaTunnelTransform {
+public class SQLTransform extends AbstractCatalogSupportTransform {
+    public static final String PLUGIN_NAME = "Sql";
 
     public static final Option<String> KEY_QUERY =
             Options.key("query").stringType().noDefaultValue().withDescription("The query SQL");
@@ -51,22 +69,46 @@ public class SQLTransform extends AbstractSeaTunnelTransform {
 
     private transient SQLEngine sqlEngine;
 
+    public SQLTransform(@NonNull ReadonlyConfig config, @NonNull CatalogTable catalogTable) {
+        super(catalogTable);
+        this.query = config.get(KEY_QUERY);
+        if (config.getOptional(KEY_ENGINE).isPresent()) {
+            this.engineType = EngineType.valueOf(config.get(KEY_ENGINE).toUpperCase());
+        } else {
+            this.engineType = ZETA;
+        }
+
+        List<String> sourceTableNames = config.get(CommonOptions.SOURCE_TABLE_NAME);
+        if (sourceTableNames != null && !sourceTableNames.isEmpty()) {
+            this.inputTableName = sourceTableNames.get(0);
+        } else {
+            this.inputTableName = catalogTable.getTableId().getTableName();
+        }
+        List<Column> columns = catalogTable.getTableSchema().getColumns();
+        String[] fieldNames = new String[columns.size()];
+        SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType<?>[columns.size()];
+        for (int i = 0; i < columns.size(); i++) {
+            Column column = columns.get(i);
+            fieldNames[i] = column.getName();
+            fieldTypes[i] = column.getDataType();
+        }
+        this.inputRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
+    }
+
     @Override
     public String getPluginName() {
-        return "Sql";
+        return PLUGIN_NAME;
     }
 
     @Override
     protected void setConfig(Config pluginConfig) {
-        CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY.key());
-        if (!checkResult.isSuccess()) {
-            throw new IllegalArgumentException("Failed to check config! " + checkResult.getMsg());
-        }
-        query = pluginConfig.getString(KEY_QUERY.key());
-        if (pluginConfig.hasPath(KEY_ENGINE.key())) {
-            engineType = EngineType.valueOf(pluginConfig.getString(KEY_ENGINE.key()).toUpperCase());
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
+        ConfigValidator.of(readonlyConfig).validate(new SQLTransformFactory().optionRule());
+        this.query = readonlyConfig.get(KEY_QUERY);
+        if (readonlyConfig.getOptional(KEY_ENGINE).isPresent()) {
+            this.engineType = EngineType.valueOf(readonlyConfig.get(KEY_ENGINE).toUpperCase());
         } else {
-            engineType = ZETA;
+            this.engineType = ZETA;
         }
     }
 
@@ -85,7 +127,7 @@ public class SQLTransform extends AbstractSeaTunnelTransform {
     @Override
     protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
         tryOpen();
-        return sqlEngine.typeMapping();
+        return sqlEngine.typeMapping(null);
     }
 
     @Override
@@ -94,6 +136,97 @@ public class SQLTransform extends AbstractSeaTunnelTransform {
         return sqlEngine.transformBySQL(inputRow);
     }
 
+    @Override
+    protected TableSchema transformTableSchema() {
+        tryOpen();
+        List<String> inputColumnsMapping = new ArrayList<>();
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(inputColumnsMapping);
+
+        TableSchema.Builder builder = TableSchema.builder();
+        if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
+            List<String> outPkColumnNames = new ArrayList<>();
+            for (String pkColumnName :
+                    inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames()) {
+                for (int i = 0; i < inputColumnsMapping.size(); i++) {
+                    if (pkColumnName.equals(inputColumnsMapping.get(i))) {
+                        outPkColumnNames.add(outRowType.getFieldName(i));
+                    }
+                }
+            }
+            if (!outPkColumnNames.isEmpty()) {
+                builder.primaryKey(
+                        PrimaryKey.of(
+                                inputCatalogTable.getTableSchema().getPrimaryKey().getPrimaryKey(),
+                                outPkColumnNames));
+            }
+        }
+        if (inputCatalogTable.getTableSchema().getConstraintKeys() != null) {
+            List<ConstraintKey> outConstraintKey = new ArrayList<>();
+            for (ConstraintKey constraintKey :
+                    inputCatalogTable.getTableSchema().getConstraintKeys()) {
+                List<ConstraintKey.ConstraintKeyColumn> outConstraintColumnKeys = new ArrayList<>();
+                for (ConstraintKey.ConstraintKeyColumn constraintKeyColumn :
+                        constraintKey.getColumnNames()) {
+                    String constraintColumnName = constraintKeyColumn.getColumnName();
+                    for (int i = 0; i < inputColumnsMapping.size(); i++) {
+                        if (constraintColumnName.equals(inputColumnsMapping.get(i))) {
+                            outConstraintColumnKeys.add(
+                                    ConstraintKey.ConstraintKeyColumn.of(
+                                            outRowType.getFieldName(i),
+                                            constraintKeyColumn.getSortType()));
+                        }
+                    }
+                }
+                if (!outConstraintColumnKeys.isEmpty()) {
+                    outConstraintKey.add(
+                            ConstraintKey.of(
+                                    constraintKey.getConstraintType(),
+                                    constraintKey.getConstraintName(),
+                                    outConstraintColumnKeys));
+                }
+            }
+            if (!outConstraintKey.isEmpty()) {
+                builder.constraintKey(outConstraintKey);
+            }
+        }
+
+        String[] fieldNames = outRowType.getFieldNames();
+        SeaTunnelDataType<?>[] fieldTypes = outRowType.getFieldTypes();
+        List<Column> columns = new ArrayList<>(fieldNames.length);
+        for (int i = 0; i < fieldNames.length; i++) {
+            Column simpleColumn = null;
+            String inputColumnName = inputColumnsMapping.get(i);
+            if (inputColumnName != null) {
+                for (Column inputColumn : inputCatalogTable.getTableSchema().getColumns()) {
+                    if (inputColumnName.equals(inputColumn.getName())) {
+                        simpleColumn = inputColumn;
+                        break;
+                    }
+                }
+            }
+            Column column;
+            if (simpleColumn != null) {
+                column =
+                        PhysicalColumn.of(
+                                fieldNames[i],
+                                fieldTypes[i],
+                                simpleColumn.getColumnLength(),
+                                simpleColumn.isNullable(),
+                                simpleColumn.getDefaultValue(),
+                                simpleColumn.getComment());
+            } else {
+                column = PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, true, null, null);
+            }
+            columns.add(column);
+        }
+        return builder.columns(columns).build();
+    }
+
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        return inputCatalogTable.getTableId().copy();
+    }
+
     @Override
     public void close() {
         sqlEngine.close();
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
index 3b97c96a7..f509af832 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
@@ -18,7 +18,10 @@
 package org.apache.seatunnel.transform.sql;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 
 import com.google.auto.service.AutoService;
@@ -29,11 +32,17 @@ import static org.apache.seatunnel.transform.sql.SQLTransform.KEY_QUERY;
 public class SQLTransformFactory implements TableTransformFactory {
     @Override
     public String factoryIdentifier() {
-        return "Sql";
+        return SQLTransform.PLUGIN_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder().required(KEY_QUERY).build();
     }
+
+    @Override
+    public TableTransform createTransform(TableFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new SQLTransform(context.getOptions(), catalogTable);
+    }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 62fa8d901..fa491e7e6 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -38,11 +38,14 @@ import net.sf.jsqlparser.statement.select.SelectExpressionItem;
 import net.sf.jsqlparser.statement.select.SelectItem;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.ServiceLoader;
+import java.util.stream.Collectors;
 
 public class ZetaSQLEngine implements SQLEngine {
     private String inputTableName;
+    private SeaTunnelRowType inputRowType;
 
     private String sql;
     private PlainSelect selectBody;
@@ -56,6 +59,7 @@ public class ZetaSQLEngine implements SQLEngine {
     @Override
     public void init(String inputTableName, SeaTunnelRowType inputRowType, String sql) {
         this.inputTableName = inputTableName;
+        this.inputRowType = inputRowType;
         this.sql = sql;
 
         List<ZetaUDF> udfList = new ArrayList<>();
@@ -140,11 +144,19 @@ public class ZetaSQLEngine implements SQLEngine {
     }
 
     @Override
-    public SeaTunnelRowType typeMapping() {
+    public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
         List<SelectItem> selectItems = selectBody.getSelectItems();
 
         String[] fieldNames = new String[selectItems.size()];
         SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType<?>[selectItems.size()];
+        if (inputColumnsMapping != null) {
+            for (int i = 0; i < selectItems.size(); i++) {
+                inputColumnsMapping.add(null);
+            }
+        }
+
+        List<String> inputColumnNames =
+                Arrays.stream(inputRowType.getFieldNames()).collect(Collectors.toList());
 
         for (int i = 0; i < selectItems.size(); i++) {
             SelectItem selectItem = selectItems.get(i);
@@ -162,6 +174,12 @@ public class ZetaSQLEngine implements SQLEngine {
                     }
                 }
 
+                if (inputColumnsMapping != null
+                        && expression instanceof Column
+                        && inputColumnNames.contains(((Column) expression).getColumnName())) {
+                    inputColumnsMapping.set(i, ((Column) expression).getColumnName());
+                }
+
                 seaTunnelDataTypes[i] = zetaSQLType.getExpressionType(expression);
             }
         }
@@ -183,7 +201,10 @@ public class ZetaSQLEngine implements SQLEngine {
         // Project
         Object[] outputFields = project(inputFields);
 
-        return new SeaTunnelRow(outputFields);
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(outputFields);
+        seaTunnelRow.setRowKind(inputRow.getRowKind());
+        seaTunnelRow.setTableId(inputRow.getTableId());
+        return seaTunnelRow;
     }
 
     private Object[] scanTable(SeaTunnelRow inputRow) {

Reply via email to