This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9a32db6fc [feature][jdbc][TiDB] add TiDB catalog (#4438)
9a32db6fc is described below
commit 9a32db6fc00221ca0a77fb044cc595a90dc37b7a
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Mar 30 20:32:39 2023 +0800
[feature][jdbc][TiDB] add TiDB catalog (#4438)
---
.../jdbc/catalog/AbstractJdbcCatalog.java | 1 -
.../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 2 +-
.../seatunnel/jdbc/catalog/tidb/TiDBCatalog.java | 37 ++++++++++
.../jdbc/catalog/tidb/TiDBCatalogFactory.java | 62 +++++++++++++++++
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 81 ++++++++++++++++++++--
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 23 +++++-
6 files changed, 198 insertions(+), 8 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 07098e810..bd516e325 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -68,7 +68,6 @@ public abstract class AbstractJdbcCatalog implements Catalog {
String catalogName, String username, String pwd,
JdbcUrlUtil.UrlInfo urlInfo) {
checkArgument(StringUtils.isNotBlank(username));
- checkArgument(StringUtils.isNotBlank(pwd));
urlInfo.getDefaultDatabase()
.orElseThrow(
() -> new IllegalArgumentException("Can't find default
database in url"));
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 07f6d5198..d8534d5df 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -53,7 +53,7 @@ import java.util.Set;
public class MySqlCatalog extends AbstractJdbcCatalog {
- private static final Set<String> SYS_DATABASES = new HashSet<>(4);
+ protected static final Set<String> SYS_DATABASES = new HashSet<>(4);
static {
SYS_DATABASES.add("information_schema");
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java
new file mode 100644
index 000000000..82a0edbce
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalog.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
+
+public class TiDBCatalog extends MySqlCatalog {
+
+ static {
+ SYS_DATABASES.clear();
+ SYS_DATABASES.add("information_schema");
+ SYS_DATABASES.add("mysql");
+ SYS_DATABASES.add("performance_schema");
+ SYS_DATABASES.add("metrics_schema");
+ }
+
+ public TiDBCatalog(
+ String catalogName, String username, String pwd,
JdbcUrlUtil.UrlInfo urlInfo) {
+ super(catalogName, username, pwd, urlInfo);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java
new file mode 100644
index 000000000..d8eff84bd
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/tidb/TiDBCatalogFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Optional;
+
+@AutoService(Factory.class)
+public class TiDBCatalogFactory implements CatalogFactory {
+
+ public static final String IDENTIFIER = "TiDB";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
+ JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
+ Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
+ if (!defaultDatabase.isPresent()) {
+ throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
+ }
+ return new TiDBCatalog(
+ catalogName,
+ options.get(JdbcCatalogOptions.USERNAME),
+ options.get(JdbcCatalogOptions.PASSWORD),
+ urlInfo);
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return JdbcCatalogOptions.BASE_RULE.build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index af92ce4f8..2cce860d2 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -19,17 +19,25 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportDataSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
@@ -37,18 +45,22 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommit
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.commons.lang3.StringUtils;
+
import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
@AutoService(SeaTunnelSink.class)
public class JdbcSink
- implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo,
JdbcAggregatedCommitInfo> {
-
- private Config pluginConfig;
+ implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo,
JdbcAggregatedCommitInfo>,
+ SupportDataSaveMode {
private SeaTunnelRowType seaTunnelRowType;
@@ -58,6 +70,28 @@ public class JdbcSink
private JdbcDialect dialect;
+ private ReadonlyConfig config;
+
+ private DataSaveMode dataSaveMode;
+
+ private CatalogTable catalogTable;
+
+ public JdbcSink(
+ ReadonlyConfig config,
+ JdbcSinkConfig jdbcSinkConfig,
+ JdbcDialect dialect,
+ DataSaveMode dataSaveMode,
+ CatalogTable catalogTable) {
+ this.config = config;
+ this.jdbcSinkConfig = jdbcSinkConfig;
+ this.dialect = dialect;
+ this.dataSaveMode = dataSaveMode;
+ this.catalogTable = catalogTable;
+ this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
+ }
+
+ public JdbcSink() {}
+
@Override
public String getPluginName() {
return "Jdbc";
@@ -65,10 +99,10 @@ public class JdbcSink
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
+ this.config = ReadonlyConfig.fromConfig(pluginConfig);
this.jdbcSinkConfig = JdbcSinkConfig.of(config);
- this.pluginConfig = pluginConfig;
this.dialect =
JdbcDialectLoader.load(jdbcSinkConfig.getJdbcConnectionConfig().getUrl());
+ this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
}
@Override
@@ -140,4 +174,41 @@ public class JdbcSink
}
return Optional.empty();
}
+
+ @Override
+ public DataSaveMode getDataSaveMode() {
+ return dataSaveMode;
+ }
+
+ @Override
+ public List<DataSaveMode> supportedDataSaveModeValues() {
+ return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
+ }
+
+ @Override
+ public void handleSaveMode(DataSaveMode saveMode) {
+ if (catalogTable != null) {
+ Map<String, String> catalogOptions =
config.get(CatalogOptions.CATALOG_OPTIONS);
+ if (catalogOptions != null
+ && TiDBCatalogFactory.IDENTIFIER.equalsIgnoreCase(
+
catalogOptions.get(CommonOptions.FACTORY_ID.key()))) {
+ if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
+ return;
+ }
+ Catalog catalog =
+ new TiDBCatalogFactory()
+ .createCatalog(
+ TiDBCatalogFactory.IDENTIFIER,
+ ReadonlyConfig.fromMap(new
HashMap<>(catalogOptions)));
+ TablePath tablePath =
+ TablePath.of(jdbcSinkConfig.getDatabase(),
jdbcSinkConfig.getTable());
+ if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) {
+ catalog.createDatabase(tablePath, true);
+ }
+ if (!catalog.tableExists(tablePath)) {
+ catalog.createTable(tablePath, catalogTable, true);
+ }
+ }
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 0f1025e0b..cfa610665 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -17,9 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
import com.google.auto.service.AutoService;
@@ -50,6 +57,20 @@ public class JdbcSinkFactory implements TableSinkFactory {
return "Jdbc";
}
+ @Override
+ public TableSink createSink(TableFactoryContext context) {
+ ReadonlyConfig config = context.getOptions();
+ JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
+ JdbcDialect dialect =
JdbcDialectLoader.load(sinkConfig.getJdbcConnectionConfig().getUrl());
+ return () ->
+ new JdbcSink(
+ config,
+ sinkConfig,
+ dialect,
+ DataSaveMode.KEEP_SCHEMA_AND_DATA,
+ context.getCatalogTable());
+ }
+
@Override
public OptionRule optionRule() {
return OptionRule.builder()
@@ -64,13 +85,13 @@ public class JdbcSinkFactory implements TableSinkFactory {
GENERATE_SINK_SQL,
AUTO_COMMIT,
SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST)
+ .optional(MAX_RETRIES)
.conditional(
IS_EXACTLY_ONCE,
true,
XA_DATA_SOURCE_CLASS_NAME,
MAX_COMMIT_ATTEMPTS,
TRANSACTION_TIMEOUT_SEC)
- .conditional(IS_EXACTLY_ONCE, false, MAX_RETRIES)
.conditional(GENERATE_SINK_SQL, true, DATABASE, TABLE)
.conditional(GENERATE_SINK_SQL, false, QUERY)
.conditional(SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST, true,
PRIMARY_KEYS)