This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new abf8fcd67 [Feature][SQL Transform]Add catalog support for SQL Transform plugin (#4819)
abf8fcd67 is described below
commit abf8fcd67b3025b42fc09015527e7a4c87a55a9a
Author: Marvin <[email protected]>
AuthorDate: Thu Jun 1 11:46:14 2023 +0800
[Feature][SQL Transform]Add catalog support for SQL Transform plugin (#4819)
---
.../src/test/resources/mysqlcdc_to_mysql.conf | 14 +-
.../apache/seatunnel/transform/sql/SQLEngine.java | 4 +-
.../seatunnel/transform/sql/SQLTransform.java | 161 +++++++++++++++++++--
.../transform/sql/SQLTransformFactory.java | 11 +-
.../transform/sql/zeta/ZetaSQLEngine.java | 25 +++-
5 files changed, 196 insertions(+), 19 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
index 7b92bd986..e8d85aecc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
@@ -33,15 +33,27 @@ source {
password = "seatunnel"
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+ catalog {
+ factory = MySQL
+ }
}
}
transform {
+ sql {
+ source_table_name = "customers_mysql_cdc"
+ query = """ select id, f_binary, f_blob, f_long_varbinary, f_longblob,
f_tinyblob, f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint,
+ f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal,
+ f_float, f_double, f_double_precision, f_longtext,
f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp,
+ f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar,
f_real, f_time, f_tinyint, f_tinyint_unsigned, f_json, f_year
+ from customers_mysql_cdc """
+ result_table_name = "trans_mysql_cdc"
+ }
}
sink {
jdbc {
- source_table_name = "customers_mysql_cdc"
+ source_table_name = "trans_mysql_cdc"
url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user"
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
index f52858fef..b1e734c31 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
@@ -20,10 +20,12 @@ package org.apache.seatunnel.transform.sql;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import java.util.List;
+
public interface SQLEngine {
void init(String inputTableName, SeaTunnelRowType inputRowType, String
sql);
- SeaTunnelRowType typeMapping();
+ SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);
SeaTunnelRow transformBySQL(SeaTunnelRow inputRow);
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index a60640b7d..20a07dcee 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -19,22 +19,40 @@ package org.apache.seatunnel.transform.sql;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
import org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType;
import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
import static
org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType.ZETA;
+@Slf4j
+@NoArgsConstructor
@AutoService(SeaTunnelTransform.class)
-public class SQLTransform extends AbstractSeaTunnelTransform {
+public class SQLTransform extends AbstractCatalogSupportTransform {
+ public static final String PLUGIN_NAME = "Sql";
public static final Option<String> KEY_QUERY =
Options.key("query").stringType().noDefaultValue().withDescription("The query
SQL");
@@ -51,22 +69,46 @@ public class SQLTransform extends AbstractSeaTunnelTransform {
private transient SQLEngine sqlEngine;
+ public SQLTransform(@NonNull ReadonlyConfig config, @NonNull CatalogTable
catalogTable) {
+ super(catalogTable);
+ this.query = config.get(KEY_QUERY);
+ if (config.getOptional(KEY_ENGINE).isPresent()) {
+ this.engineType =
EngineType.valueOf(config.get(KEY_ENGINE).toUpperCase());
+ } else {
+ this.engineType = ZETA;
+ }
+
+ List<String> sourceTableNames =
config.get(CommonOptions.SOURCE_TABLE_NAME);
+ if (sourceTableNames != null && !sourceTableNames.isEmpty()) {
+ this.inputTableName = sourceTableNames.get(0);
+ } else {
+ this.inputTableName = catalogTable.getTableId().getTableName();
+ }
+ List<Column> columns = catalogTable.getTableSchema().getColumns();
+ String[] fieldNames = new String[columns.size()];
+ SeaTunnelDataType<?>[] fieldTypes = new
SeaTunnelDataType<?>[columns.size()];
+ for (int i = 0; i < columns.size(); i++) {
+ Column column = columns.get(i);
+ fieldNames[i] = column.getName();
+ fieldTypes[i] = column.getDataType();
+ }
+ this.inputRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
+ }
+
@Override
public String getPluginName() {
- return "Sql";
+ return PLUGIN_NAME;
}
@Override
protected void setConfig(Config pluginConfig) {
- CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig,
KEY_QUERY.key());
- if (!checkResult.isSuccess()) {
- throw new IllegalArgumentException("Failed to check config! " +
checkResult.getMsg());
- }
- query = pluginConfig.getString(KEY_QUERY.key());
- if (pluginConfig.hasPath(KEY_ENGINE.key())) {
- engineType =
EngineType.valueOf(pluginConfig.getString(KEY_ENGINE.key()).toUpperCase());
+ ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(pluginConfig);
+ ConfigValidator.of(readonlyConfig).validate(new
SQLTransformFactory().optionRule());
+ this.query = readonlyConfig.get(KEY_QUERY);
+ if (readonlyConfig.getOptional(KEY_ENGINE).isPresent()) {
+ this.engineType =
EngineType.valueOf(readonlyConfig.get(KEY_ENGINE).toUpperCase());
} else {
- engineType = ZETA;
+ this.engineType = ZETA;
}
}
@@ -85,7 +127,7 @@ public class SQLTransform extends AbstractSeaTunnelTransform {
@Override
protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType)
{
tryOpen();
- return sqlEngine.typeMapping();
+ return sqlEngine.typeMapping(null);
}
@Override
@@ -94,6 +136,97 @@ public class SQLTransform extends AbstractSeaTunnelTransform {
return sqlEngine.transformBySQL(inputRow);
}
+ @Override
+ protected TableSchema transformTableSchema() {
+ tryOpen();
+ List<String> inputColumnsMapping = new ArrayList<>();
+ SeaTunnelRowType outRowType =
sqlEngine.typeMapping(inputColumnsMapping);
+
+ TableSchema.Builder builder = TableSchema.builder();
+ if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
+ List<String> outPkColumnNames = new ArrayList<>();
+ for (String pkColumnName :
+
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames()) {
+ for (int i = 0; i < inputColumnsMapping.size(); i++) {
+ if (pkColumnName.equals(inputColumnsMapping.get(i))) {
+ outPkColumnNames.add(outRowType.getFieldName(i));
+ }
+ }
+ }
+ if (!outPkColumnNames.isEmpty()) {
+ builder.primaryKey(
+ PrimaryKey.of(
+
inputCatalogTable.getTableSchema().getPrimaryKey().getPrimaryKey(),
+ outPkColumnNames));
+ }
+ }
+ if (inputCatalogTable.getTableSchema().getConstraintKeys() != null) {
+ List<ConstraintKey> outConstraintKey = new ArrayList<>();
+ for (ConstraintKey constraintKey :
+ inputCatalogTable.getTableSchema().getConstraintKeys()) {
+ List<ConstraintKey.ConstraintKeyColumn>
outConstraintColumnKeys = new ArrayList<>();
+ for (ConstraintKey.ConstraintKeyColumn constraintKeyColumn :
+ constraintKey.getColumnNames()) {
+ String constraintColumnName =
constraintKeyColumn.getColumnName();
+ for (int i = 0; i < inputColumnsMapping.size(); i++) {
+ if
(constraintColumnName.equals(inputColumnsMapping.get(i))) {
+ outConstraintColumnKeys.add(
+ ConstraintKey.ConstraintKeyColumn.of(
+ outRowType.getFieldName(i),
+
constraintKeyColumn.getSortType()));
+ }
+ }
+ }
+ if (!outConstraintColumnKeys.isEmpty()) {
+ outConstraintKey.add(
+ ConstraintKey.of(
+ constraintKey.getConstraintType(),
+ constraintKey.getConstraintName(),
+ outConstraintColumnKeys));
+ }
+ }
+ if (!outConstraintKey.isEmpty()) {
+ builder.constraintKey(outConstraintKey);
+ }
+ }
+
+ String[] fieldNames = outRowType.getFieldNames();
+ SeaTunnelDataType<?>[] fieldTypes = outRowType.getFieldTypes();
+ List<Column> columns = new ArrayList<>(fieldNames.length);
+ for (int i = 0; i < fieldNames.length; i++) {
+ Column simpleColumn = null;
+ String inputColumnName = inputColumnsMapping.get(i);
+ if (inputColumnName != null) {
+ for (Column inputColumn :
inputCatalogTable.getTableSchema().getColumns()) {
+ if (inputColumnName.equals(inputColumn.getName())) {
+ simpleColumn = inputColumn;
+ break;
+ }
+ }
+ }
+ Column column;
+ if (simpleColumn != null) {
+ column =
+ PhysicalColumn.of(
+ fieldNames[i],
+ fieldTypes[i],
+ simpleColumn.getColumnLength(),
+ simpleColumn.isNullable(),
+ simpleColumn.getDefaultValue(),
+ simpleColumn.getComment());
+ } else {
+ column = PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0,
true, null, null);
+ }
+ columns.add(column);
+ }
+ return builder.columns(columns).build();
+ }
+
+ @Override
+ protected TableIdentifier transformTableIdentifier() {
+ return inputCatalogTable.getTableId().copy();
+ }
+
@Override
public void close() {
sqlEngine.close();
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
index 3b97c96a7..f509af832 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
@@ -18,7 +18,10 @@
package org.apache.seatunnel.transform.sql;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import com.google.auto.service.AutoService;
@@ -29,11 +32,17 @@ import static org.apache.seatunnel.transform.sql.SQLTransform.KEY_QUERY;
public class SQLTransformFactory implements TableTransformFactory {
@Override
public String factoryIdentifier() {
- return "Sql";
+ return SQLTransform.PLUGIN_NAME;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder().required(KEY_QUERY).build();
}
+
+ @Override
+ public TableTransform createTransform(TableFactoryContext context) {
+ CatalogTable catalogTable = context.getCatalogTable();
+ return () -> new SQLTransform(context.getOptions(), catalogTable);
+ }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 62fa8d901..fa491e7e6 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -38,11 +38,14 @@ import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.ServiceLoader;
+import java.util.stream.Collectors;
public class ZetaSQLEngine implements SQLEngine {
private String inputTableName;
+ private SeaTunnelRowType inputRowType;
private String sql;
private PlainSelect selectBody;
@@ -56,6 +59,7 @@ public class ZetaSQLEngine implements SQLEngine {
@Override
public void init(String inputTableName, SeaTunnelRowType inputRowType,
String sql) {
this.inputTableName = inputTableName;
+ this.inputRowType = inputRowType;
this.sql = sql;
List<ZetaUDF> udfList = new ArrayList<>();
@@ -140,11 +144,19 @@ public class ZetaSQLEngine implements SQLEngine {
}
@Override
- public SeaTunnelRowType typeMapping() {
+ public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
List<SelectItem> selectItems = selectBody.getSelectItems();
String[] fieldNames = new String[selectItems.size()];
SeaTunnelDataType<?>[] seaTunnelDataTypes = new
SeaTunnelDataType<?>[selectItems.size()];
+ if (inputColumnsMapping != null) {
+ for (int i = 0; i < selectItems.size(); i++) {
+ inputColumnsMapping.add(null);
+ }
+ }
+
+ List<String> inputColumnNames =
+
Arrays.stream(inputRowType.getFieldNames()).collect(Collectors.toList());
for (int i = 0; i < selectItems.size(); i++) {
SelectItem selectItem = selectItems.get(i);
@@ -162,6 +174,12 @@ public class ZetaSQLEngine implements SQLEngine {
}
}
+ if (inputColumnsMapping != null
+ && expression instanceof Column
+ && inputColumnNames.contains(((Column)
expression).getColumnName())) {
+ inputColumnsMapping.set(i, ((Column)
expression).getColumnName());
+ }
+
seaTunnelDataTypes[i] =
zetaSQLType.getExpressionType(expression);
}
}
@@ -183,7 +201,10 @@ public class ZetaSQLEngine implements SQLEngine {
// Project
Object[] outputFields = project(inputFields);
- return new SeaTunnelRow(outputFields);
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(outputFields);
+ seaTunnelRow.setRowKind(inputRow.getRowKind());
+ seaTunnelRow.setTableId(inputRow.getTableId());
+ return seaTunnelRow;
}
private Object[] scanTable(SeaTunnelRow inputRow) {