This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new eeb4c0b704 [Fix][Connector-V2] Fix wrong column discovery when tableNamePattern is treated as LIKE (#10422)
eeb4c0b704 is described below
commit eeb4c0b7040a0e309f2a1686caa5874884bd99d0
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Feb 4 20:21:12 2026 +0800
[Fix][Connector-V2] Fix wrong column discovery when tableNamePattern is treated as LIKE (#10422)
---
docs/en/connectors/source/Jdbc.md | 9 +-
docs/en/connectors/source/SqlServer-CDC.md | 10 +-
docs/zh/connectors/source/Jdbc.md | 7 +
docs/zh/connectors/source/SqlServer-CDC.md | 9 +-
.../connector/sqlserver/SqlServerConnection.java | 31 +++
.../sqlserver/SqlServerConnectionTest.java | 140 ++++++++++
.../jdbc/catalog/utils/JdbcColumnConverter.java | 33 +++
.../jdbc/catalog/utils/JdbcIdentifierUtils.java | 79 ++++++
.../internal/dialect/JdbcDialectTypeMapper.java | 35 +++
.../jdbc/catalog/utils/CatalogUtilsTest.java | 303 +++++++++++++++++++++
.../jdbc/catalog/utils/TestDatabaseMetaData.java | 3 +
11 files changed, 656 insertions(+), 3 deletions(-)
diff --git a/docs/en/connectors/source/Jdbc.md
b/docs/en/connectors/source/Jdbc.md
index 0d573a8235..2f8df5dd0e 100644
--- a/docs/en/connectors/source/Jdbc.md
+++ b/docs/en/connectors/source/Jdbc.md
@@ -79,6 +79,13 @@ supports query SQL and can achieve projection effect.
The JDBC Source connector supports two ways to specify tables:
+#### Notes
+
+- Many JDBC drivers treat `DatabaseMetaData.getColumns(..., schemaPattern, tableNamePattern, ...)` as SQL LIKE patterns.
+ If your schema/table names contain `_` or `%`, column discovery may return rows from other tables. SeaTunnel filters the
+ returned metadata rows by exact schema/table identifier to avoid mixing columns.
+- For case-sensitive databases, make sure the configured schema/table names use the exact identifier case.
+
1. **Exact Table Path**: Use `table_path` to specify a single table with its full path.
```hocon
table_path = "testdb.table1"
@@ -468,4 +475,4 @@ Jdbc {
## Changelog
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/en/connectors/source/SqlServer-CDC.md
b/docs/en/connectors/source/SqlServer-CDC.md
index 134f82fa47..eb91c0a0ea 100644
--- a/docs/en/connectors/source/SqlServer-CDC.md
+++ b/docs/en/connectors/source/SqlServer-CDC.md
@@ -27,6 +27,14 @@ import ChangeLog from
'../changelog/connector-cdc-sqlserver.md';
The Sql Server CDC connector allows for reading snapshot data and incremental data from SqlServer database. This document
describes how to setup the Sql Server CDC connector to run SQL queries against SqlServer databases.
+:::tip
+
+When discovering table columns via JDBC metadata, SeaTunnel filters metadata rows by the exact schema/table identifier to
+avoid mixing columns from other tables (some drivers treat `schemaPattern`/`tableNamePattern` as SQL LIKE patterns). For
+case-sensitive databases, make sure the configured identifier case matches the database.
+
+:::
+
## Supported DataSource Info
| Datasource | Supported versions | Driver | Url | Maven |
@@ -234,4 +242,4 @@ sink {
## Changelog
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/connectors/source/Jdbc.md
b/docs/zh/connectors/source/Jdbc.md
index 5f778c5b8d..6dde74db26 100644
--- a/docs/zh/connectors/source/Jdbc.md
+++ b/docs/zh/connectors/source/Jdbc.md
@@ -73,6 +73,13 @@ import ChangeLog from '../changelog/connector-jdbc.md';
JDBC 源连接器支持两种方式指定表:
+#### 注意事项
+
+- 许多 JDBC 驱动会将 `DatabaseMetaData.getColumns(..., schemaPattern, tableNamePattern, ...)` 视为 SQL LIKE 的模式匹配。
+ 当 schema/table 名称中包含 `_` 或 `%` 时,列发现可能会返回其他表的列。SeaTunnel 会按精确的 schema/table 标识符对返回结果做二次过滤,
+ 以避免混入其他表的列。
+- 对于大小写敏感的数据库,请确保配置的 schema/table 名称与数据库中实际标识符大小写一致。
+
1. **精确表路径**:使用 `table_path` 指定单个表及其完整路径。
```hocon
table_path = "testdb.table1"
diff --git a/docs/zh/connectors/source/SqlServer-CDC.md
b/docs/zh/connectors/source/SqlServer-CDC.md
index 7afbacc2f0..064c770008 100644
--- a/docs/zh/connectors/source/SqlServer-CDC.md
+++ b/docs/zh/connectors/source/SqlServer-CDC.md
@@ -26,6 +26,13 @@ import ChangeLog from
'../changelog/connector-cdc-sqlserver.md';
Sql Server CDC 连接器允许从 SqlServer 数据库读取快照数据和增量数据。本文档描述了如何设置 Sql Server CDC 连接器来对 SqlServer 数据库运行 SQL 查询。
+:::tip
+
+在通过 JDBC 元数据发现表列信息时,SeaTunnel 会按精确的 schema/table 标识符对返回结果做二次过滤,以避免混入其他表的列(部分驱动会将
+`schemaPattern`/`tableNamePattern` 视为 SQL LIKE 模式匹配)。对于大小写敏感的数据库,请确保配置的标识符大小写与数据库一致。
+
+:::
+
## 支持的数据源信息
| 数据源 | 支持版本 | 驱动 | Url | Maven |
@@ -233,4 +240,4 @@ sink {
## 变更日志
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
index a3884a0280..3a2bb3c729 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
@@ -19,6 +19,8 @@ package io.debezium.connector.sqlserver;
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.JdbcIdentifierUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -696,8 +698,11 @@ public class SqlServerConnection extends JdbcConnection {
public Table getTableSchemaFromTable(String databaseName,
SqlServerChangeTable changeTable)
throws SQLException {
final DatabaseMetaData metadata = connection().getMetaData();
+ JdbcIdentifierUtils.IdentifierCaseStrategy identifierCaseStrategy =
+ JdbcIdentifierUtils.identifierCaseStrategy(metadata);
List<Column> columns = new ArrayList<>();
+ int filteredRows = 0;
try (ResultSet rs =
metadata.getColumns(
databaseName,
@@ -705,6 +710,24 @@ public class SqlServerConnection extends JdbcConnection {
changeTable.getSourceTableId().table(),
null)) {
while (rs.next()) {
+ // `tableNamePattern` is treated as a SQL LIKE pattern by many
drivers, so filter
+ // the ResultSet by exact table/schema to avoid mixing columns
from other tables.
+ String actualTableName = rs.getString("TABLE_NAME");
+ if (!JdbcIdentifierUtils.identifierEquals(
+ identifierCaseStrategy,
+ changeTable.getSourceTableId().table(),
+ actualTableName)) {
+ filteredRows++;
+ continue;
+ }
+ String actualSchemaName = rs.getString("TABLE_SCHEM");
+ if (!JdbcIdentifierUtils.identifierEquals(
+ identifierCaseStrategy,
+ changeTable.getSourceTableId().schema(),
+ actualSchemaName)) {
+ filteredRows++;
+ continue;
+ }
readTableColumn(rs, changeTable.getSourceTableId(), null)
.ifPresent(
ce -> {
@@ -715,6 +738,14 @@ public class SqlServerConnection extends JdbcConnection {
});
}
}
+ if (columns.isEmpty() && filteredRows > 0) {
+ LOGGER.warn(
+ "No columns found for table '{}' in database '{}'.
Filtered {} rows returned by JDBC driver. "
+ + "The table may not exist or the database
requires exact identifier case.",
+ changeTable.getSourceTableId(),
+ databaseName,
+ filteredRows);
+ }
final List<String> pkColumnNames =
readPrimaryKeyOrUniqueIndexNames(metadata,
changeTable.getSourceTableId()).stream()
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionTest.java
new file mode 100644
index 0000000000..f74f545fd4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.sqlserver;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.config.Configuration;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SqlServerConnectionTest {
+
+ @Test
+ void testGetTableSchemaFromTableFiltersOutWildcardTables() throws
Exception {
+ String databaseName = "test_db";
+ TableId tableId = TableId.parse(databaseName + ".dbo.user_info");
+
+ SqlServerChangeTable changeTable = mock(SqlServerChangeTable.class);
+ when(changeTable.getSourceTableId()).thenReturn(tableId);
+
when(changeTable.getCapturedColumns()).thenReturn(Collections.singletonList("id"));
+
+ ResultSet columnsRs = mock(ResultSet.class);
+ when(columnsRs.next()).thenReturn(true, true, false);
+ when(columnsRs.getString("TABLE_NAME")).thenReturn("user_info",
"userAinfo");
+ when(columnsRs.getString("TABLE_SCHEM")).thenReturn("dbo", "dbo");
+ when(columnsRs.getString("COLUMN_NAME")).thenReturn("id", "bad");
+
+ DatabaseMetaData metadata = mock(DatabaseMetaData.class);
+ when(metadata.getColumns(eq(databaseName), eq("dbo"), eq("user_info"),
isNull()))
+ .thenReturn(columnsRs);
+
+ Connection jdbcConnection = mock(Connection.class);
+ when(jdbcConnection.getMetaData()).thenReturn(metadata);
+
+ TestSqlServerConnection connection = new
TestSqlServerConnection(jdbcConnection);
+ Table table = connection.getTableSchemaFromTable(databaseName,
changeTable);
+
+ Assertions.assertEquals(1, table.columns().size());
+ Assertions.assertEquals("id", table.columns().get(0).name());
+ }
+
+ @Test
+ void testGetTableSchemaFromTableCaseSensitiveRequiresExactMatch() throws
Exception {
+ String databaseName = "test_db";
+ TableId tableId = TableId.parse(databaseName + ".dbo.UserInfo");
+
+ SqlServerChangeTable changeTable = mock(SqlServerChangeTable.class);
+ when(changeTable.getSourceTableId()).thenReturn(tableId);
+
when(changeTable.getCapturedColumns()).thenReturn(Collections.singletonList("id"));
+
+ ResultSet columnsRs = mock(ResultSet.class);
+ when(columnsRs.next()).thenReturn(true, false);
+ when(columnsRs.getString("TABLE_NAME")).thenReturn("userinfo");
+ when(columnsRs.getString("TABLE_SCHEM")).thenReturn("dbo");
+ when(columnsRs.getString("COLUMN_NAME")).thenReturn("id");
+
+ DatabaseMetaData metadata = mock(DatabaseMetaData.class);
+ when(metadata.supportsMixedCaseIdentifiers()).thenReturn(true);
+ when(metadata.getColumns(eq(databaseName), eq("dbo"), eq("UserInfo"),
isNull()))
+ .thenReturn(columnsRs);
+
+ Connection jdbcConnection = mock(Connection.class);
+ when(jdbcConnection.getMetaData()).thenReturn(metadata);
+
+ TestSqlServerConnection connection = new
TestSqlServerConnection(jdbcConnection);
+ Table table = connection.getTableSchemaFromTable(databaseName,
changeTable);
+
+ Assertions.assertTrue(table.columns().isEmpty());
+ }
+
+ private static final class TestSqlServerConnection extends
SqlServerConnection {
+ private final Connection jdbcConnection;
+
+ private TestSqlServerConnection(Connection jdbcConnection) {
+ super(
+ JdbcConfiguration.adapt(Configuration.create().build()),
+ SourceTimestampMode.COMMIT,
+ mock(SqlServerValueConverters.class),
+ SqlServerConnectionTest.class::getClassLoader,
+ Collections.emptySet(),
+ false);
+ this.jdbcConnection = jdbcConnection;
+ }
+
+ @Override
+ public synchronized Connection connection(boolean executeOnConnect)
throws SQLException {
+ return jdbcConnection;
+ }
+
+ @Override
+ protected Optional<ColumnEditor> readTableColumn(
+ ResultSet columnMetadata, TableId tableId,
Tables.ColumnNameFilter columnFilter)
+ throws SQLException {
+ String columnName = columnMetadata.getString("COLUMN_NAME");
+ ColumnEditor editor =
+
Column.editor().name(columnName).type("INT").jdbcType(Types.INTEGER);
+ return Optional.of(editor);
+ }
+
+ @Override
+ protected List<String> readPrimaryKeyOrUniqueIndexNames(
+ DatabaseMetaData metadata, TableId tableId) throws
SQLException {
+ return Collections.emptyList();
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
index 1184e5e59e..50d0a1d53f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
@@ -26,6 +26,9 @@ import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -69,10 +72,14 @@ import static java.sql.Types.VARCHAR;
*/
@Deprecated
public class JdbcColumnConverter {
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcColumnConverter.class);
public static List<Column> convert(DatabaseMetaData metadata, TablePath
tablePath)
throws SQLException {
List<Column> columns = new ArrayList<>();
+ int filteredRows = 0;
+ JdbcIdentifierUtils.IdentifierCaseStrategy identifierCaseStrategy =
+ JdbcIdentifierUtils.identifierCaseStrategy(metadata);
try (ResultSet columnsResultSet =
metadata.getColumns(
@@ -82,6 +89,23 @@ public class JdbcColumnConverter {
null)) {
while (columnsResultSet.next()) {
+ // `tableNamePattern` is treated as a SQL LIKE pattern by many
drivers, so filter
+ // the ResultSet by exact table/schema to avoid mixing columns
from other tables.
+ String actualTableName =
columnsResultSet.getString("TABLE_NAME");
+ if (!JdbcIdentifierUtils.identifierEquals(
+ identifierCaseStrategy, tablePath.getTableName(),
actualTableName)) {
+ filteredRows++;
+ continue;
+ }
+ if (tablePath.getSchemaName() != null) {
+ String actualSchemaName =
columnsResultSet.getString("TABLE_SCHEM");
+ if (!JdbcIdentifierUtils.identifierEquals(
+ identifierCaseStrategy, tablePath.getSchemaName(),
actualSchemaName)) {
+ filteredRows++;
+ continue;
+ }
+ }
+
String columnName = columnsResultSet.getString("COLUMN_NAME");
int jdbcType = columnsResultSet.getInt("DATA_TYPE");
String nativeType = columnsResultSet.getString("TYPE_NAME");
@@ -102,6 +126,15 @@ public class JdbcColumnConverter {
columns.add(column);
}
}
+ if (columns.isEmpty() && filteredRows > 0) {
+ LOG.warn(
+ "No columns found for catalog '{}', schema '{}', table
'{}'. Filtered {} rows returned by JDBC driver. "
+ + "The table may not exist or the database
requires exact identifier case.",
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName(),
+ filteredRows);
+ }
return columns;
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcIdentifierUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcIdentifierUtils.java
new file mode 100644
index 0000000000..a788f985a2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcIdentifierUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;
+
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Locale;
+
+public final class JdbcIdentifierUtils {
+
+ private JdbcIdentifierUtils() {}
+
+ public enum IdentifierCaseStrategy {
+ CASE_SENSITIVE,
+ LOWER_CASE,
+ UPPER_CASE,
+ CASE_INSENSITIVE
+ }
+
+ /**
+ * Resolve case handling strategy for unquoted identifiers based on {@link
DatabaseMetaData}.
+ *
+ * <p>Note: JDBC metadata APIs often treat {@code schemaPattern}/{@code
tableNamePattern} as
+ * patterns (e.g. SQL LIKE), while identifier case sensitivity depends on
the database. This
+ * method provides a best-effort strategy to compare identifiers returned
by JDBC metadata APIs.
+ */
+ public static IdentifierCaseStrategy
identifierCaseStrategy(DatabaseMetaData metadata)
+ throws SQLException {
+ if (metadata == null) {
+ return IdentifierCaseStrategy.CASE_INSENSITIVE;
+ }
+ if (metadata.supportsMixedCaseIdentifiers()) {
+ return IdentifierCaseStrategy.CASE_SENSITIVE;
+ }
+ if (metadata.storesLowerCaseIdentifiers()) {
+ return IdentifierCaseStrategy.LOWER_CASE;
+ }
+ if (metadata.storesUpperCaseIdentifiers()) {
+ return IdentifierCaseStrategy.UPPER_CASE;
+ }
+ return IdentifierCaseStrategy.CASE_INSENSITIVE;
+ }
+
+ public static boolean identifierEquals(
+ IdentifierCaseStrategy caseStrategy, String expected, String
actual) {
+ if (expected == null) {
+ return true;
+ }
+ if (actual == null) {
+ return false;
+ }
+ switch (caseStrategy) {
+ case CASE_SENSITIVE:
+ return actual.equals(expected);
+ case LOWER_CASE:
+ return
actual.toLowerCase(Locale.ROOT).equals(expected.toLowerCase(Locale.ROOT));
+ case UPPER_CASE:
+ return
actual.toUpperCase(Locale.ROOT).equals(expected.toUpperCase(Locale.ROOT));
+ case CASE_INSENSITIVE:
+ default:
+ return actual.equalsIgnoreCase(expected);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
index 45da6d7611..544a7bc19d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
@@ -21,6 +21,10 @@ import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.JdbcIdentifierUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.sql.DatabaseMetaData;
@@ -46,6 +50,7 @@ import static java.sql.Types.VARCHAR;
/** Separate the jdbc meta-information type to SeaTunnelDataType into the
interface. */
public interface JdbcDialectTypeMapper extends Serializable {
+ Logger LOG = LoggerFactory.getLogger(JdbcDialectTypeMapper.class);
/**
* @deprecated instead by {@link #mappingColumn(BasicTypeDefine)}
@@ -89,9 +94,30 @@ public interface JdbcDialectTypeMapper extends Serializable {
String columnNamePattern)
throws SQLException {
List<Column> columns = new ArrayList<>();
+ int filteredRows = 0;
+ JdbcIdentifierUtils.IdentifierCaseStrategy identifierCaseStrategy =
+ JdbcIdentifierUtils.identifierCaseStrategy(metadata);
try (ResultSet rs =
metadata.getColumns(catalog, schemaPattern, tableNamePattern,
columnNamePattern)) {
while (rs.next()) {
+ // `tableNamePattern` is treated as a SQL LIKE pattern by many
drivers, so filter
+ // the ResultSet by exact table/schema to avoid mixing columns
from other tables.
+ if (tableNamePattern != null) {
+ String actualTableName = rs.getString("TABLE_NAME");
+ if (!JdbcIdentifierUtils.identifierEquals(
+ identifierCaseStrategy, tableNamePattern,
actualTableName)) {
+ filteredRows++;
+ continue;
+ }
+ }
+ if (schemaPattern != null) {
+ String actualSchemaName = rs.getString("TABLE_SCHEM");
+ if (!JdbcIdentifierUtils.identifierEquals(
+ identifierCaseStrategy, schemaPattern,
actualSchemaName)) {
+ filteredRows++;
+ continue;
+ }
+ }
String columnName = rs.getString("COLUMN_NAME");
String nativeType = rs.getString("TYPE_NAME");
int sqlType = rs.getInt("DATA_TYPE");
@@ -115,6 +141,15 @@ public interface JdbcDialectTypeMapper extends
Serializable {
columns.add(mappingColumn(typeDefine));
}
}
+ if (columns.isEmpty() && filteredRows > 0) {
+ LOG.warn(
+ "No columns found for catalog '{}', schema '{}', table
'{}'. Filtered {} rows returned by JDBC driver. "
+ + "The table may not exist or the database
requires exact identifier case.",
+ catalog,
+ schemaPattern,
+ tableNamePattern,
+ filteredRows);
+ }
return columns;
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
index 2ae9df7951..eed75c213f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
@@ -35,7 +35,11 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static org.mockito.Mockito.mock;
@@ -91,6 +95,305 @@ public class CatalogUtilsTest {
Assertions.assertEquals("id comment",
tableSchema2.getColumns().get(0).getComment());
}
+ @Test
+ void testGetTableSchemaFiltersOutOtherMatchedTables() throws SQLException {
+ TestDatabaseMetaData metadata =
+ new TestDatabaseMetaData() {
+ @Override
+ public java.sql.ResultSet getColumns(
+ String catalog,
+ String schemaPattern,
+ String tableNamePattern,
+ String columnNamePattern)
+ throws SQLException {
+ List<Map<String, Object>> value = new ArrayList<>();
+ value.add(
+ new HashMap<String, Object>() {
+ {
+ put("TABLE_NAME", "user_info");
+ put("TABLE_SCHEM", "public");
+ put("COLUMN_NAME", "id");
+ put("DATA_TYPE", 1);
+ put("TYPE_NAME", "INT");
+ put("COLUMN_SIZE", 11);
+ put("DECIMAL_DIGITS", 0);
+ put("NULLABLE", 0);
+ put("REMARKS", "id comment");
+ }
+ });
+ value.add(
+ new HashMap<String, Object>() {
+ {
+ put("TABLE_NAME", "userAinfo");
+ put("TABLE_SCHEM", "public");
+ put("COLUMN_NAME", "bad");
+ put("DATA_TYPE", 1);
+ put("TYPE_NAME", "INT");
+ put("COLUMN_SIZE", 11);
+ put("DECIMAL_DIGITS", 0);
+ put("NULLABLE", 0);
+ put("REMARKS", "should be filtered");
+ }
+ });
+ return new TestResultSet(value);
+ }
+ };
+
+ TablePath tablePath = TablePath.of("test_db", "public", "user_info");
+
+ TableSchema tableSchema =
+ CatalogUtils.getTableSchema(
+ metadata,
+ tablePath,
+ new JdbcDialectTypeMapper() {
+ @Override
+ public Column mappingColumn(BasicTypeDefine
typeDefine) {
+ return PhysicalColumn.of(
+ typeDefine.getName(),
+ BasicType.VOID_TYPE,
+ typeDefine.getLength(),
+ typeDefine.isNullable(),
+ typeDefine.getScale(),
+ typeDefine.getComment());
+ }
+ });
+
+ Assertions.assertEquals(1, tableSchema.getColumns().size());
+ Assertions.assertEquals("id",
tableSchema.getColumns().get(0).getName());
+ Assertions.assertEquals("id comment",
tableSchema.getColumns().get(0).getComment());
+
+ TableSchema fallbackTableSchema =
+ CatalogUtils.getTableSchema(metadata, tablePath, new
JdbcDialectTypeMapper() {});
+ Assertions.assertEquals(1, fallbackTableSchema.getColumns().size());
+ Assertions.assertEquals("id",
fallbackTableSchema.getColumns().get(0).getName());
+ }
+
+ @Test
+ void testGetTableSchemaFiltersOutPercentageWildcard() throws SQLException {
+ TestDatabaseMetaData metadata =
+ new TestDatabaseMetaData() {
+ @Override
+ public java.sql.ResultSet getColumns(
+ String catalog,
+ String schemaPattern,
+ String tableNamePattern,
+ String columnNamePattern)
+ throws SQLException {
+ List<Map<String, Object>> value = new ArrayList<>();
+ value.add(
+ new HashMap<String, Object>() {
+ {
+ put("TABLE_NAME", "user%info");
+ put("TABLE_SCHEM", "public");
+ put("COLUMN_NAME", "id");
+ put("DATA_TYPE", 1);
+ put("TYPE_NAME", "INT");
+ put("COLUMN_SIZE", 11);
+ put("DECIMAL_DIGITS", 0);
+ put("NULLABLE", 0);
+ put("REMARKS", "id comment");
+ }
+ });
+ value.add(
+ new HashMap<String, Object>() {
+ {
+ put("TABLE_NAME", "userXYZinfo");
+ put("TABLE_SCHEM", "public");
+ put("COLUMN_NAME", "bad");
+ put("DATA_TYPE", 1);
+ put("TYPE_NAME", "INT");
+ put("COLUMN_SIZE", 11);
+ put("DECIMAL_DIGITS", 0);
+ put("NULLABLE", 0);
+ put("REMARKS", "should be filtered");
+ }
+ });
+ return new TestResultSet(value);
+ }
+ };
+
+ TablePath tablePath = TablePath.of("test_db", "public", "user%info");
+ TableSchema tableSchema =
+ CatalogUtils.getTableSchema(metadata, tablePath, new
JdbcDialectTypeMapper() {});
+ Assertions.assertEquals(1, tableSchema.getColumns().size());
+ Assertions.assertEquals("id",
tableSchema.getColumns().get(0).getName());
+ }
+
+ @Test
+ void testGetTableSchemaFiltersOutSchemaWildcard() throws SQLException {
+ TestDatabaseMetaData metadata =
+ new TestDatabaseMetaData() {
+ @Override
+ public java.sql.ResultSet getColumns(
+ String catalog,
+ String schemaPattern,
+ String tableNamePattern,
+ String columnNamePattern)
+ throws SQLException {
+ List<Map<String, Object>> value = new ArrayList<>();
+ value.add(
+ new HashMap<String, Object>() {
+ {
+ put("TABLE_NAME", "user_info");
+ put("TABLE_SCHEM", "pub_lic");
+ put("COLUMN_NAME", "id");
+ put("DATA_TYPE", 1);
+ put("TYPE_NAME", "INT");
+ put("COLUMN_SIZE", 11);
+ put("DECIMAL_DIGITS", 0);
+ put("NULLABLE", 0);
+ put("REMARKS", "id comment");
+ }
+ });
+ value.add(
+ new HashMap<String, Object>() {
+ {
+ put("TABLE_NAME", "user_info");
+ put("TABLE_SCHEM", "pubAlic");
+ put("COLUMN_NAME", "bad");
+ put("DATA_TYPE", 1);
+ put("TYPE_NAME", "INT");
+ put("COLUMN_SIZE", 11);
+ put("DECIMAL_DIGITS", 0);
+ put("NULLABLE", 0);
+ put("REMARKS", "should be filtered");
+ }
+ });
+ return new TestResultSet(value);
+ }
+ };
+
+ TablePath tablePath = TablePath.of("test_db", "pub_lic", "user_info");
+ TableSchema tableSchema =
+ CatalogUtils.getTableSchema(metadata, tablePath, new
JdbcDialectTypeMapper() {});
+ Assertions.assertEquals(1, tableSchema.getColumns().size());
+ Assertions.assertEquals("id",
tableSchema.getColumns().get(0).getName());
+ }
+
+ @Test
+ void testGetTableSchemaEmptyWhenAllFiltered() throws SQLException {
+ TestDatabaseMetaData metadata =
+ new TestDatabaseMetaData() {
+ @Override
+ public java.sql.ResultSet getColumns(
+ String catalog,
+ String schemaPattern,
+ String tableNamePattern,
+ String columnNamePattern)
+ throws SQLException {
+ List<Map<String, Object>> value = new ArrayList<>();
+ value.add(
+ new HashMap<String, Object>() {
+ {
+ put("TABLE_NAME", "other_table");
+ put("TABLE_SCHEM", "public");
+ put("COLUMN_NAME", "bad");
+ put("DATA_TYPE", 1);
+ put("TYPE_NAME", "INT");
+ put("COLUMN_SIZE", 11);
+ put("DECIMAL_DIGITS", 0);
+ put("NULLABLE", 0);
+ put("REMARKS", "should be filtered");
+ }
+ });
+ return new TestResultSet(value);
+ }
+ };
+
+ TablePath tablePath = TablePath.of("test_db", "public", "user_info");
+ TableSchema tableSchema =
+ CatalogUtils.getTableSchema(metadata, tablePath, new
JdbcDialectTypeMapper() {});
+ Assertions.assertTrue(tableSchema.getColumns().isEmpty());
+ }
+
+ @Test
+ void testGetTableSchemaCaseSensitiveIdentifiersRequireExactMatch() throws
SQLException {
+ TestDatabaseMetaData metadata =
+ new TestDatabaseMetaData() {
+ @Override
+ public boolean supportsMixedCaseIdentifiers() throws
SQLException {
+ return true;
+ }
+
+ @Override
+ public java.sql.ResultSet getColumns(
+ String catalog,
+ String schemaPattern,
+ String tableNamePattern,
+ String columnNamePattern)
+ throws SQLException {
+ List<Map<String, Object>> value = new ArrayList<>();
+ value.add(
+ new HashMap<String, Object>() {
+ {
+ put("TABLE_NAME", "userinfo");
+ put("TABLE_SCHEM", "public");
+ put("COLUMN_NAME", "id");
+ put("DATA_TYPE", 1);
+ put("TYPE_NAME", "INT");
+ put("COLUMN_SIZE", 11);
+ put("DECIMAL_DIGITS", 0);
+ put("NULLABLE", 0);
+ put("REMARKS", "id comment");
+ }
+ });
+ return new TestResultSet(value);
+ }
+ };
+
+ TablePath tablePath = TablePath.of("test_db", "public", "UserInfo");
+ TableSchema tableSchema =
+ CatalogUtils.getTableSchema(metadata, tablePath, new
JdbcDialectTypeMapper() {});
+ Assertions.assertEquals(Collections.emptyList(),
tableSchema.getColumns());
+ }
+
+ @Test
+ void testGetTableSchemaStoresUpperCaseIdentifiersCanMatchLowerCaseInput()
throws SQLException {
+ TestDatabaseMetaData metadata =
+ new TestDatabaseMetaData() {
+ @Override
+ public boolean supportsMixedCaseIdentifiers() throws
SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean storesUpperCaseIdentifiers() throws
SQLException {
+ return true;
+ }
+
+ @Override
+ public java.sql.ResultSet getColumns(
+ String catalog,
+ String schemaPattern,
+ String tableNamePattern,
+ String columnNamePattern)
+ throws SQLException {
+ List<Map<String, Object>> value = new ArrayList<>();
+ value.add(
+ new HashMap<String, Object>() {
+ {
+ put("TABLE_NAME", "USER_INFO");
+ put("TABLE_SCHEM", "PUBLIC");
+ put("COLUMN_NAME", "id");
+ put("DATA_TYPE", 1);
+ put("TYPE_NAME", "INT");
+ put("COLUMN_SIZE", 11);
+ put("DECIMAL_DIGITS", 0);
+ put("NULLABLE", 0);
+ put("REMARKS", "id comment");
+ }
+ });
+ return new TestResultSet(value);
+ }
+ };
+
+ TablePath tablePath = TablePath.of("test_db", "public", "user_info");
+ TableSchema tableSchema =
+ CatalogUtils.getTableSchema(metadata, tablePath, new
JdbcDialectTypeMapper() {});
+ Assertions.assertEquals(1, tableSchema.getColumns().size());
+ Assertions.assertEquals("id",
tableSchema.getColumns().get(0).getName());
+ }
+
@Test
void testGetCatalogTableWithPrimaryKeyFromQuery() throws SQLException {
Connection connection = mock(Connection.class);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java
index b7f6085138..eb073ae415 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java
@@ -664,6 +664,9 @@ public class TestDatabaseMetaData implements
DatabaseMetaData {
value.add(
new HashMap<String, Object>() {
{
+ put("TABLE_CAT", catalog);
+ put("TABLE_SCHEM", schemaPattern);
+ put("TABLE_NAME", tableNamePattern);
put("COLUMN_NAME", "id");
put("DATA_TYPE", 1);
put("TYPE_NAME", "INT");