This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8540c7f9f [Hotfix][Connector] Fix sqlserver catalog (#4441)
8540c7f9f is described below
commit 8540c7f9f33415b88d0878aee57b952934febd1f
Author: hailin0 <[email protected]>
AuthorDate: Thu Mar 30 10:52:12 2023 +0800
[Hotfix][Connector] Fix sqlserver catalog (#4441)
---
.../api/table/catalog/TableIdentifier.java | 59 +++++++---------------
.../seatunnel/api/table/catalog/TablePath.java | 53 ++++---------------
.../jdbc/catalog/AbstractJdbcCatalog.java | 24 +++++++--
.../jdbc/catalog/sqlserver/SqlServerCatalog.java | 34 +++++++++++--
4 files changed, 80 insertions(+), 90 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
index 102570b82..335ffebc4 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
@@ -17,9 +17,15 @@
package org.apache.seatunnel.api.table.catalog;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
import java.io.Serializable;
-import java.util.Objects;
+@Getter
+@EqualsAndHashCode
+@RequiredArgsConstructor
public final class TableIdentifier implements Serializable {
private static final long serialVersionUID = 1L;
@@ -27,59 +33,32 @@ public final class TableIdentifier implements Serializable {
private final String databaseName;
- private final String tableName;
+ private final String schemaName;
- private TableIdentifier(String catalogName, String databaseName, String
tableName) {
- this.catalogName = catalogName;
- this.databaseName = databaseName;
- this.tableName = tableName;
- }
+ private final String tableName;
public static TableIdentifier of(String catalogName, String databaseName,
String tableName) {
- return new TableIdentifier(catalogName, databaseName, tableName);
+ return new TableIdentifier(catalogName, databaseName, null, tableName);
}
- public String getCatalogName() {
- return catalogName;
- }
-
- public String getDatabaseName() {
- return databaseName;
- }
-
- public String getTableName() {
- return tableName;
+ public static TableIdentifier of(
+ String catalogName, String databaseName, String schemaName, String
tableName) {
+ return new TableIdentifier(catalogName, databaseName, schemaName,
tableName);
}
public TablePath toTablePath() {
- return TablePath.of(databaseName, tableName);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TableIdentifier that = (TableIdentifier) o;
- return catalogName.equals(that.catalogName)
- && databaseName.equals(that.databaseName)
- && tableName.equals(that.tableName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(catalogName, databaseName, tableName);
+ return TablePath.of(databaseName, schemaName, tableName);
}
public TableIdentifier copy() {
- return TableIdentifier.of(catalogName, databaseName, tableName);
+ return TableIdentifier.of(catalogName, databaseName, schemaName,
tableName);
}
@Override
public String toString() {
- return String.join(".", catalogName, databaseName, tableName);
+ if (schemaName == null) {
+ return String.join(".", catalogName, databaseName, tableName);
+ }
+ return String.join(".", catalogName, databaseName, schemaName,
tableName);
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
index 7969e1434..7b2dd6d55 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
@@ -17,33 +17,29 @@
package org.apache.seatunnel.api.table.catalog;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
import java.io.Serializable;
-import java.util.Objects;
+@Getter
+@EqualsAndHashCode
+@RequiredArgsConstructor
public final class TablePath implements Serializable {
private static final long serialVersionUID = 1L;
private final String databaseName;
private final String schemaName;
private final String tableName;
- private TablePath(String databaseName, String tableName) {
- this(databaseName, null, tableName);
- }
-
- private TablePath(String databaseName, String schemaName, String
tableName) {
- this.databaseName = databaseName;
- this.schemaName = schemaName;
- this.tableName = tableName;
- }
-
public static TablePath of(String fullName) {
String[] paths = fullName.split("\\.");
if (paths.length == 2) {
- return new TablePath(paths[0], paths[1]);
+ return of(paths[0], paths[1]);
}
if (paths.length == 3) {
- return new TablePath(paths[0], paths[1], paths[2]);
+ return of(paths[0], paths[1], paths[2]);
}
throw new IllegalArgumentException(
String.format("Cannot get split '%s' to get databaseName and
tableName", fullName));
@@ -57,14 +53,7 @@ public final class TablePath implements Serializable {
return new TablePath(databaseName, schemaName, tableName);
}
- public String getDatabaseName() {
- return databaseName;
- }
-
- public String getTableName() {
- if (schemaName == null) {
- return tableName;
- }
+ public String getSchemaAndTableName() {
return String.format("%s.%s", schemaName, tableName);
}
@@ -89,28 +78,6 @@ public final class TablePath implements Serializable {
quote, databaseName, quote, quote, schemaName, quote, quote,
tableName, quote);
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- TablePath that = (TablePath) o;
-
- return Objects.equals(databaseName, that.databaseName)
- && Objects.equals(schemaName, that.schemaName)
- && Objects.equals(tableName, that.tableName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(databaseName, schemaName, tableName);
- }
-
@Override
public String toString() {
return getFullName();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index d2f413e88..07098e810 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -123,11 +123,17 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
protected Optional<PrimaryKey> getPrimaryKey(
DatabaseMetaData metaData, String database, String table) throws
SQLException {
+ return getPrimaryKey(metaData, database, table, table);
+ }
+
+ protected Optional<PrimaryKey> getPrimaryKey(
+ DatabaseMetaData metaData, String database, String schema, String
table)
+ throws SQLException {
// According to the Javadoc of
java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME, not by
KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
- ResultSet rs = metaData.getPrimaryKeys(database, table, table);
+ ResultSet rs = metaData.getPrimaryKeys(database, schema, table);
// seq -> column name
List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
@@ -154,7 +160,13 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
protected List<ConstraintKey> getConstraintKeys(
DatabaseMetaData metaData, String database, String table) throws
SQLException {
- ResultSet resultSet = metaData.getIndexInfo(database, table, table,
false, false);
+ return getConstraintKeys(metaData, database, table, table);
+ }
+
+ protected List<ConstraintKey> getConstraintKeys(
+ DatabaseMetaData metaData, String database, String schema, String
table)
+ throws SQLException {
+ ResultSet resultSet = metaData.getIndexInfo(database, schema, table,
false, false);
// index name -> index
Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
while (resultSet.next()) {
@@ -189,7 +201,13 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
protected Optional<String> getColumnDefaultValue(
DatabaseMetaData metaData, String table, String column) throws
SQLException {
- try (ResultSet resultSet = metaData.getColumns(null, null, table,
column)) {
+ return getColumnDefaultValue(metaData, null, null, table, column);
+ }
+
+ protected Optional<String> getColumnDefaultValue(
+ DatabaseMetaData metaData, String database, String schema, String
table, String column)
+ throws SQLException {
+ try (ResultSet resultSet = metaData.getColumns(database, schema,
table, column)) {
while (resultSet.next()) {
String defaultValue = resultSet.getString("COLUMN_DEF");
return Optional.ofNullable(defaultValue);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 9f110ced7..25c02e6b1 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -118,6 +118,17 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
}
}
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ try {
+ return databaseExists(tablePath.getDatabaseName())
+ && listTables(tablePath.getDatabaseName())
+ .contains(tablePath.getSchemaAndTableName());
+ } catch (DatabaseNotExistException e) {
+ return false;
+ }
+ }
+
@Override
public CatalogTable getTable(TablePath tablePath)
throws CatalogException, TableNotExistException {
@@ -129,10 +140,17 @@ public class SqlServerCatalog extends AbstractJdbcCatalog
{
try (Connection conn = DriverManager.getConnection(dbUrl, username,
pwd)) {
DatabaseMetaData metaData = conn.getMetaData();
Optional<PrimaryKey> primaryKey =
- getPrimaryKey(metaData, tablePath.getDatabaseName(),
tablePath.getTableName());
+ getPrimaryKey(
+ metaData,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
List<ConstraintKey> constraintKeys =
getConstraintKeys(
- metaData, tablePath.getDatabaseName(),
tablePath.getTableName());
+ metaData,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
try (PreparedStatement ps =
conn.prepareStatement(
@@ -150,7 +168,12 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
boolean isNullable =
tableMetaData.isNullable(i) ==
ResultSetMetaData.columnNullable;
Object defaultValue =
- getColumnDefaultValue(metaData,
tablePath.getTableName(), columnName)
+ getColumnDefaultValue(
+ metaData,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName(),
+ columnName)
.orElse(null);
PhysicalColumn physicalColumn =
@@ -169,7 +192,10 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
constraintKeys.forEach(builder::constraintKey);
TableIdentifier tableIdentifier =
TableIdentifier.of(
- catalogName, tablePath.getDatabaseName(),
tablePath.getTableName());
+ catalogName,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
return CatalogTable.of(
tableIdentifier,
builder.build(),