This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9a32db6fc [feature][jdbc][TiDB] add TiDB catalog (#4438)
9a32db6fc is described below

commit 9a32db6fc00221ca0a77fb044cc595a90dc37b7a
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Mar 30 20:32:39 2023 +0800

    [feature][jdbc][TiDB] add TiDB catalog (#4438)
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          |  1 -
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java |  2 +-
 .../seatunnel/jdbc/catalog/tidb/TiDBCatalog.java   | 37 ++++++++++
 .../jdbc/catalog/tidb/TiDBCatalogFactory.java      | 62 +++++++++++++++++
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   | 81 ++++++++++++++++++++--
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       | 23 +++++-
 6 files changed, 198 insertions(+), 8 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 07098e810..bd516e325 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -68,7 +68,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
             String catalogName, String username, String pwd, 
JdbcUrlUtil.UrlInfo urlInfo) {
 
         checkArgument(StringUtils.isNotBlank(username));
-        checkArgument(StringUtils.isNotBlank(pwd));
         urlInfo.getDefaultDatabase()
                 .orElseThrow(
                         () -> new IllegalArgumentException("Can't find default 
database in url"));
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 07f6d5198..d8534d5df 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -53,7 +53,7 @@ import java.util.Set;
 
 public class MySqlCatalog extends AbstractJdbcCatalog {
 
-    private static final Set<String> SYS_DATABASES = new HashSet<>(4);
+    protected static final Set<String> SYS_DATABASES = new HashSet<>(4);
 
     static {
         SYS_DATABASES.add("information_schema");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java
new file mode 100644
index 000000000..82a0edbce
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
+
+public class TiDBCatalog extends MySqlCatalog {
+
+    static {
+        SYS_DATABASES.clear();
+        SYS_DATABASES.add("information_schema");
+        SYS_DATABASES.add("mysql");
+        SYS_DATABASES.add("performance_schema");
+        SYS_DATABASES.add("metrics_schema");
+    }
+
+    public TiDBCatalog(
+            String catalogName, String username, String pwd, 
JdbcUrlUtil.UrlInfo urlInfo) {
+        super(catalogName, username, pwd, urlInfo);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java
new file mode 100644
index 000000000..d8eff84bd
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Optional;
+
+@AutoService(Factory.class)
+public class TiDBCatalogFactory implements CatalogFactory {
+
+    public static final String IDENTIFIER = "TiDB";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
+        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
+        Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
+        if (!defaultDatabase.isPresent()) {
+            throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
+        }
+        return new TiDBCatalog(
+                catalogName,
+                options.get(JdbcCatalogOptions.USERNAME),
+                options.get(JdbcCatalogOptions.PASSWORD),
+                urlInfo);
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return JdbcCatalogOptions.BASE_RULE.build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index af92ce4f8..2cce860d2 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -19,17 +19,25 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportDataSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
@@ -37,18 +45,22 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommit
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
 
+import org.apache.commons.lang3.StringUtils;
+
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 @AutoService(SeaTunnelSink.class)
 public class JdbcSink
-        implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, 
JdbcAggregatedCommitInfo> {
-
-    private Config pluginConfig;
+        implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, 
JdbcAggregatedCommitInfo>,
+                SupportDataSaveMode {
 
     private SeaTunnelRowType seaTunnelRowType;
 
@@ -58,6 +70,28 @@ public class JdbcSink
 
     private JdbcDialect dialect;
 
+    private ReadonlyConfig config;
+
+    private DataSaveMode dataSaveMode;
+
+    private CatalogTable catalogTable;
+
+    public JdbcSink(
+            ReadonlyConfig config,
+            JdbcSinkConfig jdbcSinkConfig,
+            JdbcDialect dialect,
+            DataSaveMode dataSaveMode,
+            CatalogTable catalogTable) {
+        this.config = config;
+        this.jdbcSinkConfig = jdbcSinkConfig;
+        this.dialect = dialect;
+        this.dataSaveMode = dataSaveMode;
+        this.catalogTable = catalogTable;
+        this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
+    }
+
+    public JdbcSink() {}
+
     @Override
     public String getPluginName() {
         return "Jdbc";
@@ -65,10 +99,10 @@ public class JdbcSink
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
+        this.config = ReadonlyConfig.fromConfig(pluginConfig);
         this.jdbcSinkConfig = JdbcSinkConfig.of(config);
-        this.pluginConfig = pluginConfig;
         this.dialect = 
JdbcDialectLoader.load(jdbcSinkConfig.getJdbcConnectionConfig().getUrl());
+        this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
     }
 
     @Override
@@ -140,4 +174,41 @@ public class JdbcSink
         }
         return Optional.empty();
     }
+
+    @Override
+    public DataSaveMode getDataSaveMode() {
+        return dataSaveMode;
+    }
+
+    @Override
+    public List<DataSaveMode> supportedDataSaveModeValues() {
+        return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
+    }
+
+    @Override
+    public void handleSaveMode(DataSaveMode saveMode) {
+        if (catalogTable != null) {
+            Map<String, String> catalogOptions = 
config.get(CatalogOptions.CATALOG_OPTIONS);
+            if (catalogOptions != null
+                    && TiDBCatalogFactory.IDENTIFIER.equalsIgnoreCase(
+                            
catalogOptions.get(CommonOptions.FACTORY_ID.key()))) {
+                if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
+                    return;
+                }
+                Catalog catalog =
+                        new TiDBCatalogFactory()
+                                .createCatalog(
+                                        TiDBCatalogFactory.IDENTIFIER,
+                                        ReadonlyConfig.fromMap(new 
HashMap<>(catalogOptions)));
+                TablePath tablePath =
+                        TablePath.of(jdbcSinkConfig.getDatabase(), 
jdbcSinkConfig.getTable());
+                if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) {
+                    catalog.createDatabase(tablePath, true);
+                }
+                if (!catalog.tableExists(tablePath)) {
+                    catalog.createTable(tablePath, catalogTable, true);
+                }
+            }
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 0f1025e0b..cfa610665 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -17,9 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
 
 import com.google.auto.service.AutoService;
 
@@ -50,6 +57,20 @@ public class JdbcSinkFactory implements TableSinkFactory {
         return "Jdbc";
     }
 
+    @Override
+    public TableSink createSink(TableFactoryContext context) {
+        ReadonlyConfig config = context.getOptions();
+        JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
+        JdbcDialect dialect = 
JdbcDialectLoader.load(sinkConfig.getJdbcConnectionConfig().getUrl());
+        return () ->
+                new JdbcSink(
+                        config,
+                        sinkConfig,
+                        dialect,
+                        DataSaveMode.KEEP_SCHEMA_AND_DATA,
+                        context.getCatalogTable());
+    }
+
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
@@ -64,13 +85,13 @@ public class JdbcSinkFactory implements TableSinkFactory {
                         GENERATE_SINK_SQL,
                         AUTO_COMMIT,
                         SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST)
+                .optional(MAX_RETRIES)
                 .conditional(
                         IS_EXACTLY_ONCE,
                         true,
                         XA_DATA_SOURCE_CLASS_NAME,
                         MAX_COMMIT_ATTEMPTS,
                         TRANSACTION_TIMEOUT_SEC)
-                .conditional(IS_EXACTLY_ONCE, false, MAX_RETRIES)
                 .conditional(GENERATE_SINK_SQL, true, DATABASE, TABLE)
                 .conditional(GENERATE_SINK_SQL, false, QUERY)
                 .conditional(SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST, true, 
PRIMARY_KEYS)

Reply via email to