This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new d8a0be84c [Feature] [Catalog] Support create/drop table, create/drop
database in catalog (#4075)
d8a0be84c is described below
commit d8a0be84ca4cfc38ccedad33bc77a05d1803b4a4
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Feb 10 14:25:17 2023 +0800
[Feature] [Catalog] Support create/drop table, create/drop database in
catalog (#4075)
* Add create/drop table in catalog
---
.../api/common/SeaTunnelAPIErrorCode.java | 5 +-
.../seatunnel/api/table/catalog/Catalog.java | 33 ++++
.../apache/seatunnel/api/table/catalog/Column.java | 195 +++----------------
.../seatunnel/api/table/catalog/ConstraintKey.java | 76 ++++++++
.../api/table/catalog/MetadataColumn.java | 65 +++++++
.../api/table/catalog/PhysicalColumn.java | 60 ++++++
.../PrimaryKey.java} | 29 +--
.../seatunnel/api/table/catalog/TableSchema.java | 79 ++------
.../exception/DatabaseAlreadyExistException.java | 34 ++++
.../exception/TableAlreadyExistException.java} | 26 ++-
.../api/table/type/SeaTunnelDataType.java | 1 +
.../jdbc/catalog/AbstractJdbcCatalog.java | 126 +++++++++++--
.../seatunnel/jdbc/catalog/MySqlCatalog.java | 168 ++++++++---------
.../catalog/sql/MysqlCreateTableSqlBuilder.java | 209 +++++++++++++++++++++
.../seatunnel/jdbc/utils/DataTypeUtils.java | 138 ++++++++++++++
.../sql/MysqlCreateTableSqlBuilderTest.java | 79 ++++++++
16 files changed, 965 insertions(+), 358 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
index e89875ef9..6adf373ed 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
@@ -25,7 +25,10 @@ public enum SeaTunnelAPIErrorCode implements
SeaTunnelErrorCode {
CATALOG_INITIALIZE_FAILED("API-03", "Catalog initialize failed"),
DATABASE_NOT_EXISTED("API-04", "Database not existed"),
TABLE_NOT_EXISTED("API-05", "Table not existed"),
- FACTORY_INITIALIZE_FAILED("API-06", "Factory initialize failed"),;
+ FACTORY_INITIALIZE_FAILED("API-06", "Factory initialize failed"),
+ DATABASE_ALREADY_EXISTED("API-07", "Database already existed"),
+ TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
+ ;
private final String code;
private final String description;
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index e3f83d9ff..90a443d03 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -18,7 +18,9 @@
package org.apache.seatunnel.api.table.catalog;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -113,4 +115,35 @@ public interface Catalog {
* @throws CatalogException in case of any runtime exception
*/
CatalogTable getTable(TablePath tablePath) throws CatalogException,
TableNotExistException;
+
+ /**
+ * Create a new table in this catalog.
+ *
+ * @param tablePath Path of the table
+ * @param table The table definition
+ * @param ignoreIfExists Flag to specify behavior when a table with the
given name already exists
+ * @throws TableAlreadyExistException thrown if the table already exists
in the catalog and ignoreIfExists is false
+ * @throws DatabaseNotExistException thrown if the database in tablePath
doesn't exist in the catalog
+ * @throws CatalogException in case of any runtime exception
+ */
+ void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException;
+
+ /**
+ * Drop an existing table in this catalog.
+ *
+ * @param tablePath Path of the table
+ * @param ignoreIfNotExists Flag to specify behavior when a table with the
given name doesn't exist
+ * @throws TableNotExistException thrown if the table doesn't exist in the
catalog and ignoreIfNotExists is false
+ * @throws CatalogException in case of any runtime exception
+ */
+ void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException;
+
+ void createDatabase(TablePath tablePath, boolean ignoreIfExists) throws
DatabaseAlreadyExistException, CatalogException;
+
+ void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) throws
DatabaseNotExistException, CatalogException;
+
+ // todo: Support updating table metadata
+
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index aea7fade2..6e11e8a6e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -20,10 +20,17 @@ package org.apache.seatunnel.api.table.catalog;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import lombok.Data;
+
import java.io.Serializable;
-import java.util.Objects;
-import java.util.Optional;
+/**
+ * Represent the column of {@link TableSchema}.
+ *
+ * @see PhysicalColumn
+ * @see MetadataColumn
+ */
+@Data
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class Column implements Serializable {
@@ -37,38 +44,37 @@ public abstract class Column implements Serializable {
/**
* Data type of the column.
*/
+ // todo: use generic type
protected final SeaTunnelDataType<?> dataType;
- protected final String comment;
-
- private Column(String name, SeaTunnelDataType<?> dataType, String comment)
{
- this.name = name;
- this.dataType = dataType;
- this.comment = comment;
- }
+ protected final Integer columnLength;
/**
- * Creates a regular table column that represents physical data.
+ * Whether the column can be null.
*/
- public static PhysicalColumn physical(String name, SeaTunnelDataType<?>
dataType) {
- return new PhysicalColumn(name, dataType);
- }
+ protected final boolean nullable;
+ // todo: use generic type
/**
- * Creates a metadata column from metadata of the given column name or
from metadata of the
- * given key (if not null).
- *
- * <p>Allows to specify whether the column is virtual or not.
+ * The default value of the column.
*/
- public static MetadataColumn metadata(
- String name, SeaTunnelDataType<?> dataType, String metadataKey) {
- return new MetadataColumn(name, dataType, metadataKey);
- }
+ protected final Object defaultValue;
- /**
- * Add the comment to the column and return the new object.
- */
- public abstract Column withComment(String comment);
+ protected final String comment;
+
+ protected Column(String name,
+ SeaTunnelDataType<?> dataType,
+ Integer columnLength,
+ boolean nullable,
+ Object defaultValue,
+ String comment) {
+ this.name = name;
+ this.dataType = dataType;
+ this.columnLength = columnLength;
+ this.nullable = nullable;
+ this.defaultValue = defaultValue;
+ this.comment = comment;
+ }
/**
* Returns whether the given column is a physical column of a table;
neither computed nor
@@ -76,148 +82,9 @@ public abstract class Column implements Serializable {
*/
public abstract boolean isPhysical();
- /**
- * Returns the data type of this column.
- */
- public SeaTunnelDataType<?> getDataType() {
- return this.dataType;
- }
-
- /**
- * Returns the name of this column.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Returns the comment of this column.
- */
- public Optional<String> getComment() {
- return Optional.ofNullable(comment);
- }
-
/**
* Returns a copy of the column with a replaced {@link SeaTunnelDataType}.
*/
public abstract Column copy(SeaTunnelDataType<?> newType);
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Column that = (Column) o;
- return Objects.equals(this.name, that.name)
- && Objects.equals(this.dataType, that.dataType)
- && Objects.equals(this.comment, that.comment);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(this.name, this.dataType);
- }
-
- //
--------------------------------------------------------------------------------------------
- // Specific kinds of columns
- //
--------------------------------------------------------------------------------------------
-
- /**
- * Representation of a physical column.
- */
- public static final class PhysicalColumn extends Column {
-
- private PhysicalColumn(String name, SeaTunnelDataType<?> dataType) {
- this(name, dataType, null);
- }
-
- private PhysicalColumn(String name, SeaTunnelDataType<?> dataType,
String comment) {
- super(name, dataType, comment);
- }
-
- @Override
- public PhysicalColumn withComment(String comment) {
- if (comment == null) {
- return this;
- }
- return new PhysicalColumn(name, dataType, comment);
- }
-
- @Override
- public boolean isPhysical() {
- return true;
- }
-
- @Override
- public Column copy(SeaTunnelDataType<?> newDataType) {
- return new PhysicalColumn(name, newDataType, comment);
- }
- }
-
- /**
- * Representation of a metadata column.
- */
- public static final class MetadataColumn extends Column {
-
- private final String metadataKey;
-
- private MetadataColumn(
- String name, SeaTunnelDataType<?> dataType, String metadataKey) {
- this(name, dataType, metadataKey, null);
- }
-
- private MetadataColumn(
- String name,
- SeaTunnelDataType<?> dataType,
- String metadataKey,
- String comment) {
- super(name, dataType, comment);
- this.metadataKey = metadataKey;
- }
-
- public Optional<String> getMetadataKey() {
- return Optional.ofNullable(metadataKey);
- }
-
- @Override
- public MetadataColumn withComment(String comment) {
- if (comment == null) {
- return this;
- }
- return new MetadataColumn(name, dataType, metadataKey, comment);
- }
-
- @Override
- public boolean isPhysical() {
- return false;
- }
-
- @Override
- public Column copy(SeaTunnelDataType<?> newDataType) {
- return new MetadataColumn(name, newDataType, metadataKey, comment);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- MetadataColumn that = (MetadataColumn) o;
- return Objects.equals(metadataKey, that.metadataKey);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), metadataKey);
- }
- }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
new file mode 100644
index 000000000..a294c4667
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class ConstraintKey implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final ConstraintType constraintType;
+
+ private final String constraintName;
+
+ private final List<ConstraintKeyColumn> columnNames;
+
+ private ConstraintKey(ConstraintType constraintType,
+ String constraintName,
+ List<ConstraintKeyColumn> columnNames) {
+ checkNotNull(constraintType, "constraintType must not be null");
+
+ this.constraintType = constraintType;
+ this.constraintName = constraintName;
+ this.columnNames = columnNames;
+ }
+
+ public static ConstraintKey of(ConstraintType constraintType,
+ String constraintName,
+ List<ConstraintKeyColumn> columnNames) {
+ return new ConstraintKey(constraintType, constraintName, columnNames);
+ }
+
+ @Data
+ @AllArgsConstructor
+ public static class ConstraintKeyColumn {
+ private final String columnName;
+ private final ColumnSortType sortType;
+
+ public static ConstraintKeyColumn of(String columnName, ColumnSortType
sortType) {
+ return new ConstraintKeyColumn(columnName, sortType);
+ }
+ }
+
+ public enum ConstraintType {
+ KEY,
+ UNIQUE_KEY,
+ FOREIGN_KEY
+ }
+
+ public enum ColumnSortType {
+ ASC,
+ DESC
+ }
+
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
new file mode 100644
index 000000000..a88641861
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Representation of a metadata column.
+ */
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class MetadataColumn extends Column {
+
+ private final String metadataKey;
+
+ protected MetadataColumn(String name,
+ SeaTunnelDataType<?> dataType,
+ Integer columnLength,
+ String metadataKey,
+ boolean nullable,
+ Object defaultValue,
+ String comment) {
+ super(name, dataType, columnLength, nullable, defaultValue, comment);
+ this.metadataKey = metadataKey;
+ }
+
+ public static MetadataColumn of(String name,
+ SeaTunnelDataType<?> dataType,
+ Integer columnLength,
+ String metadataKey,
+ boolean nullable,
+ Object defaultValue,
+ String comment) {
+ return new MetadataColumn(name, dataType, columnLength, metadataKey,
nullable, defaultValue, comment);
+ }
+
+ @Override
+ public boolean isPhysical() {
+ return false;
+ }
+
+ @Override
+ public Column copy(SeaTunnelDataType<?> newType) {
+ return MetadataColumn.of(name, newType, columnLength, metadataKey,
nullable, defaultValue, comment);
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
new file mode 100644
index 000000000..bc22772ac
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Representation of a physical column.
+ */
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class PhysicalColumn extends Column {
+
+ protected PhysicalColumn(String name,
+ SeaTunnelDataType<?> dataType,
+ Integer columnLength,
+ boolean nullable,
+ Object defaultValue,
+ String comment) {
+ super(name, dataType, columnLength, nullable, defaultValue, comment);
+ }
+
+ public static PhysicalColumn of(String name,
+ SeaTunnelDataType<?> dataType,
+ Integer columnLength,
+ boolean nullable,
+ Object defaultValue,
+ String comment) {
+ return new PhysicalColumn(name, dataType, columnLength, nullable,
defaultValue, comment);
+ }
+
+ @Override
+ public boolean isPhysical() {
+ return true;
+ }
+
+ @Override
+ public Column copy(SeaTunnelDataType<?> newType) {
+ return PhysicalColumn.of(name, newType, columnLength, nullable,
defaultValue, comment);
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
similarity index 62%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
index 946f4a728..d5e32c828 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
@@ -15,22 +15,25 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.api.table.catalog;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
import java.io.Serializable;
+import java.util.List;
-/**
- * Logic data type of column in SeaTunnel.
- */
-public interface SeaTunnelDataType<T> extends Serializable {
+@Data
+@AllArgsConstructor
+public class PrimaryKey implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ // This field is not used now
+ private final String primaryKey;
- /**
- * Gets the class of the type represented by this data type.
- */
- Class<T> getTypeClass();
+ private final List<String> columnNames;
- /**
- * Gets the SQL standard type represented by this data type.
- */
- SqlType getSqlType();
+ public static PrimaryKey of(String primaryKey, List<String> columnNames) {
+ return new PrimaryKey(primaryKey, columnNames);
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 0d076943e..74adafc6c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -20,36 +20,30 @@ package org.apache.seatunnel.api.table.catalog;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
/**
* Represent a physical table schema.
*/
+@Data
+@AllArgsConstructor
public final class TableSchema implements Serializable {
private static final long serialVersionUID = 1L;
private final List<Column> columns;
private final PrimaryKey primaryKey;
- private TableSchema(List<Column> columns, PrimaryKey primaryKey) {
- this.columns = columns;
- this.primaryKey = primaryKey;
- }
+ private final List<ConstraintKey> constraintKeys;
- public static TableSchema.Builder builder() {
+ public static Builder builder() {
return new Builder();
}
- /**
- * Returns all {@link Column}s of this schema.
- */
- public List<Column> getColumns() {
- return columns;
- }
-
public SeaTunnelRowType toPhysicalRowDataType() {
SeaTunnelDataType<?>[] fieldTypes = columns.stream()
.filter(Column::isPhysical)
@@ -67,6 +61,8 @@ public final class TableSchema implements Serializable {
private PrimaryKey primaryKey;
+ private final List<ConstraintKey> constraintKeys = new ArrayList<>();
+
public Builder columns(List<Column> columns) {
this.columns.addAll(columns);
return this;
@@ -77,69 +73,18 @@ public final class TableSchema implements Serializable {
return this;
}
- public Builder physicalColumn(String name, SeaTunnelDataType<?>
dataType) {
- this.columns.add(Column.physical(name, dataType));
- return this;
- }
-
public Builder primaryKey(PrimaryKey primaryKey) {
this.primaryKey = primaryKey;
return this;
}
- public Builder primaryKey(String constraintName, List<String>
columnNames) {
- this.primaryKey = PrimaryKey.of(constraintName, columnNames);
+ public Builder constraintKey(ConstraintKey constraintKey) {
+ this.constraintKeys.add(constraintKey);
return this;
}
public TableSchema build() {
- return new TableSchema(columns, primaryKey);
- }
- }
-
- public static final class PrimaryKey implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final String constraintName;
- private final List<String> columnNames;
-
- private PrimaryKey(String constraintName, List<String> columnNames) {
- this.constraintName = constraintName;
- this.columnNames = columnNames;
- }
-
- public static PrimaryKey of(String constraintName, List<String>
columnNames) {
- return new PrimaryKey(constraintName, columnNames);
- }
-
- public String getConstraintName() {
- return constraintName;
- }
-
- public List<String> getColumnNames() {
- return columnNames;
- }
-
- @Override
- public String toString() {
- return String.format("CONSTRAINT %s PRIMARY KEY (%s) NOT
ENFORCED", constraintName, String.join(", ", columnNames));
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PrimaryKey that = (PrimaryKey) o;
- return constraintName.equals(that.constraintName) &&
columnNames.equals(that.columnNames);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(constraintName, columnNames);
+ return new TableSchema(columns, primaryKey, constraintKeys);
}
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseAlreadyExistException.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseAlreadyExistException.java
new file mode 100644
index 000000000..40e4acb98
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseAlreadyExistException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog.exception;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class DatabaseAlreadyExistException extends SeaTunnelRuntimeException {
+ private static final String MSG = "Database %s already exist in Catalog
%s.";
+
+ public DatabaseAlreadyExistException(String catalogName, String
databaseName) {
+ this(catalogName, databaseName, null);
+ }
+
+ public DatabaseAlreadyExistException(String catalogName, String
databaseName, Throwable cause) {
+ super(SeaTunnelAPIErrorCode.DATABASE_ALREADY_EXISTED,
String.format(MSG, databaseName, catalogName), cause);
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableAlreadyExistException.java
similarity index 51%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableAlreadyExistException.java
index 946f4a728..8fe7c368f 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableAlreadyExistException.java
@@ -15,22 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.api.table.catalog.exception;
-import java.io.Serializable;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-/**
- * Logic data type of column in SeaTunnel.
- */
-public interface SeaTunnelDataType<T> extends Serializable {
+public class TableAlreadyExistException extends SeaTunnelRuntimeException {
+ private static final String MSG = "Table %s already exist in Catalog %s.";
- /**
- * Gets the class of the type represented by this data type.
- */
- Class<T> getTypeClass();
+ public TableAlreadyExistException(String catalogName, TablePath tablePath)
{
+ this(catalogName, tablePath, null);
+ }
- /**
- * Gets the SQL standard type represented by this data type.
- */
- SqlType getSqlType();
+ public TableAlreadyExistException(String catalogName, TablePath tablePath,
Throwable cause) {
+ super(SeaTunnelAPIErrorCode.TABLE_ALREADY_EXISTED, String.format(MSG,
tablePath.getFullName(), catalogName), cause);
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
index 946f4a728..3f6c2c4b8 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
@@ -33,4 +33,5 @@ public interface SeaTunnelDataType<T> extends Serializable {
* Gets the SQL standard type represented by this data type.
*/
SqlType getSqlType();
+
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 852fbbe0a..79b5397a4 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -19,14 +19,22 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,11 +43,13 @@ import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
public abstract class AbstractJdbcCatalog implements Catalog {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
@@ -163,15 +173,15 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
LOG.info("Catalog {} closing", catalogName);
}
- protected Optional<TableSchema.PrimaryKey> getPrimaryKey(
- DatabaseMetaData metaData, String schema, String table) throws
SQLException {
+    /**
+     * Reads the primary key of a table from JDBC metadata.
+     *
+     * <p>Every row returned by {@code getPrimaryKeys} contributes a (KEY_SEQ, COLUMN_NAME) pair; the
+     * pairs are sorted by KEY_SEQ to obtain the column order of the key.
+     *
+     * @param metaData JDBC metadata to query
+     * @param database passed as the first (catalog) argument of {@code getPrimaryKeys}
+     * @param table    table name
+     * @return the primary key, or {@link Optional#empty()} when no primary key column is reported
+     * @throws SQLException if the metadata query fails
+     */
+ protected Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData,
String database, String table) throws SQLException {
// According to the Javadoc of
java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME, not by
KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
- ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+    // NOTE(review): the table name is also passed in the position that previously held the schema,
+    // and this ResultSet is never closed - confirm both are intended.
+ ResultSet rs = metaData.getPrimaryKeys(database, table, table);
- Map<Integer, String> keySeqColumnName = new HashMap<>();
+ // seq -> column name
+ List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
String pkName = null;
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
@@ -179,15 +189,52 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
pkName = rs.getString("PK_NAME");
int keySeq = rs.getInt("KEY_SEQ");
// KEY_SEQ is 1-based index
- keySeqColumnName.put(keySeq - 1, columnName);
+ primaryKeyColumns.add(Pair.of(keySeq, columnName));
}
// initialize size
- List<String> pkFields = Arrays.asList(new
String[keySeqColumnName.size()]);
- keySeqColumnName.forEach(pkFields::set);
- if (!pkFields.isEmpty()) {
- // PK_NAME maybe null according to the javadoc, generate an unique
name in that case
- pkName = pkName == null ? "pk_" + String.join("_", pkFields) :
pkName;
- return Optional.of(TableSchema.PrimaryKey.of(pkName, pkFields));
+ List<String> pkFields = primaryKeyColumns
+ .stream()
+ .sorted(Comparator.comparingInt(Pair::getKey))
+ .map(Pair::getValue)
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(pkFields)) {
+ return Optional.empty();
+ }
+    // NOTE(review): pkName stays null when PK_NAME is null; the removed code generated a "pk_..."
+    // fallback name in that case - confirm PrimaryKey.of accepts a null name.
+ return Optional.of(PrimaryKey.of(pkName, pkFields));
+ }
+
+    /**
+     * Collects the indexes of a table from JDBC metadata, grouping the index columns by index name.
+     *
+     * <p>An index whose NON_UNIQUE flag is false becomes a {@code UNIQUE_KEY} constraint, every other
+     * index a plain {@code KEY}. Rows that carry no index name or no column name are skipped, so no
+     * constraint with a {@code null} name is produced.
+     *
+     * @param metaData JDBC metadata to query
+     * @param database passed as the first (catalog) argument of {@code getIndexInfo}
+     * @param table    table name
+     * @return one constraint key per index name, possibly empty
+     * @throws SQLException if the metadata query fails
+     */
+    protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, String database, String table)
+        throws SQLException {
+        // index name -> index
+        Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
+        // NOTE(review): the table name is passed as the second (schema) argument as well - confirm this
+        // is intended for drivers that evaluate the schema argument.
+        try (ResultSet resultSet = metaData.getIndexInfo(database, table, table, false, false)) {
+            while (resultSet.next()) {
+                String indexName = resultSet.getString("INDEX_NAME");
+                String columnName = resultSet.getString("COLUMN_NAME");
+                if (indexName == null || columnName == null) {
+                    // Not an index column row, nothing to record.
+                    continue;
+                }
+                boolean nonUnique = resultSet.getBoolean("NON_UNIQUE");
+
+                ConstraintKey constraintKey = constraintKeyMap.computeIfAbsent(indexName, name -> {
+                    ConstraintKey.ConstraintType constraintType = nonUnique
+                        ? ConstraintKey.ConstraintType.KEY
+                        : ConstraintKey.ConstraintType.UNIQUE_KEY;
+                    return ConstraintKey.of(constraintType, name, new ArrayList<>());
+                });
+
+                // "A" = ascending, "D" = descending, anything else = sort order not reported.
+                String ascOrDesc = resultSet.getString("ASC_OR_DESC");
+                ConstraintKey.ColumnSortType sortType = null;
+                if ("A".equals(ascOrDesc)) {
+                    sortType = ConstraintKey.ColumnSortType.ASC;
+                } else if ("D".equals(ascOrDesc)) {
+                    sortType = ConstraintKey.ColumnSortType.DESC;
+                }
+                constraintKey.getColumnNames().add(new ConstraintKey.ConstraintKeyColumn(columnName, sortType));
+            }
+        }
+        return new ArrayList<>(constraintKeyMap.values());
+    }
+
+    /**
+     * Looks up the default value (COLUMN_DEF) of a single column through JDBC metadata.
+     *
+     * @param metaData JDBC metadata to query
+     * @param table    table name, used as the table name pattern of {@code getColumns}
+     * @param column   column name, used as the column name pattern of {@code getColumns}
+     * @return the default value of the first matching column; empty when the column is not found
+     *     or its default is {@code null}
+     * @throws SQLException if the metadata query fails
+     */
+    protected Optional<String> getColumnDefaultValue(DatabaseMetaData metaData, String table, String column)
+        throws SQLException {
+        // Query the requested column; the previous hard-coded "MyColumn" pattern ignored the parameter.
+        try (ResultSet resultSet = metaData.getColumns(null, null, table, column)) {
+            if (resultSet.next()) {
+                return Optional.ofNullable(resultSet.getString("COLUMN_DEF"));
+            }
+        }
return Optional.empty();
}
@@ -208,4 +255,57 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
return false;
}
}
+
+    /**
+     * Creates a table after verifying that its database exists.
+     *
+     * @param tablePath      path of the table to create, must not be null
+     * @param table          definition handed to {@link #createTableInternal}
+     * @param ignoreIfExists when true, a {@code false} result of {@link #createTableInternal} is tolerated
+     * @throws DatabaseNotExistException  if the database of {@code tablePath} does not exist
+     * @throws TableAlreadyExistException if {@link #createTableInternal} returns false and
+     *                                    {@code ignoreIfExists} is false
+     */
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+
+        final String databaseName = tablePath.getDatabaseName();
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+
+        final boolean created = createTableInternal(tablePath, table);
+        if (created || ignoreIfExists) {
+            return;
+        }
+        throw new TableAlreadyExistException(catalogName, tablePath);
+    }
+
+    /**
+     * Connector-specific hook invoked by {@code createTable} once the database is known to exist.
+     *
+     * @param tablePath path of the table to create
+     * @param table     definition of the table to create
+     * @return a {@code false} result is treated by {@code createTable} as "table already exists"
+     * @throws CatalogException if the table cannot be created
+     */
+ protected abstract boolean createTableInternal(TablePath tablePath,
CatalogTable table) throws CatalogException;
+
+    /**
+     * Drops a table by delegating to {@link #dropTableInternal}.
+     *
+     * @param tablePath         path of the table to drop, must not be null
+     * @param ignoreIfNotExists when true, a {@code false} result of {@link #dropTableInternal} is tolerated
+     * @throws TableNotExistException if {@link #dropTableInternal} returns false and
+     *                                {@code ignoreIfNotExists} is false
+     */
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+        throws TableNotExistException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+
+        final boolean dropped = dropTableInternal(tablePath);
+        if (dropped || ignoreIfNotExists) {
+            return;
+        }
+        throw new TableNotExistException(catalogName, tablePath);
+    }
+
+    /**
+     * Connector-specific hook invoked by {@code dropTable}.
+     *
+     * @param tablePath path of the table to drop
+     * @return a {@code false} result is treated by {@code dropTable} as "table does not exist"
+     * @throws CatalogException if the table cannot be dropped
+     */
+ protected abstract boolean dropTableInternal(TablePath tablePath) throws
CatalogException;
+
+    /**
+     * Creates the database named by {@code tablePath}.
+     *
+     * @param tablePath      path whose database name identifies the database to create
+     * @param ignoreIfExists when true, an already existing database is silently accepted
+     * @throws DatabaseAlreadyExistException if the database exists (or {@link #createDatabaseInternal}
+     *                                       returns false) and {@code ignoreIfExists} is false
+     */
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+        throws DatabaseAlreadyExistException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+        String databaseName = tablePath.getDatabaseName();
+        checkNotNull(databaseName, "Database name cannot be null");
+
+        if (databaseExists(databaseName)) {
+            // Honour ignoreIfExists here as well; previously this branch threw unconditionally.
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new DatabaseAlreadyExistException(catalogName, databaseName);
+        }
+        if (!createDatabaseInternal(databaseName) && !ignoreIfExists) {
+            throw new DatabaseAlreadyExistException(catalogName, databaseName);
+        }
+    }
+
+    /**
+     * Connector-specific hook invoked by {@code createDatabase} when the database does not exist yet.
+     *
+     * @param databaseName name of the database to create
+     * @return a {@code false} result is treated by {@code createDatabase} as "database already exists"
+     */
+ protected abstract boolean createDatabaseInternal(String databaseName);
+
+    /**
+     * Drops the database named by {@code tablePath}.
+     *
+     * <p>Existence is checked up front, mirroring {@code createDatabase}, so that a missing database
+     * is reported as {@link DatabaseNotExistException} (or ignored) instead of depending on how the
+     * connector-specific drop statement fails.
+     *
+     * @param tablePath         path whose database name identifies the database to drop
+     * @param ignoreIfNotExists when true, a missing database is silently accepted
+     * @throws DatabaseNotExistException if the database does not exist (or {@link #dropDatabaseInternal}
+     *                                   returns false) and {@code ignoreIfNotExists} is false
+     */
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+        throws DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+        String databaseName = tablePath.getDatabaseName();
+        checkNotNull(databaseName, "Database name cannot be null");
+
+        if (!databaseExists(databaseName)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+        if (!dropDatabaseInternal(databaseName) && !ignoreIfNotExists) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+    }
+
+    /**
+     * Connector-specific hook invoked by {@code dropDatabase}.
+     *
+     * @param databaseName name of the database to drop
+     * @return a {@code false} result is treated by {@code dropDatabase} as "database does not exist"
+     * @throws CatalogException if the database cannot be dropped
+     */
+ protected abstract boolean dropDatabaseInternal(String databaseName)
throws CatalogException;
+
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
index 99f928afd..8683d571f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -19,19 +19,18 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql.MysqlCreateTableSqlBuilder;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DataTypeUtils;
import com.mysql.cj.MysqlType;
import com.mysql.cj.jdbc.result.ResultSetImpl;
@@ -73,9 +72,8 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
@Override
public List<String> listDatabases() throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
-
- PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd);
+ PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;")) {
List<String> databases = new ArrayList<>();
ResultSet rs = ps.executeQuery();
@@ -100,9 +98,8 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
throw new DatabaseNotExistException(this.catalogName,
databaseName);
}
- try (Connection conn = DriverManager.getConnection(baseUrl +
databaseName, username, pwd)) {
- PreparedStatement ps =
- conn.prepareStatement("SHOW TABLES;");
+ try (Connection conn = DriverManager.getConnection(baseUrl +
databaseName, username, pwd);
+ PreparedStatement ps = conn.prepareStatement("SHOW TABLES;")) {
ResultSet rs = ps.executeQuery();
@@ -128,26 +125,82 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
String dbUrl = baseUrl + tablePath.getDatabaseName();
try (Connection conn = DriverManager.getConnection(dbUrl, username,
pwd)) {
DatabaseMetaData metaData = conn.getMetaData();
- Optional<TableSchema.PrimaryKey> primaryKey =
- getPrimaryKey(metaData, tablePath.getDatabaseName(),
tablePath.getTableName());
+ Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData,
tablePath.getDatabaseName(), tablePath.getTableName());
+ List<ConstraintKey> constraintKeys = getConstraintKeys(metaData,
tablePath.getDatabaseName(), tablePath.getTableName());
+
+ try (PreparedStatement ps =
conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1 = 0;",
tablePath.getFullName()))) {
+ ResultSetMetaData tableMetaData = ps.getMetaData();
+ TableSchema.Builder builder = TableSchema.builder();
+ // add column
+ for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+ String columnName = tableMetaData.getColumnName(i);
+ SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+ int columnDisplaySize =
tableMetaData.getColumnDisplaySize(i);
+ String comment = tableMetaData.getColumnLabel(i);
+ boolean isNullable = tableMetaData.isNullable(i) ==
ResultSetMetaData.columnNullable;
+ Object defaultValue = getColumnDefaultValue(metaData,
tablePath.getTableName(), columnName).orElse(null);
+
+ PhysicalColumn physicalColumn =
PhysicalColumn.of(columnName, type, columnDisplaySize, isNullable,
defaultValue, comment);
+ builder.column(physicalColumn);
+ }
+ // add primary key
+ primaryKey.ifPresent(builder::primaryKey);
+ // add constraint key
+ constraintKeys.forEach(builder::constraintKey);
+ TableIdentifier tableIdentifier =
TableIdentifier.of(catalogName, tablePath.getDatabaseName(),
tablePath.getTableName());
+ return CatalogTable.of(tableIdentifier, builder.build(),
buildConnectorOptions(tablePath), Collections.emptyList(), "");
+ }
- PreparedStatement ps =
- conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1
= 0;", tablePath.getFullName()));
+ } catch (Exception e) {
+ throw new CatalogException(String.format("Failed getting table
%s", tablePath.getFullName()), e);
+ }
+ }
- ResultSetMetaData tableMetaData = ps.getMetaData();
+    // todo: If the origin source is mysql, we can directly use create table like to create the target table?
+    /**
+     * Creates the table with the statement produced by {@link MysqlCreateTableSqlBuilder}.
+     *
+     * <p>{@code Statement#execute} only reports whether a ResultSet was produced, which is never the
+     * case for DDL, so its return value cannot signal success. Because the statement uses
+     * {@code IF NOT EXISTS}, an existing table is detected through the MySQL note 1050
+     * ("Table already exists") attached to the statement as a warning.
+     *
+     * @param tablePath path of the table to create
+     * @param table     definition of the table to create
+     * @return true if the table was created, false if it already existed
+     * @throws CatalogException if the statement fails
+     */
+    @Override
+    protected boolean createTableInternal(TablePath tablePath, CatalogTable table) throws CatalogException {
+        // MySQL server error code ER_TABLE_EXISTS_ERROR
+        final int tableExistsCode = 1050;
+        String dbUrl = baseUrl + tablePath.getDatabaseName();
+        String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, table)
+            .build();
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+             PreparedStatement ps = conn.prepareStatement(createTableSql)) {
+            ps.execute();
+            for (java.sql.SQLWarning warning = ps.getWarnings(); warning != null;
+                 warning = warning.getNextWarning()) {
+                if (warning.getErrorCode() == tableExistsCode) {
+                    return false;
+                }
+            }
+            return true;
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Failed creating table %s", tablePath.getFullName()), e);
+        }
+    }
- TableSchema.Builder builder = TableSchema.builder();
- for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
- SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
- builder.physicalColumn(tableMetaData.getColumnName(i), type);
- }
+    /**
+     * Drops the table with {@code DROP TABLE IF EXISTS}.
+     *
+     * <p>The previous statement ({@code DROP TABLE <name> IF EXIST}) was not valid MySQL syntax, and
+     * the return value of {@code Statement#execute} is always false for DDL. A missing table is now
+     * detected through the MySQL note 1051 ("Unknown table") attached to the statement as a warning.
+     *
+     * @param tablePath path of the table to drop
+     * @return true if the table was dropped, false if it did not exist
+     * @throws CatalogException if the statement fails
+     */
+    @Override
+    protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
+        // MySQL server error code ER_BAD_TABLE_ERROR
+        final int unknownTableCode = 1051;
+        String dbUrl = baseUrl + tablePath.getDatabaseName();
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
+             PreparedStatement ps = conn.prepareStatement(
+                 String.format("DROP TABLE IF EXISTS %s;", tablePath.getFullName()))) {
+            // Will there exist concurrent drop for one table?
+            ps.execute();
+            for (java.sql.SQLWarning warning = ps.getWarnings(); warning != null;
+                 warning = warning.getNextWarning()) {
+                if (warning.getErrorCode() == unknownTableCode) {
+                    return false;
+                }
+            }
+            return true;
+        } catch (SQLException e) {
+            throw new CatalogException(String.format("Failed dropping table %s", tablePath.getFullName()), e);
+        }
+    }
- primaryKey.ifPresent(builder::primaryKey);
+    /**
+     * Creates the database with a plain {@code CREATE DATABASE} statement.
+     *
+     * <p>{@code Statement#execute} returns false for DDL even when it succeeds, so success is
+     * signalled by the statement completing without an exception.
+     *
+     * @param databaseName name of the database to create
+     * @return true once the statement has completed
+     * @throws CatalogException if the statement fails
+     */
+    @Override
+    protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+             PreparedStatement ps = conn.prepareStatement(String.format("CREATE DATABASE `%s`;", databaseName))) {
+            ps.execute();
+            return true;
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed creating database %s in catalog %s", databaseName, this.catalogName), e);
+        }
+    }
- TableIdentifier tableIdentifier = TableIdentifier.of(catalogName,
tablePath.getDatabaseName(), tablePath.getTableName());
- return CatalogTable.of(tableIdentifier, builder.build(),
buildConnectorOptions(tablePath), Collections.emptyList(), "");
+    /**
+     * Drops the database with a plain {@code DROP DATABASE} statement.
+     *
+     * <p>{@code Statement#execute} returns false for DDL even when it succeeds, so success is
+     * signalled by the statement completing without an exception.
+     *
+     * @param databaseName name of the database to drop
+     * @return true once the statement has completed
+     * @throws CatalogException if the statement fails
+     */
+    @Override
+    protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+             PreparedStatement ps = conn.prepareStatement(String.format("DROP DATABASE `%s`;", databaseName))) {
+            ps.execute();
+            return true;
} catch (Exception e) {
- throw new CatalogException(String.format("Failed getting table
%s", tablePath.getFullName()), e);
+ throw new CatalogException(
+ String.format("Failed dropping database %s in catalog %s",
databaseName, this.catalogName), e);
}
}
@@ -156,67 +209,10 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
* @see ResultSetImpl#getObjectStoredProc(int, int)
*/
private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int
colIndex) throws SQLException {
+ int precision = metadata.getPrecision(colIndex);
+ int scale = metadata.getScale(colIndex);
MysqlType mysqlType =
MysqlType.getByName(metadata.getColumnTypeName(colIndex));
- switch (mysqlType) {
- case NULL:
- return BasicType.VOID_TYPE;
- case BOOLEAN:
- return BasicType.BOOLEAN_TYPE;
- case BIT:
- case TINYINT:
- return BasicType.BYTE_TYPE;
- case TINYINT_UNSIGNED:
- case SMALLINT:
- return BasicType.SHORT_TYPE;
- case SMALLINT_UNSIGNED:
- case INT:
- case MEDIUMINT:
- case MEDIUMINT_UNSIGNED:
- return BasicType.INT_TYPE;
- case INT_UNSIGNED:
- case BIGINT:
- return BasicType.LONG_TYPE;
- case FLOAT:
- case FLOAT_UNSIGNED:
- return BasicType.FLOAT_TYPE;
- case DOUBLE:
- case DOUBLE_UNSIGNED:
- return BasicType.DOUBLE_TYPE;
- case TIME:
- return LocalTimeType.LOCAL_TIME_TYPE;
- case DATE:
- return LocalTimeType.LOCAL_DATE_TYPE;
- case TIMESTAMP:
- case DATETIME:
- return LocalTimeType.LOCAL_DATE_TIME_TYPE;
- // TODO: to confirm
- case CHAR:
- case VARCHAR:
- case TINYTEXT:
- case TEXT:
- case MEDIUMTEXT:
- case LONGTEXT:
- case JSON:
- case ENUM:
- return BasicType.STRING_TYPE;
- case BINARY:
- case VARBINARY:
- case TINYBLOB:
- case BLOB:
- case MEDIUMBLOB:
- case LONGBLOB:
- case GEOMETRY:
- return PrimitiveByteArrayType.INSTANCE;
- case BIGINT_UNSIGNED:
- case DECIMAL:
- case DECIMAL_UNSIGNED:
- int precision = metadata.getPrecision(colIndex);
- int scale = metadata.getScale(colIndex);
- return new DecimalType(precision, scale);
- // TODO: support 'SET' & 'YEAR' type
- default:
- throw new
JdbcConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format("Doesn't support MySQL type '%s' yet", mysqlType.getName()));
- }
+ return DataTypeUtils.toSeaTunnelDataType(mysqlType, precision, scale);
}
@SuppressWarnings("MagicNumber")
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
new file mode 100644
index 000000000..c06b93160
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DataTypeUtils;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Builds a MySQL {@code CREATE TABLE IF NOT EXISTS} statement from a {@link CatalogTable}.
+ *
+ * <p>Table and column comments as well as default values are emitted as single-quoted string
+ * literals; quotes and backslashes inside them are escaped so that they cannot terminate the literal.
+ */
+public class MysqlCreateTableSqlBuilder {
+
+    private final String tableName;
+    private List<Column> columns;
+
+    private String comment;
+
+    private String engine;
+    private String charset;
+    private String collate;
+
+    private PrimaryKey primaryKey;
+
+    private List<ConstraintKey> constraintKeys;
+
+    private MysqlCreateTableSqlBuilder(String tableName) {
+        checkNotNull(tableName, "tableName must not be null");
+        this.tableName = tableName;
+    }
+
+    /**
+     * Creates a builder pre-populated with the comment, primary key, constraint keys and columns of
+     * {@code catalogTable}. Engine and charset are left unset.
+     *
+     * @param tablePath    supplies the table name used in the statement
+     * @param catalogTable supplies the schema and comment, its table schema must not be null
+     * @return the pre-populated builder
+     */
+    public static MysqlCreateTableSqlBuilder builder(TablePath tablePath, CatalogTable catalogTable) {
+        checkNotNull(tablePath, "tablePath must not be null");
+        checkNotNull(catalogTable, "catalogTable must not be null");
+
+        TableSchema tableSchema = catalogTable.getTableSchema();
+        checkNotNull(tableSchema, "tableSchema must not be null");
+
+        return new MysqlCreateTableSqlBuilder(tablePath.getTableName())
+            .comment(catalogTable.getComment())
+            // todo: set charset and collate
+            .engine(null)
+            .charset(null)
+            .primaryKey(tableSchema.getPrimaryKey())
+            .constraintKeys(tableSchema.getConstraintKeys())
+            .addColumn(tableSchema.getColumns());
+    }
+
+    /** Sets the column list (replacing any previous one); the list must not be empty. */
+    public MysqlCreateTableSqlBuilder addColumn(List<Column> columns) {
+        checkArgument(CollectionUtils.isNotEmpty(columns), "columns must not be empty");
+        this.columns = columns;
+        return this;
+    }
+
+    /** Sets the primary key; {@code null} omits the PRIMARY KEY clause. */
+    public MysqlCreateTableSqlBuilder primaryKey(PrimaryKey primaryKey) {
+        this.primaryKey = primaryKey;
+        return this;
+    }
+
+    /** Sets the index definitions; {@code null} or empty omits them. */
+    public MysqlCreateTableSqlBuilder constraintKeys(List<ConstraintKey> constraintKeys) {
+        this.constraintKeys = constraintKeys;
+        return this;
+    }
+
+    /** Sets the storage engine; {@code null} omits the ENGINE option. */
+    public MysqlCreateTableSqlBuilder engine(String engine) {
+        this.engine = engine;
+        return this;
+    }
+
+    /** Sets the default charset; {@code null} omits the DEFAULT CHARSET option. */
+    public MysqlCreateTableSqlBuilder charset(String charset) {
+        this.charset = charset;
+        return this;
+    }
+
+    /** Sets the collation; {@code null} omits the COLLATE option. */
+    public MysqlCreateTableSqlBuilder collate(String collate) {
+        this.collate = collate;
+        return this;
+    }
+
+    /** Sets the table comment; {@code null} omits the COMMENT option. */
+    public MysqlCreateTableSqlBuilder comment(String comment) {
+        this.comment = comment;
+        return this;
+    }
+
+    /**
+     * Renders the statement: the column/key definitions followed by the table options that were set,
+     * joined by single spaces and terminated with a semicolon.
+     *
+     * @return the complete CREATE TABLE statement
+     */
+    public String build() {
+        List<String> sqls = new ArrayList<>();
+        sqls.add(String.format("CREATE TABLE IF NOT EXISTS %s (\n%s\n)", tableName, buildColumnsIdentifySql()));
+        if (engine != null) {
+            sqls.add("ENGINE = " + engine);
+        }
+        if (charset != null) {
+            sqls.add("DEFAULT CHARSET = " + charset);
+        }
+        if (collate != null) {
+            sqls.add("COLLATE = " + collate);
+        }
+        if (comment != null) {
+            sqls.add("COMMENT = '" + escapeStringLiteral(comment) + "'");
+        }
+        return String.join(" ", sqls) + ";";
+    }
+
+    /** Renders every column, then the primary key, then the constraint keys, one per line. */
+    private String buildColumnsIdentifySql() {
+        List<String> columnSqls = new ArrayList<>();
+        for (Column column : columns) {
+            columnSqls.add("\t" + buildColumnIdentifySql(column));
+        }
+        if (primaryKey != null) {
+            columnSqls.add("\t" + buildPrimaryKeySql());
+        }
+        if (CollectionUtils.isNotEmpty(constraintKeys)) {
+            for (ConstraintKey constraintKey : constraintKeys) {
+                columnSqls.add("\t" + buildConstraintKeySql(constraintKey));
+            }
+        }
+        return String.join(", \n", columnSqls);
+    }
+
+    /** Renders one column as: name, type, optional "(length)", nullability, default, comment. */
+    private String buildColumnIdentifySql(Column column) {
+        final List<String> columnSqls = new ArrayList<>();
+        // Column name
+        columnSqls.add(column.getName());
+        // Column type
+        columnSqls.add(DataTypeUtils.toMysqlType(column.getDataType()).getName());
+        // Column length
+        // NOTE(review): the length is appended for every type that reports one - confirm it is only
+        // set for types that accept a length in MySQL.
+        if (column.getColumnLength() != null) {
+            columnSqls.add("(" + column.getColumnLength() + ")");
+        }
+        // nullable
+        if (column.isNullable()) {
+            columnSqls.add("NULL");
+        } else {
+            columnSqls.add("NOT NULL");
+        }
+        // default value, always rendered as a quoted literal
+        if (column.getDefaultValue() != null) {
+            columnSqls.add("DEFAULT '" + escapeStringLiteral(String.valueOf(column.getDefaultValue())) + "'");
+        }
+        // comment
+        if (column.getComment() != null) {
+            columnSqls.add("COMMENT '" + escapeStringLiteral(column.getComment()) + "'");
+        }
+        return String.join(" ", columnSqls);
+    }
+
+    /** Renders the PRIMARY KEY clause with back-quoted column names. */
+    private String buildPrimaryKeySql() {
+        String key = primaryKey.getColumnNames()
+            .stream()
+            .map(columnName -> "`" + columnName + "`")
+            .collect(Collectors.joining(", "));
+        // add sort type
+        return String.format("PRIMARY KEY (%s)", key);
+    }
+
+    /** Renders one index definition; a column without sort type is rendered without ASC/DESC. */
+    private String buildConstraintKeySql(ConstraintKey constraintKey) {
+        ConstraintKey.ConstraintType constraintType = constraintKey.getConstraintType();
+        String indexColumns = constraintKey.getColumnNames()
+            .stream()
+            .map(constraintKeyColumn -> {
+                if (constraintKeyColumn.getSortType() == null) {
+                    return String.format("`%s`", constraintKeyColumn.getColumnName());
+                }
+                return String.format("`%s` %s", constraintKeyColumn.getColumnName(),
+                    constraintKeyColumn.getSortType().name());
+            }).collect(Collectors.joining(", "));
+        String keyName = null;
+        switch (constraintType) {
+            case KEY:
+                keyName = "KEY";
+                break;
+            case UNIQUE_KEY:
+                keyName = "UNIQUE KEY";
+                break;
+            case FOREIGN_KEY:
+                keyName = "FOREIGN KEY";
+                // todo: no REFERENCES clause is generated yet
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported constraint type: " + constraintType);
+        }
+        return String.format("%s `%s` (%s)", keyName, constraintKey.getConstraintName(), indexColumns);
+    }
+
+    /**
+     * Escapes a value for use inside a single-quoted MySQL string literal: backslashes are doubled
+     * and single quotes are doubled. Assumes the server's default backslash-escape handling.
+     */
+    private static String escapeStringLiteral(String value) {
+        return value.replace("\\", "\\\\").replace("'", "''");
+    }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
new file mode 100644
index 000000000..83af004f5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+
+import com.mysql.cj.MysqlType;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Conversions between MySQL column types and SeaTunnel data types.
+ */
+@Slf4j
+@UtilityClass
+public class DataTypeUtils {
+
+    /**
+     * Maps a MySQL type to the SeaTunnel data type used to represent it.
+     *
+     * @param mysqlType MySQL type to convert
+     * @param precision precision, only used for the types mapped to {@link DecimalType}
+     * @param scale     scale, only used for the types mapped to {@link DecimalType}
+     * @return the matching SeaTunnel data type
+     * @throws JdbcConnectorException if the MySQL type is not supported
+     */
+    public static SeaTunnelDataType<?> toSeaTunnelDataType(MysqlType mysqlType, int precision, int scale) {
+        switch (mysqlType) {
+            case NULL:
+                return BasicType.VOID_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case BIT:
+            case TINYINT:
+                return BasicType.BYTE_TYPE;
+            case TINYINT_UNSIGNED:
+            case SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case SMALLINT_UNSIGNED:
+            case INT:
+            case MEDIUMINT:
+            case MEDIUMINT_UNSIGNED:
+                return BasicType.INT_TYPE;
+            case INT_UNSIGNED:
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case FLOAT:
+            case FLOAT_UNSIGNED:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+            case DOUBLE_UNSIGNED:
+                return BasicType.DOUBLE_TYPE;
+            case TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TIMESTAMP:
+            case DATETIME:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            // TODO: to confirm
+            case CHAR:
+            case VARCHAR:
+            case TINYTEXT:
+            case TEXT:
+            case MEDIUMTEXT:
+            case LONGTEXT:
+            case JSON:
+            case ENUM:
+                return BasicType.STRING_TYPE;
+            case BINARY:
+            case VARBINARY:
+            case TINYBLOB:
+            case BLOB:
+            case MEDIUMBLOB:
+            case LONGBLOB:
+            case GEOMETRY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case BIGINT_UNSIGNED:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+                return new DecimalType(precision, scale);
+            // TODO: support 'SET' & 'YEAR' type
+            default:
+                throw new JdbcConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    String.format("Doesn't support MySQL type '%s' yet", mysqlType.getName()));
+        }
+    }
+
+    /**
+     * Maps a SeaTunnel data type to the MySQL type used when creating a column for it.
+     *
+     * @param seaTunnelDataType SeaTunnel data type to convert
+     * @return the MySQL type chosen for the SQL type of {@code seaTunnelDataType}
+     * @throws JdbcConnectorException if the SeaTunnel SQL type is not supported
+     */
+    public MysqlType toMysqlType(SeaTunnelDataType<?> seaTunnelDataType) {
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        // todo: verify
+        switch (sqlType) {
+            case ARRAY:
+            case MAP:
+            case ROW:
+            case STRING:
+                return MysqlType.VARCHAR;
+            case BOOLEAN:
+                return MysqlType.BOOLEAN;
+            case TINYINT:
+                return MysqlType.TINYINT;
+            case SMALLINT:
+                return MysqlType.SMALLINT;
+            case INT:
+                return MysqlType.INT;
+            case BIGINT:
+                return MysqlType.BIGINT;
+            case FLOAT:
+                return MysqlType.FLOAT;
+            case DOUBLE:
+                return MysqlType.DOUBLE;
+            case DECIMAL:
+                return MysqlType.DECIMAL;
+            case NULL:
+                return MysqlType.NULL;
+            case BYTES:
+                // NOTE(review): toSeaTunnelDataType maps BIT to a byte and the binary/blob types to
+                // byte arrays, so BIT does not look like the inverse mapping - confirm the target type.
+                return MysqlType.BIT;
+            case DATE:
+                return MysqlType.DATE;
+            case TIME:
+                // A time-of-day value belongs in a TIME column (previously mapped to DATETIME).
+                return MysqlType.TIME;
+            case TIMESTAMP:
+                return MysqlType.TIMESTAMP;
+            default:
+                throw new JdbcConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    String.format("Doesn't support SeaTunnel type '%s' yet", sqlType));
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
new file mode 100644
index 000000000..dca2304a7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * Verifies the statement rendered by {@link MysqlCreateTableSqlBuilder} for a small sample table.
+ */
+public class MysqlCreateTableSqlBuilderTest {
+
+    private static final PrintStream CONSOLE = System.out;
+
+    /** Builds a five-column table with a primary key and one index and compares the full SQL text. */
+    @Test
+    public void testBuild() {
+        // todo
+        final String dataBaseName = "test_database";
+        final String tableName = "test_table";
+
+        // Schema: two NOT NULL columns, three nullable ones, a primary key and a plain index.
+        PrimaryKey idKey = PrimaryKey.of("id", Lists.newArrayList("id"));
+        ConstraintKey nameIndex = ConstraintKey.of(
+            ConstraintKey.ConstraintType.KEY,
+            "name",
+            Lists.newArrayList(ConstraintKey.ConstraintKeyColumn.of("name", null)));
+        TableSchema schema = TableSchema.builder()
+            .column(PhysicalColumn.of("id", BasicType.LONG_TYPE, 22, false, null, "id"))
+            .column(PhysicalColumn.of("name", BasicType.STRING_TYPE, 128, false, null, "name"))
+            .column(PhysicalColumn.of("age", BasicType.INT_TYPE, null, true, null, "age"))
+            .column(PhysicalColumn.of("createTime", LocalTimeType.LOCAL_DATE_TIME_TYPE, 3, true, null,
+                "createTime"))
+            .column(PhysicalColumn.of("lastUpdateTime", LocalTimeType.LOCAL_DATE_TIME_TYPE, 3, true, null,
+                "lastUpdateTime"))
+            .primaryKey(idKey)
+            .constraintKey(nameIndex)
+            .build();
+
+        TableIdentifier identifier = TableIdentifier.of("test_catalog", dataBaseName, tableName);
+        CatalogTable catalogTable =
+            CatalogTable.of(identifier, schema, new HashMap<>(), new ArrayList<>(), "User table");
+
+        String expect = "CREATE TABLE IF NOT EXISTS test_table (\n" +
+            "\tid BIGINT (22) NOT NULL COMMENT 'id', \n" +
+            "\tname VARCHAR (128) NOT NULL COMMENT 'name', \n" +
+            "\tage INT NULL COMMENT 'age', \n" +
+            "\tcreateTime TIMESTAMP (3) NULL COMMENT 'createTime', \n" +
+            "\tlastUpdateTime TIMESTAMP (3) NULL COMMENT 'lastUpdateTime', \n" +
+            "\tPRIMARY KEY (`id`), \n" +
+            "\tKEY `name` (`name`)\n" +
+            ") COMMENT = 'User table';";
+
+        TablePath tablePath = TablePath.of(dataBaseName, tableName);
+        String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build();
+
+        CONSOLE.println(expect);
+        Assertions.assertEquals(expect, createTableSql);
+    }
+}