This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new ce11d265 [feature] add transform sql interface (#102)
ce11d265 is described below

commit ce11d265196ebc1ff3507bbe17e015fcaf047260
Author: XiaoJiang521 <[email protected]>
AuthorDate: Thu Aug 24 14:07:45 2023 +0800

    [feature] add transform sql interface (#102)
---
 .../app/controller/SchemaDerivationController.java |  49 ++++++
 .../app/service/ISchemaDerivationService.java      |  26 +++
 .../service/impl/SchemaDerivationServiceImpl.java  | 186 +++++++++++++++++++++
 3 files changed, 261 insertions(+)

diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SchemaDerivationController.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SchemaDerivationController.java
new file mode 100644
index 00000000..03d2afb0
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SchemaDerivationController.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.domain.request.job.TableSchemaReq;
+import org.apache.seatunnel.app.domain.request.job.transform.SQL;
+import org.apache.seatunnel.app.service.ISchemaDerivationService;
+
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import io.swagger.annotations.ApiParam;
+
+import javax.annotation.Resource;
+
+@RestController
+@RequestMapping("/seatunnel/api/v1/schema/derivation")
+public class SchemaDerivationController {
+
+    @Resource private ISchemaDerivationService schemaDerivationService;
+
+    @PostMapping("/sql")
+    Result<TableSchemaReq> SQLSchemaDerivation(
+            @ApiParam(value = "job version id", required = true) @RequestParam 
long jobVersionId,
+            @ApiParam(value = "inputPluginId", required = true) @RequestParam 
String inputPluginId,
+            @RequestBody SQL sql) {
+        return Result.success(
+                schemaDerivationService.derivationSQL(jobVersionId, 
inputPluginId, sql));
+    }
+}
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ISchemaDerivationService.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ISchemaDerivationService.java
new file mode 100644
index 00000000..922f24d8
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ISchemaDerivationService.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.service;
+
+import org.apache.seatunnel.app.domain.request.job.TableSchemaReq;
+import org.apache.seatunnel.app.domain.request.job.transform.SQL;
+
+public interface ISchemaDerivationService {
+
+    TableSchemaReq derivationSQL(long jobVersionId, String inputPluginId, SQL 
sql);
+}
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java
new file mode 100644
index 00000000..03cacb2b
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.service.impl;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.app.domain.request.job.DatabaseTableSchemaReq;
+import org.apache.seatunnel.app.domain.request.job.PluginConfig;
+import org.apache.seatunnel.app.domain.request.job.TableSchemaReq;
+import org.apache.seatunnel.app.domain.request.job.transform.SQL;
+import org.apache.seatunnel.app.service.IJobTaskService;
+import org.apache.seatunnel.app.service.ISchemaDerivationService;
+import org.apache.seatunnel.datasource.plugin.api.model.TableField;
+import org.apache.seatunnel.transform.sql.SQLTransform;
+
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Service
+public class SchemaDerivationServiceImpl implements ISchemaDerivationService {
+
+    @Resource private IJobTaskService jobTaskService;
+
+    private static final Pattern decimalPattern = 
Pattern.compile("DECIMAL\\((\\d+), (\\d+)\\)");
+
+    @Override
+    public TableSchemaReq derivationSQL(long jobVersionId, String 
inputPluginId, SQL sql) {
+
+        PluginConfig pluginConfig = jobTaskService.getSingleTask(jobVersionId, 
inputPluginId);
+        TableTransformFactory factory =
+                FactoryUtil.discoverFactory(
+                        Thread.currentThread().getContextClassLoader(),
+                        TableTransformFactory.class,
+                        "Sql");
+        List<DatabaseTableSchemaReq> tableSchemaReqs = 
pluginConfig.getOutputSchema();
+        if (tableSchemaReqs.isEmpty()) {
+            throw new IllegalArgumentException("outputSchema is empty, please 
add input plugin");
+        }
+        DatabaseTableSchemaReq tableSchema = tableSchemaReqs.get(0);
+        TableSchema.Builder builder = TableSchema.builder();
+        List<String> primaryKeys = new ArrayList<>();
+        for (TableField f : tableSchema.getFields()) {
+            if (f.getPrimaryKey()) {
+                primaryKeys.add(f.getName());
+            }
+            builder.column(
+                    PhysicalColumn.of(
+                            f.getName(),
+                            stringToDataType(f.getOutputDataType()),
+                            0,
+                            f.getNullable(),
+                            f.getDefaultValue(),
+                            f.getComment()));
+        }
+        builder.primaryKey(PrimaryKey.of("PrimaryKeys", primaryKeys));
+
+        CatalogTable table =
+                CatalogTable.of(
+                        TableIdentifier.of(
+                                "default", tableSchema.getDatabase(), 
tableSchema.getTableName()),
+                        builder.build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        tableSchema.getTableName());
+        Map<String, Object> config = new HashMap<>();
+        config.put(SQLTransform.KEY_QUERY.key(), sql.getQuery());
+        TableFactoryContext context =
+                new TableFactoryContext(
+                        Collections.singletonList(table),
+                        ReadonlyConfig.fromMap(config),
+                        Thread.currentThread().getContextClassLoader());
+        TableTransform<SeaTunnelRow> transform = 
factory.createTransform(context);
+        SQLTransform sqlTransform = (SQLTransform) transform.createTransform();
+        CatalogTable result = sqlTransform.getProducedCatalogTable();
+        List<String> primaryKeysList = new ArrayList<>();
+        if (result.getTableSchema().getPrimaryKey() != null) {
+            
primaryKeysList.addAll(result.getTableSchema().getPrimaryKey().getColumnNames());
+        }
+        List<TableField> fields = new ArrayList<>();
+        for (Column column : result.getTableSchema().getColumns()) {
+            TableField field = new TableField();
+            field.setName(column.getName());
+            field.setComment(column.getComment());
+            field.setDefaultValue(
+                    column.getDefaultValue() != null ? 
column.getDefaultValue().toString() : null);
+            field.setNullable(column.isNullable());
+            field.setOutputDataType(column.getDataType().toString());
+            field.setPrimaryKey(primaryKeysList.contains(column.getName()));
+            field.setType(column.getDataType().toString());
+            fields.add(field);
+        }
+
+        TableSchemaReq tableSchemaRes = new TableSchemaReq();
+        tableSchemaRes.setFields(fields);
+        tableSchemaRes.setTableName(tableSchema.getTableName());
+        return tableSchemaRes;
+    }
+
+    private SeaTunnelDataType<?> stringToDataType(String dataTypeStr) {
+        dataTypeStr = dataTypeStr.toUpperCase();
+        switch (dataTypeStr) {
+            case "STRING":
+                return BasicType.STRING_TYPE;
+            case "BOOLEAN":
+                return BasicType.BOOLEAN_TYPE;
+            case "TINYINT":
+                return BasicType.BYTE_TYPE;
+            case "SMALLINT":
+                return BasicType.SHORT_TYPE;
+            case "INT":
+                return BasicType.INT_TYPE;
+            case "BIGINT":
+                return BasicType.LONG_TYPE;
+            case "FLOAT":
+                return BasicType.FLOAT_TYPE;
+            case "DOUBLE":
+                return BasicType.DOUBLE_TYPE;
+            case "NULL":
+                return BasicType.VOID_TYPE;
+            case "BYTES":
+                return ArrayType.BYTE_ARRAY_TYPE;
+            case "DATE":
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case "TIME":
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case "TIMESTAMP":
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case "DECIMAL":
+                return new DecimalType(38, 18);
+            case "ARRAY":
+            case "MAP":
+            case "ROW":
+            case "MULTIPLE_ROW":
+                return BasicType.STRING_TYPE;
+            default:
+                break;
+        }
+
+        Matcher matcher = decimalPattern.matcher(dataTypeStr);
+        if (matcher.matches()) {
+            int precision = Integer.parseInt(matcher.group(1));
+            int scale = Integer.parseInt(matcher.group(2));
+            return new DecimalType(precision, scale);
+        }
+        return BasicType.STRING_TYPE;
+    }
+}

Reply via email to