This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new eeb4c0b704 [Fix][Connector-V2] Fix wrong column discovery when 
tableNamePattern is treated as LIKE (#10422)
eeb4c0b704 is described below

commit eeb4c0b7040a0e309f2a1686caa5874884bd99d0
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Feb 4 20:21:12 2026 +0800

    [Fix][Connector-V2] Fix wrong column discovery when tableNamePattern is 
treated as LIKE (#10422)
---
 docs/en/connectors/source/Jdbc.md                  |   9 +-
 docs/en/connectors/source/SqlServer-CDC.md         |  10 +-
 docs/zh/connectors/source/Jdbc.md                  |   7 +
 docs/zh/connectors/source/SqlServer-CDC.md         |   9 +-
 .../connector/sqlserver/SqlServerConnection.java   |  31 +++
 .../sqlserver/SqlServerConnectionTest.java         | 140 ++++++++++
 .../jdbc/catalog/utils/JdbcColumnConverter.java    |  33 +++
 .../jdbc/catalog/utils/JdbcIdentifierUtils.java    |  79 ++++++
 .../internal/dialect/JdbcDialectTypeMapper.java    |  35 +++
 .../jdbc/catalog/utils/CatalogUtilsTest.java       | 303 +++++++++++++++++++++
 .../jdbc/catalog/utils/TestDatabaseMetaData.java   |   3 +
 11 files changed, 656 insertions(+), 3 deletions(-)

diff --git a/docs/en/connectors/source/Jdbc.md 
b/docs/en/connectors/source/Jdbc.md
index 0d573a8235..2f8df5dd0e 100644
--- a/docs/en/connectors/source/Jdbc.md
+++ b/docs/en/connectors/source/Jdbc.md
@@ -79,6 +79,13 @@ supports query SQL and can achieve projection effect.
 
 The JDBC Source connector supports two ways to specify tables:
 
+#### Notes
+
+- Many JDBC drivers treat `DatabaseMetaData.getColumns(..., schemaPattern, 
tableNamePattern, ...)` as SQL LIKE patterns.
+  If your schema/table names contain `_` or `%`, column discovery may return 
rows from other tables. SeaTunnel filters the
+  returned metadata rows by exact schema/table identifier to avoid mixing 
columns.
+- For case-sensitive databases, make sure the configured schema/table names 
use the exact identifier case.
+
 1. **Exact Table Path**: Use `table_path` to specify a single table with its 
full path.
    ```hocon
    table_path = "testdb.table1"
@@ -468,4 +475,4 @@ Jdbc {
 
 ## Changelog
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/en/connectors/source/SqlServer-CDC.md 
b/docs/en/connectors/source/SqlServer-CDC.md
index 134f82fa47..eb91c0a0ea 100644
--- a/docs/en/connectors/source/SqlServer-CDC.md
+++ b/docs/en/connectors/source/SqlServer-CDC.md
@@ -27,6 +27,14 @@ import ChangeLog from 
'../changelog/connector-cdc-sqlserver.md';
 The Sql Server CDC connector allows for reading snapshot data and incremental 
data from SqlServer database. This document
 describes how to setup the Sql Server CDC connector to run SQL queries against 
SqlServer databases.
 
+:::tip
+
+When discovering table columns via JDBC metadata, SeaTunnel filters metadata 
rows by the exact schema/table identifier to
+avoid mixing columns from other tables (some drivers treat 
`schemaPattern`/`tableNamePattern` as SQL LIKE patterns). For
+case-sensitive databases, make sure the configured identifier case matches the 
database.
+
+:::
+
 ## Supported DataSource Info
 
 | Datasource |                      Supported versions                       | 
                   Driver                    |                              Url 
                             |                                 Maven            
                     |
@@ -234,4 +242,4 @@ sink {
 
 ## Changelog
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/connectors/source/Jdbc.md 
b/docs/zh/connectors/source/Jdbc.md
index 5f778c5b8d..6dde74db26 100644
--- a/docs/zh/connectors/source/Jdbc.md
+++ b/docs/zh/connectors/source/Jdbc.md
@@ -73,6 +73,13 @@ import ChangeLog from '../changelog/connector-jdbc.md';
 
 JDBC 源连接器支持两种方式指定表:
 
+#### 注意事项
+
+- 许多 JDBC 驱动会将 `DatabaseMetaData.getColumns(..., schemaPattern, 
tableNamePattern, ...)` 视为 SQL LIKE 的模式匹配。
+  当 schema/table 名称中包含 `_` 或 `%` 时,列发现可能会返回其他表的列。SeaTunnel 会按精确的 schema/table 
标识符对返回结果做二次过滤,
+  以避免混入其他表的列。
+- 对于大小写敏感的数据库,请确保配置的 schema/table 名称与数据库中实际标识符大小写一致。
+
 1. **精确表路径**:使用 `table_path` 指定单个表及其完整路径。
    ```hocon
    table_path = "testdb.table1"
diff --git a/docs/zh/connectors/source/SqlServer-CDC.md 
b/docs/zh/connectors/source/SqlServer-CDC.md
index 7afbacc2f0..064c770008 100644
--- a/docs/zh/connectors/source/SqlServer-CDC.md
+++ b/docs/zh/connectors/source/SqlServer-CDC.md
@@ -26,6 +26,13 @@ import ChangeLog from 
'../changelog/connector-cdc-sqlserver.md';
 
 Sql Server CDC 连接器允许从 SqlServer 数据库读取快照数据和增量数据。本文档描述了如何设置 Sql Server CDC 连接器来对 
SqlServer 数据库运行 SQL 查询。
 
+:::tip
+
+在通过 JDBC 元数据发现表列信息时,SeaTunnel 会按精确的 schema/table 
标识符对返回结果做二次过滤,以避免混入其他表的列(部分驱动会将
+`schemaPattern`/`tableNamePattern` 视为 SQL LIKE 
模式匹配)。对于大小写敏感的数据库,请确保配置的标识符大小写与数据库一致。
+
+:::
+
 ## 支持的数据源信息
 
 | 数据源    | 支持版本                                      | 驱动                      
                   | Url                                                        
   | Maven                                                                 |
@@ -233,4 +240,4 @@ sink {
 
 ## 变更日志
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
index a3884a0280..3a2bb3c729 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
@@ -19,6 +19,8 @@ package io.debezium.connector.sqlserver;
 
 import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.JdbcIdentifierUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -696,8 +698,11 @@ public class SqlServerConnection extends JdbcConnection {
     public Table getTableSchemaFromTable(String databaseName, 
SqlServerChangeTable changeTable)
             throws SQLException {
         final DatabaseMetaData metadata = connection().getMetaData();
+        JdbcIdentifierUtils.IdentifierCaseStrategy identifierCaseStrategy =
+                JdbcIdentifierUtils.identifierCaseStrategy(metadata);
 
         List<Column> columns = new ArrayList<>();
+        int filteredRows = 0;
         try (ResultSet rs =
                 metadata.getColumns(
                         databaseName,
@@ -705,6 +710,24 @@ public class SqlServerConnection extends JdbcConnection {
                         changeTable.getSourceTableId().table(),
                         null)) {
             while (rs.next()) {
+                // `tableNamePattern` is treated as a SQL LIKE pattern by many 
drivers, so filter
+                // the ResultSet by exact table/schema to avoid mixing columns 
from other tables.
+                String actualTableName = rs.getString("TABLE_NAME");
+                if (!JdbcIdentifierUtils.identifierEquals(
+                        identifierCaseStrategy,
+                        changeTable.getSourceTableId().table(),
+                        actualTableName)) {
+                    filteredRows++;
+                    continue;
+                }
+                String actualSchemaName = rs.getString("TABLE_SCHEM");
+                if (!JdbcIdentifierUtils.identifierEquals(
+                        identifierCaseStrategy,
+                        changeTable.getSourceTableId().schema(),
+                        actualSchemaName)) {
+                    filteredRows++;
+                    continue;
+                }
                 readTableColumn(rs, changeTable.getSourceTableId(), null)
                         .ifPresent(
                                 ce -> {
@@ -715,6 +738,14 @@ public class SqlServerConnection extends JdbcConnection {
                                 });
             }
         }
+        if (columns.isEmpty() && filteredRows > 0) {
+            LOGGER.warn(
+                    "No columns found for table '{}' in database '{}'. 
Filtered {} rows returned by JDBC driver. "
+                            + "The table may not exist or the database 
requires exact identifier case.",
+                    changeTable.getSourceTableId(),
+                    databaseName,
+                    filteredRows);
+        }
 
         final List<String> pkColumnNames =
                 readPrimaryKeyOrUniqueIndexNames(metadata, 
changeTable.getSourceTableId()).stream()
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionTest.java
new file mode 100644
index 0000000000..f74f545fd4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.sqlserver;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.config.Configuration;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SqlServerConnectionTest {
+
+    @Test
+    void testGetTableSchemaFromTableFiltersOutWildcardTables() throws 
Exception {
+        String databaseName = "test_db";
+        TableId tableId = TableId.parse(databaseName + ".dbo.user_info");
+
+        SqlServerChangeTable changeTable = mock(SqlServerChangeTable.class);
+        when(changeTable.getSourceTableId()).thenReturn(tableId);
+        
when(changeTable.getCapturedColumns()).thenReturn(Collections.singletonList("id"));
+
+        ResultSet columnsRs = mock(ResultSet.class);
+        when(columnsRs.next()).thenReturn(true, true, false);
+        when(columnsRs.getString("TABLE_NAME")).thenReturn("user_info", 
"userAinfo");
+        when(columnsRs.getString("TABLE_SCHEM")).thenReturn("dbo", "dbo");
+        when(columnsRs.getString("COLUMN_NAME")).thenReturn("id", "bad");
+
+        DatabaseMetaData metadata = mock(DatabaseMetaData.class);
+        when(metadata.getColumns(eq(databaseName), eq("dbo"), eq("user_info"), 
isNull()))
+                .thenReturn(columnsRs);
+
+        Connection jdbcConnection = mock(Connection.class);
+        when(jdbcConnection.getMetaData()).thenReturn(metadata);
+
+        TestSqlServerConnection connection = new 
TestSqlServerConnection(jdbcConnection);
+        Table table = connection.getTableSchemaFromTable(databaseName, 
changeTable);
+
+        Assertions.assertEquals(1, table.columns().size());
+        Assertions.assertEquals("id", table.columns().get(0).name());
+    }
+
+    @Test
+    void testGetTableSchemaFromTableCaseSensitiveRequiresExactMatch() throws 
Exception {
+        String databaseName = "test_db";
+        TableId tableId = TableId.parse(databaseName + ".dbo.UserInfo");
+
+        SqlServerChangeTable changeTable = mock(SqlServerChangeTable.class);
+        when(changeTable.getSourceTableId()).thenReturn(tableId);
+        
when(changeTable.getCapturedColumns()).thenReturn(Collections.singletonList("id"));
+
+        ResultSet columnsRs = mock(ResultSet.class);
+        when(columnsRs.next()).thenReturn(true, false);
+        when(columnsRs.getString("TABLE_NAME")).thenReturn("userinfo");
+        when(columnsRs.getString("TABLE_SCHEM")).thenReturn("dbo");
+        when(columnsRs.getString("COLUMN_NAME")).thenReturn("id");
+
+        DatabaseMetaData metadata = mock(DatabaseMetaData.class);
+        when(metadata.supportsMixedCaseIdentifiers()).thenReturn(true);
+        when(metadata.getColumns(eq(databaseName), eq("dbo"), eq("UserInfo"), 
isNull()))
+                .thenReturn(columnsRs);
+
+        Connection jdbcConnection = mock(Connection.class);
+        when(jdbcConnection.getMetaData()).thenReturn(metadata);
+
+        TestSqlServerConnection connection = new 
TestSqlServerConnection(jdbcConnection);
+        Table table = connection.getTableSchemaFromTable(databaseName, 
changeTable);
+
+        Assertions.assertTrue(table.columns().isEmpty());
+    }
+
+    private static final class TestSqlServerConnection extends 
SqlServerConnection {
+        private final Connection jdbcConnection;
+
+        private TestSqlServerConnection(Connection jdbcConnection) {
+            super(
+                    JdbcConfiguration.adapt(Configuration.create().build()),
+                    SourceTimestampMode.COMMIT,
+                    mock(SqlServerValueConverters.class),
+                    SqlServerConnectionTest.class::getClassLoader,
+                    Collections.emptySet(),
+                    false);
+            this.jdbcConnection = jdbcConnection;
+        }
+
+        @Override
+        public synchronized Connection connection(boolean executeOnConnect) 
throws SQLException {
+            return jdbcConnection;
+        }
+
+        @Override
+        protected Optional<ColumnEditor> readTableColumn(
+                ResultSet columnMetadata, TableId tableId, 
Tables.ColumnNameFilter columnFilter)
+                throws SQLException {
+            String columnName = columnMetadata.getString("COLUMN_NAME");
+            ColumnEditor editor =
+                    
Column.editor().name(columnName).type("INT").jdbcType(Types.INTEGER);
+            return Optional.of(editor);
+        }
+
+        @Override
+        protected List<String> readPrimaryKeyOrUniqueIndexNames(
+                DatabaseMetaData metadata, TableId tableId) throws 
SQLException {
+            return Collections.emptyList();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
index 1184e5e59e..50d0a1d53f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
@@ -26,6 +26,9 @@ import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
@@ -69,10 +72,14 @@ import static java.sql.Types.VARCHAR;
  */
 @Deprecated
 public class JdbcColumnConverter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcColumnConverter.class);
 
     public static List<Column> convert(DatabaseMetaData metadata, TablePath 
tablePath)
             throws SQLException {
         List<Column> columns = new ArrayList<>();
+        int filteredRows = 0;
+        JdbcIdentifierUtils.IdentifierCaseStrategy identifierCaseStrategy =
+                JdbcIdentifierUtils.identifierCaseStrategy(metadata);
 
         try (ResultSet columnsResultSet =
                 metadata.getColumns(
@@ -82,6 +89,23 @@ public class JdbcColumnConverter {
                         null)) {
 
             while (columnsResultSet.next()) {
+                // `tableNamePattern` is treated as a SQL LIKE pattern by many 
drivers, so filter
+                // the ResultSet by exact table/schema to avoid mixing columns 
from other tables.
+                String actualTableName = 
columnsResultSet.getString("TABLE_NAME");
+                if (!JdbcIdentifierUtils.identifierEquals(
+                        identifierCaseStrategy, tablePath.getTableName(), 
actualTableName)) {
+                    filteredRows++;
+                    continue;
+                }
+                if (tablePath.getSchemaName() != null) {
+                    String actualSchemaName = 
columnsResultSet.getString("TABLE_SCHEM");
+                    if (!JdbcIdentifierUtils.identifierEquals(
+                            identifierCaseStrategy, tablePath.getSchemaName(), 
actualSchemaName)) {
+                        filteredRows++;
+                        continue;
+                    }
+                }
+
                 String columnName = columnsResultSet.getString("COLUMN_NAME");
                 int jdbcType = columnsResultSet.getInt("DATA_TYPE");
                 String nativeType = columnsResultSet.getString("TYPE_NAME");
@@ -102,6 +126,15 @@ public class JdbcColumnConverter {
                 columns.add(column);
             }
         }
+        if (columns.isEmpty() && filteredRows > 0) {
+            LOG.warn(
+                    "No columns found for catalog '{}', schema '{}', table 
'{}'. Filtered {} rows returned by JDBC driver. "
+                            + "The table may not exist or the database 
requires exact identifier case.",
+                    tablePath.getDatabaseName(),
+                    tablePath.getSchemaName(),
+                    tablePath.getTableName(),
+                    filteredRows);
+        }
         return columns;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcIdentifierUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcIdentifierUtils.java
new file mode 100644
index 0000000000..a788f985a2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcIdentifierUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;
+
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Locale;
+
+public final class JdbcIdentifierUtils {
+
+    private JdbcIdentifierUtils() {}
+
+    public enum IdentifierCaseStrategy {
+        CASE_SENSITIVE,
+        LOWER_CASE,
+        UPPER_CASE,
+        CASE_INSENSITIVE
+    }
+
+    /**
+     * Resolve case handling strategy for unquoted identifiers based on {@link 
DatabaseMetaData}.
+     *
+     * <p>Note: JDBC metadata APIs often treat {@code schemaPattern}/{@code 
tableNamePattern} as
+     * patterns (e.g. SQL LIKE), while identifier case sensitivity depends on 
the database. This
+     * method provides a best-effort strategy to compare identifiers returned 
by JDBC metadata APIs.
+     */
+    public static IdentifierCaseStrategy 
identifierCaseStrategy(DatabaseMetaData metadata)
+            throws SQLException {
+        if (metadata == null) {
+            return IdentifierCaseStrategy.CASE_INSENSITIVE;
+        }
+        if (metadata.supportsMixedCaseIdentifiers()) {
+            return IdentifierCaseStrategy.CASE_SENSITIVE;
+        }
+        if (metadata.storesLowerCaseIdentifiers()) {
+            return IdentifierCaseStrategy.LOWER_CASE;
+        }
+        if (metadata.storesUpperCaseIdentifiers()) {
+            return IdentifierCaseStrategy.UPPER_CASE;
+        }
+        return IdentifierCaseStrategy.CASE_INSENSITIVE;
+    }
+
+    public static boolean identifierEquals(
+            IdentifierCaseStrategy caseStrategy, String expected, String 
actual) {
+        if (expected == null) {
+            return true;
+        }
+        if (actual == null) {
+            return false;
+        }
+        switch (caseStrategy) {
+            case CASE_SENSITIVE:
+                return actual.equals(expected);
+            case LOWER_CASE:
+                return 
actual.toLowerCase(Locale.ROOT).equals(expected.toLowerCase(Locale.ROOT));
+            case UPPER_CASE:
+                return 
actual.toUpperCase(Locale.ROOT).equals(expected.toUpperCase(Locale.ROOT));
+            case CASE_INSENSITIVE:
+            default:
+                return actual.equalsIgnoreCase(expected);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
index 45da6d7611..544a7bc19d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
@@ -21,6 +21,10 @@ import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.JdbcIdentifierUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.sql.DatabaseMetaData;
@@ -46,6 +50,7 @@ import static java.sql.Types.VARCHAR;
 
 /** Separate the jdbc meta-information type to SeaTunnelDataType into the 
interface. */
 public interface JdbcDialectTypeMapper extends Serializable {
+    Logger LOG = LoggerFactory.getLogger(JdbcDialectTypeMapper.class);
 
     /**
      * @deprecated instead by {@link #mappingColumn(BasicTypeDefine)}
@@ -89,9 +94,30 @@ public interface JdbcDialectTypeMapper extends Serializable {
             String columnNamePattern)
             throws SQLException {
         List<Column> columns = new ArrayList<>();
+        int filteredRows = 0;
+        JdbcIdentifierUtils.IdentifierCaseStrategy identifierCaseStrategy =
+                JdbcIdentifierUtils.identifierCaseStrategy(metadata);
         try (ResultSet rs =
                 metadata.getColumns(catalog, schemaPattern, tableNamePattern, 
columnNamePattern)) {
             while (rs.next()) {
+                // `tableNamePattern` is treated as a SQL LIKE pattern by many 
drivers, so filter
+                // the ResultSet by exact table/schema to avoid mixing columns 
from other tables.
+                if (tableNamePattern != null) {
+                    String actualTableName = rs.getString("TABLE_NAME");
+                    if (!JdbcIdentifierUtils.identifierEquals(
+                            identifierCaseStrategy, tableNamePattern, 
actualTableName)) {
+                        filteredRows++;
+                        continue;
+                    }
+                }
+                if (schemaPattern != null) {
+                    String actualSchemaName = rs.getString("TABLE_SCHEM");
+                    if (!JdbcIdentifierUtils.identifierEquals(
+                            identifierCaseStrategy, schemaPattern, 
actualSchemaName)) {
+                        filteredRows++;
+                        continue;
+                    }
+                }
                 String columnName = rs.getString("COLUMN_NAME");
                 String nativeType = rs.getString("TYPE_NAME");
                 int sqlType = rs.getInt("DATA_TYPE");
@@ -115,6 +141,15 @@ public interface JdbcDialectTypeMapper extends 
Serializable {
                 columns.add(mappingColumn(typeDefine));
             }
         }
+        if (columns.isEmpty() && filteredRows > 0) {
+            LOG.warn(
+                    "No columns found for catalog '{}', schema '{}', table 
'{}'. Filtered {} rows returned by JDBC driver. "
+                            + "The table may not exist or the database 
requires exact identifier case.",
+                    catalog,
+                    schemaPattern,
+                    tableNamePattern,
+                    filteredRows);
+        }
         return columns;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
index 2ae9df7951..eed75c213f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
@@ -35,7 +35,11 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.mockito.Mockito.mock;
@@ -91,6 +95,305 @@ public class CatalogUtilsTest {
         Assertions.assertEquals("id comment", 
tableSchema2.getColumns().get(0).getComment());
     }
 
+    @Test
+    void testGetTableSchemaFiltersOutOtherMatchedTables() throws SQLException {
+        TestDatabaseMetaData metadata =
+                new TestDatabaseMetaData() {
+                    @Override
+                    public java.sql.ResultSet getColumns(
+                            String catalog,
+                            String schemaPattern,
+                            String tableNamePattern,
+                            String columnNamePattern)
+                            throws SQLException {
+                        List<Map<String, Object>> value = new ArrayList<>();
+                        value.add(
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("TABLE_NAME", "user_info");
+                                        put("TABLE_SCHEM", "public");
+                                        put("COLUMN_NAME", "id");
+                                        put("DATA_TYPE", 1);
+                                        put("TYPE_NAME", "INT");
+                                        put("COLUMN_SIZE", 11);
+                                        put("DECIMAL_DIGITS", 0);
+                                        put("NULLABLE", 0);
+                                        put("REMARKS", "id comment");
+                                    }
+                                });
+                        value.add(
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("TABLE_NAME", "userAinfo");
+                                        put("TABLE_SCHEM", "public");
+                                        put("COLUMN_NAME", "bad");
+                                        put("DATA_TYPE", 1);
+                                        put("TYPE_NAME", "INT");
+                                        put("COLUMN_SIZE", 11);
+                                        put("DECIMAL_DIGITS", 0);
+                                        put("NULLABLE", 0);
+                                        put("REMARKS", "should be filtered");
+                                    }
+                                });
+                        return new TestResultSet(value);
+                    }
+                };
+
+        TablePath tablePath = TablePath.of("test_db", "public", "user_info");
+
+        TableSchema tableSchema =
+                CatalogUtils.getTableSchema(
+                        metadata,
+                        tablePath,
+                        new JdbcDialectTypeMapper() {
+                            @Override
+                            public Column mappingColumn(BasicTypeDefine 
typeDefine) {
+                                return PhysicalColumn.of(
+                                        typeDefine.getName(),
+                                        BasicType.VOID_TYPE,
+                                        typeDefine.getLength(),
+                                        typeDefine.isNullable(),
+                                        typeDefine.getScale(),
+                                        typeDefine.getComment());
+                            }
+                        });
+
+        Assertions.assertEquals(1, tableSchema.getColumns().size());
+        Assertions.assertEquals("id", 
tableSchema.getColumns().get(0).getName());
+        Assertions.assertEquals("id comment", 
tableSchema.getColumns().get(0).getComment());
+
+        TableSchema fallbackTableSchema =
+                CatalogUtils.getTableSchema(metadata, tablePath, new 
JdbcDialectTypeMapper() {});
+        Assertions.assertEquals(1, fallbackTableSchema.getColumns().size());
+        Assertions.assertEquals("id", 
fallbackTableSchema.getColumns().get(0).getName());
+    }
+
+    @Test
+    void testGetTableSchemaFiltersOutPercentageWildcard() throws SQLException {
+        TestDatabaseMetaData metadata =
+                new TestDatabaseMetaData() {
+                    @Override
+                    public java.sql.ResultSet getColumns(
+                            String catalog,
+                            String schemaPattern,
+                            String tableNamePattern,
+                            String columnNamePattern)
+                            throws SQLException {
+                        List<Map<String, Object>> value = new ArrayList<>();
+                        value.add(
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("TABLE_NAME", "user%info");
+                                        put("TABLE_SCHEM", "public");
+                                        put("COLUMN_NAME", "id");
+                                        put("DATA_TYPE", 1);
+                                        put("TYPE_NAME", "INT");
+                                        put("COLUMN_SIZE", 11);
+                                        put("DECIMAL_DIGITS", 0);
+                                        put("NULLABLE", 0);
+                                        put("REMARKS", "id comment");
+                                    }
+                                });
+                        value.add(
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("TABLE_NAME", "userXYZinfo");
+                                        put("TABLE_SCHEM", "public");
+                                        put("COLUMN_NAME", "bad");
+                                        put("DATA_TYPE", 1);
+                                        put("TYPE_NAME", "INT");
+                                        put("COLUMN_SIZE", 11);
+                                        put("DECIMAL_DIGITS", 0);
+                                        put("NULLABLE", 0);
+                                        put("REMARKS", "should be filtered");
+                                    }
+                                });
+                        return new TestResultSet(value);
+                    }
+                };
+
+        TablePath tablePath = TablePath.of("test_db", "public", "user%info");
+        TableSchema tableSchema =
+                CatalogUtils.getTableSchema(metadata, tablePath, new 
JdbcDialectTypeMapper() {});
+        Assertions.assertEquals(1, tableSchema.getColumns().size());
+        Assertions.assertEquals("id", 
tableSchema.getColumns().get(0).getName());
+    }
+
+    @Test
+    void testGetTableSchemaFiltersOutSchemaWildcard() throws SQLException {
+        TestDatabaseMetaData metadata =
+                new TestDatabaseMetaData() {
+                    @Override
+                    public java.sql.ResultSet getColumns(
+                            String catalog,
+                            String schemaPattern,
+                            String tableNamePattern,
+                            String columnNamePattern)
+                            throws SQLException {
+                        List<Map<String, Object>> value = new ArrayList<>();
+                        value.add(
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("TABLE_NAME", "user_info");
+                                        put("TABLE_SCHEM", "pub_lic");
+                                        put("COLUMN_NAME", "id");
+                                        put("DATA_TYPE", 1);
+                                        put("TYPE_NAME", "INT");
+                                        put("COLUMN_SIZE", 11);
+                                        put("DECIMAL_DIGITS", 0);
+                                        put("NULLABLE", 0);
+                                        put("REMARKS", "id comment");
+                                    }
+                                });
+                        value.add(
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("TABLE_NAME", "user_info");
+                                        put("TABLE_SCHEM", "pubAlic");
+                                        put("COLUMN_NAME", "bad");
+                                        put("DATA_TYPE", 1);
+                                        put("TYPE_NAME", "INT");
+                                        put("COLUMN_SIZE", 11);
+                                        put("DECIMAL_DIGITS", 0);
+                                        put("NULLABLE", 0);
+                                        put("REMARKS", "should be filtered");
+                                    }
+                                });
+                        return new TestResultSet(value);
+                    }
+                };
+
+        TablePath tablePath = TablePath.of("test_db", "pub_lic", "user_info");
+        TableSchema tableSchema =
+                CatalogUtils.getTableSchema(metadata, tablePath, new 
JdbcDialectTypeMapper() {});
+        Assertions.assertEquals(1, tableSchema.getColumns().size());
+        Assertions.assertEquals("id", 
tableSchema.getColumns().get(0).getName());
+    }
+
+    @Test
+    void testGetTableSchemaEmptyWhenAllFiltered() throws SQLException {
+        TestDatabaseMetaData metadata =
+                new TestDatabaseMetaData() {
+                    @Override
+                    public java.sql.ResultSet getColumns(
+                            String catalog,
+                            String schemaPattern,
+                            String tableNamePattern,
+                            String columnNamePattern)
+                            throws SQLException {
+                        List<Map<String, Object>> value = new ArrayList<>();
+                        value.add(
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("TABLE_NAME", "other_table");
+                                        put("TABLE_SCHEM", "public");
+                                        put("COLUMN_NAME", "bad");
+                                        put("DATA_TYPE", 1);
+                                        put("TYPE_NAME", "INT");
+                                        put("COLUMN_SIZE", 11);
+                                        put("DECIMAL_DIGITS", 0);
+                                        put("NULLABLE", 0);
+                                        put("REMARKS", "should be filtered");
+                                    }
+                                });
+                        return new TestResultSet(value);
+                    }
+                };
+
+        TablePath tablePath = TablePath.of("test_db", "public", "user_info");
+        TableSchema tableSchema =
+                CatalogUtils.getTableSchema(metadata, tablePath, new 
JdbcDialectTypeMapper() {});
+        Assertions.assertTrue(tableSchema.getColumns().isEmpty());
+    }
+
+    @Test
+    void testGetTableSchemaCaseSensitiveIdentifiersRequireExactMatch() throws 
SQLException {
+        TestDatabaseMetaData metadata =
+                new TestDatabaseMetaData() {
+                    @Override
+                    public boolean supportsMixedCaseIdentifiers() throws 
SQLException {
+                        return true;
+                    }
+
+                    @Override
+                    public java.sql.ResultSet getColumns(
+                            String catalog,
+                            String schemaPattern,
+                            String tableNamePattern,
+                            String columnNamePattern)
+                            throws SQLException {
+                        List<Map<String, Object>> value = new ArrayList<>();
+                        value.add(
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("TABLE_NAME", "userinfo");
+                                        put("TABLE_SCHEM", "public");
+                                        put("COLUMN_NAME", "id");
+                                        put("DATA_TYPE", 1);
+                                        put("TYPE_NAME", "INT");
+                                        put("COLUMN_SIZE", 11);
+                                        put("DECIMAL_DIGITS", 0);
+                                        put("NULLABLE", 0);
+                                        put("REMARKS", "id comment");
+                                    }
+                                });
+                        return new TestResultSet(value);
+                    }
+                };
+
+        TablePath tablePath = TablePath.of("test_db", "public", "UserInfo");
+        TableSchema tableSchema =
+                CatalogUtils.getTableSchema(metadata, tablePath, new 
JdbcDialectTypeMapper() {});
+        Assertions.assertEquals(Collections.emptyList(), 
tableSchema.getColumns());
+    }
+
+    @Test
+    void testGetTableSchemaStoresUpperCaseIdentifiersCanMatchLowerCaseInput() 
throws SQLException {
+        TestDatabaseMetaData metadata =
+                new TestDatabaseMetaData() {
+                    @Override
+                    public boolean supportsMixedCaseIdentifiers() throws 
SQLException {
+                        return false;
+                    }
+
+                    @Override
+                    public boolean storesUpperCaseIdentifiers() throws 
SQLException {
+                        return true;
+                    }
+
+                    @Override
+                    public java.sql.ResultSet getColumns(
+                            String catalog,
+                            String schemaPattern,
+                            String tableNamePattern,
+                            String columnNamePattern)
+                            throws SQLException {
+                        List<Map<String, Object>> value = new ArrayList<>();
+                        value.add(
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("TABLE_NAME", "USER_INFO");
+                                        put("TABLE_SCHEM", "PUBLIC");
+                                        put("COLUMN_NAME", "id");
+                                        put("DATA_TYPE", 1);
+                                        put("TYPE_NAME", "INT");
+                                        put("COLUMN_SIZE", 11);
+                                        put("DECIMAL_DIGITS", 0);
+                                        put("NULLABLE", 0);
+                                        put("REMARKS", "id comment");
+                                    }
+                                });
+                        return new TestResultSet(value);
+                    }
+                };
+
+        TablePath tablePath = TablePath.of("test_db", "public", "user_info");
+        TableSchema tableSchema =
+                CatalogUtils.getTableSchema(metadata, tablePath, new 
JdbcDialectTypeMapper() {});
+        Assertions.assertEquals(1, tableSchema.getColumns().size());
+        Assertions.assertEquals("id", 
tableSchema.getColumns().get(0).getName());
+    }
+
     @Test
     void testGetCatalogTableWithPrimaryKeyFromQuery() throws SQLException {
         Connection connection = mock(Connection.class);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java
index b7f6085138..eb073ae415 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java
@@ -664,6 +664,9 @@ public class TestDatabaseMetaData implements 
DatabaseMetaData {
         value.add(
                 new HashMap<String, Object>() {
                     {
+                        put("TABLE_CAT", catalog);
+                        put("TABLE_SCHEM", schemaPattern);
+                        put("TABLE_NAME", tableNamePattern);
                         put("COLUMN_NAME", "id");
                         put("DATA_TYPE", 1);
                         put("TYPE_NAME", "INT");


Reply via email to