This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8540c7f9f [Hotfix][Connector] Fix sqlserver catalog (#4441)
8540c7f9f is described below

commit 8540c7f9f33415b88d0878aee57b952934febd1f
Author: hailin0 <[email protected]>
AuthorDate: Thu Mar 30 10:52:12 2023 +0800

    [Hotfix][Connector] Fix sqlserver catalog (#4441)
---
 .../api/table/catalog/TableIdentifier.java         | 59 +++++++---------------
 .../seatunnel/api/table/catalog/TablePath.java     | 53 ++++---------------
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 24 +++++++--
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   | 34 +++++++++++--
 4 files changed, 80 insertions(+), 90 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
index 102570b82..335ffebc4 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
@@ -17,9 +17,15 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
 import java.io.Serializable;
-import java.util.Objects;
 
+@Getter
+@EqualsAndHashCode
+@RequiredArgsConstructor
 public final class TableIdentifier implements Serializable {
     private static final long serialVersionUID = 1L;
 
@@ -27,59 +33,32 @@ public final class TableIdentifier implements Serializable {
 
     private final String databaseName;
 
-    private final String tableName;
+    private final String schemaName;
 
-    private TableIdentifier(String catalogName, String databaseName, String 
tableName) {
-        this.catalogName = catalogName;
-        this.databaseName = databaseName;
-        this.tableName = tableName;
-    }
+    private final String tableName;
 
     public static TableIdentifier of(String catalogName, String databaseName, 
String tableName) {
-        return new TableIdentifier(catalogName, databaseName, tableName);
+        return new TableIdentifier(catalogName, databaseName, null, tableName);
     }
 
-    public String getCatalogName() {
-        return catalogName;
-    }
-
-    public String getDatabaseName() {
-        return databaseName;
-    }
-
-    public String getTableName() {
-        return tableName;
+    public static TableIdentifier of(
+            String catalogName, String databaseName, String schemaName, String 
tableName) {
+        return new TableIdentifier(catalogName, databaseName, schemaName, 
tableName);
     }
 
     public TablePath toTablePath() {
-        return TablePath.of(databaseName, tableName);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        TableIdentifier that = (TableIdentifier) o;
-        return catalogName.equals(that.catalogName)
-                && databaseName.equals(that.databaseName)
-                && tableName.equals(that.tableName);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(catalogName, databaseName, tableName);
+        return TablePath.of(databaseName, schemaName, tableName);
     }
 
     public TableIdentifier copy() {
-        return TableIdentifier.of(catalogName, databaseName, tableName);
+        return TableIdentifier.of(catalogName, databaseName, schemaName, 
tableName);
     }
 
     @Override
     public String toString() {
-        return String.join(".", catalogName, databaseName, tableName);
+        if (schemaName == null) {
+            return String.join(".", catalogName, databaseName, tableName);
+        }
+        return String.join(".", catalogName, databaseName, schemaName, 
tableName);
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
index 7969e1434..7b2dd6d55 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
@@ -17,33 +17,29 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
 import java.io.Serializable;
-import java.util.Objects;
 
+@Getter
+@EqualsAndHashCode
+@RequiredArgsConstructor
 public final class TablePath implements Serializable {
     private static final long serialVersionUID = 1L;
     private final String databaseName;
     private final String schemaName;
     private final String tableName;
 
-    private TablePath(String databaseName, String tableName) {
-        this(databaseName, null, tableName);
-    }
-
-    private TablePath(String databaseName, String schemaName, String 
tableName) {
-        this.databaseName = databaseName;
-        this.schemaName = schemaName;
-        this.tableName = tableName;
-    }
-
     public static TablePath of(String fullName) {
         String[] paths = fullName.split("\\.");
 
         if (paths.length == 2) {
-            return new TablePath(paths[0], paths[1]);
+            return of(paths[0], paths[1]);
         }
         if (paths.length == 3) {
-            return new TablePath(paths[0], paths[1], paths[2]);
+            return of(paths[0], paths[1], paths[2]);
         }
         throw new IllegalArgumentException(
                 String.format("Cannot get split '%s' to get databaseName and 
tableName", fullName));
@@ -57,14 +53,7 @@ public final class TablePath implements Serializable {
         return new TablePath(databaseName, schemaName, tableName);
     }
 
-    public String getDatabaseName() {
-        return databaseName;
-    }
-
-    public String getTableName() {
-        if (schemaName == null) {
-            return tableName;
-        }
+    public String getSchemaAndTableName() {
         return String.format("%s.%s", schemaName, tableName);
     }
 
@@ -89,28 +78,6 @@ public final class TablePath implements Serializable {
                 quote, databaseName, quote, quote, schemaName, quote, quote, 
tableName, quote);
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        TablePath that = (TablePath) o;
-
-        return Objects.equals(databaseName, that.databaseName)
-                && Objects.equals(schemaName, that.schemaName)
-                && Objects.equals(tableName, that.tableName);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(databaseName, schemaName, tableName);
-    }
-
     @Override
     public String toString() {
         return getFullName();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index d2f413e88..07098e810 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -123,11 +123,17 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
 
     protected Optional<PrimaryKey> getPrimaryKey(
             DatabaseMetaData metaData, String database, String table) throws 
SQLException {
+        return getPrimaryKey(metaData, database, table, table);
+    }
+
+    protected Optional<PrimaryKey> getPrimaryKey(
+            DatabaseMetaData metaData, String database, String schema, String 
table)
+            throws SQLException {
 
         // According to the Javadoc of 
java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by 
KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(database, table, table);
+        ResultSet rs = metaData.getPrimaryKeys(database, schema, table);
 
         // seq -> column name
         List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
@@ -154,7 +160,13 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
 
     protected List<ConstraintKey> getConstraintKeys(
             DatabaseMetaData metaData, String database, String table) throws 
SQLException {
-        ResultSet resultSet = metaData.getIndexInfo(database, table, table, 
false, false);
+        return getConstraintKeys(metaData, database, table, table);
+    }
+
+    protected List<ConstraintKey> getConstraintKeys(
+            DatabaseMetaData metaData, String database, String schema, String 
table)
+            throws SQLException {
+        ResultSet resultSet = metaData.getIndexInfo(database, schema, table, 
false, false);
         // index name -> index
         Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
         while (resultSet.next()) {
@@ -189,7 +201,13 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
 
     protected Optional<String> getColumnDefaultValue(
             DatabaseMetaData metaData, String table, String column) throws 
SQLException {
-        try (ResultSet resultSet = metaData.getColumns(null, null, table, 
column)) {
+        return getColumnDefaultValue(metaData, null, null, table, column);
+    }
+
+    protected Optional<String> getColumnDefaultValue(
+            DatabaseMetaData metaData, String database, String schema, String 
table, String column)
+            throws SQLException {
+        try (ResultSet resultSet = metaData.getColumns(database, schema, 
table, column)) {
             while (resultSet.next()) {
                 String defaultValue = resultSet.getString("COLUMN_DEF");
                 return Optional.ofNullable(defaultValue);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 9f110ced7..25c02e6b1 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -118,6 +118,17 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         }
     }
 
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            return databaseExists(tablePath.getDatabaseName())
+                    && listTables(tablePath.getDatabaseName())
+                            .contains(tablePath.getSchemaAndTableName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+
     @Override
     public CatalogTable getTable(TablePath tablePath)
             throws CatalogException, TableNotExistException {
@@ -129,10 +140,17 @@ public class SqlServerCatalog extends AbstractJdbcCatalog 
{
         try (Connection conn = DriverManager.getConnection(dbUrl, username, 
pwd)) {
             DatabaseMetaData metaData = conn.getMetaData();
             Optional<PrimaryKey> primaryKey =
-                    getPrimaryKey(metaData, tablePath.getDatabaseName(), 
tablePath.getTableName());
+                    getPrimaryKey(
+                            metaData,
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName());
             List<ConstraintKey> constraintKeys =
                     getConstraintKeys(
-                            metaData, tablePath.getDatabaseName(), 
tablePath.getTableName());
+                            metaData,
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName());
 
             try (PreparedStatement ps =
                     conn.prepareStatement(
@@ -150,7 +168,12 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
                     boolean isNullable =
                             tableMetaData.isNullable(i) == 
ResultSetMetaData.columnNullable;
                     Object defaultValue =
-                            getColumnDefaultValue(metaData, 
tablePath.getTableName(), columnName)
+                            getColumnDefaultValue(
+                                            metaData,
+                                            tablePath.getDatabaseName(),
+                                            tablePath.getSchemaName(),
+                                            tablePath.getTableName(),
+                                            columnName)
                                     .orElse(null);
 
                     PhysicalColumn physicalColumn =
@@ -169,7 +192,10 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
                 constraintKeys.forEach(builder::constraintKey);
                 TableIdentifier tableIdentifier =
                         TableIdentifier.of(
-                                catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName());
+                                catalogName,
+                                tablePath.getDatabaseName(),
+                                tablePath.getSchemaName(),
+                                tablePath.getTableName());
                 return CatalogTable.of(
                         tableIdentifier,
                         builder.build(),

Reply via email to