This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5d4830937de JdbcIO - report schema as part of lineage (#33795)
5d4830937de is described below
commit 5d4830937de224248770d66aa9f8da0bc63168ed
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Wed Jan 29 16:21:01 2025 +0100
JdbcIO - report schema as part of lineage (#33795)
* fix missing schemas
* spotless
* fix tests
---
.../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 28 ++++----
.../java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java | 76 +++++++++++++++++-----
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 12 +++-
.../org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java | 25 ++++---
4 files changed, 97 insertions(+), 44 deletions(-)
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 39769495beb..a31745754d0 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -1611,7 +1611,7 @@ public class JdbcIO {
private @Nullable DataSource dataSource;
private @Nullable Connection connection;
- private @Nullable String reportedLineage;
+ private @Nullable KV<@Nullable String, String> reportedLineage;
private ReadFn(
SerializableFunction<Void, DataSource> dataSourceProviderFn,
@@ -1641,16 +1641,17 @@ public class JdbcIO {
this.connection = connection;
// report Lineage if not haven't done so
- String table = JdbcUtil.extractTableFromReadQuery(query.get());
- if (!table.equals(reportedLineage)) {
+ KV<@Nullable String, String> schemaWithTable =
+ JdbcUtil.extractTableFromReadQuery(query.get());
+ if (schemaWithTable != null &&
!schemaWithTable.equals(reportedLineage)) {
JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource);
if (fqn == null) {
fqn = JdbcUtil.FQNComponents.of(connection);
}
if (fqn != null) {
- fqn.reportLineage(Lineage.getSources(), table);
+ fqn.reportLineage(Lineage.getSources(), schemaWithTable);
}
- reportedLineage = table;
+ reportedLineage = schemaWithTable;
}
}
return connection;
@@ -2665,7 +2666,7 @@ public class JdbcIO {
private @Nullable DataSource dataSource;
private @Nullable Connection connection;
private @Nullable PreparedStatement preparedStatement;
- private @Nullable String reportedLineage;
+ private @Nullable KV<@Nullable String, String> reportedLineage;
private static @Nullable FluentBackoff retryBackOff;
public WriteFn(WriteFnSpec<T, V> spec) {
@@ -2705,20 +2706,21 @@ public class JdbcIO {
connection.prepareStatement(checkStateNotNull(spec.getStatement()).get());
this.connection = connection;
- // report Lineage if haven't done so
- String table = spec.getTable();
- if (Strings.isNullOrEmpty(table) && spec.getStatement() != null) {
- table =
JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get());
+ KV<@Nullable String, String> tableWithSchema;
+ if (Strings.isNullOrEmpty(spec.getTable()) && spec.getStatement() !=
null) {
+ tableWithSchema =
JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get());
+ } else {
+ tableWithSchema = JdbcUtil.extractTableFromTable(spec.getTable());
}
- if (!Objects.equals(table, reportedLineage)) {
+ if (!Objects.equals(tableWithSchema, reportedLineage)) {
JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource);
if (fqn == null) {
fqn = JdbcUtil.FQNComponents.of(connection);
}
if (fqn != null) {
- fqn.reportLineage(Lineage.getSinks(), table);
+ fqn.reportLineage(Lineage.getSinks(), tableWithSchema);
}
- reportedLineage = table;
+ reportedLineage = tableWithSchema;
}
}
return connection;
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
index 503b64e4a44..128f21a8109 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
@@ -678,14 +678,29 @@ public class JdbcUtil {
/** Jdbc fully qualified name components. */
@AutoValue
abstract static class FQNComponents {
+
+ static final String DEFAULT_SCHEMA = "default";
+
abstract String getScheme();
abstract Iterable<String> getSegments();
- void reportLineage(Lineage lineage, @Nullable String table) {
+ void reportLineage(Lineage lineage, @Nullable KV<@Nullable String, String>
tableWithSchema) {
ImmutableList.Builder<String> builder =
ImmutableList.<String>builder().addAll(getSegments());
- if (table != null && !table.isEmpty()) {
- builder.add(table);
+ if (tableWithSchema != null) {
+ if (tableWithSchema.getKey() != null &&
!tableWithSchema.getKey().isEmpty()) {
+ builder.add(tableWithSchema.getKey());
+ } else {
+ // Every database engine has a default schema or search path that applies when the
+ // user hasn't provided one, and its name is engine-specific: for PostgreSQL it is
+ // public, for MSSQL it is dbo.
+ // Users can also configure a custom default schema, but Dataflow is unable
+ // to determine that.
+ builder.add(DEFAULT_SCHEMA);
+ }
+ if (!tableWithSchema.getValue().isEmpty()) {
+ builder.add(tableWithSchema.getValue());
+ }
}
lineage.add(getScheme(), builder.build());
}
@@ -792,41 +807,66 @@ public class JdbcUtil {
}
}
+ private static final Pattern TABLE_PATTERN =
+ Pattern.compile(
+
"(\\[?`?(?<schemaName>[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?`?(?<tableName>[^\\s\\[\\]`]+)\\]?`?",
+ Pattern.CASE_INSENSITIVE);
+
private static final Pattern READ_STATEMENT_PATTERN =
Pattern.compile(
- "SELECT\\s+.+?\\s+FROM\\s+\\[?(?<tableName>[^\\s\\[\\]]+)\\]?",
Pattern.CASE_INSENSITIVE);
+
"SELECT\\s+.+?\\s+FROM\\s+(\\[?`?(?<schemaName>[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?`?(?<tableName>[^\\s\\[\\]`]+)\\]?`?",
+ Pattern.CASE_INSENSITIVE);
private static final Pattern WRITE_STATEMENT_PATTERN =
Pattern.compile(
- "INSERT\\s+INTO\\s+\\[?(?<tableName>[^\\s\\[\\]]+)\\]?",
Pattern.CASE_INSENSITIVE);
+
"INSERT\\s+INTO\\s+(\\[?`?(?<schemaName>[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?(?<tableName>[^\\s\\[\\]]+)\\]?",
+ Pattern.CASE_INSENSITIVE);
- /** Extract table name a SELECT statement. Return empty string if fail to
extract. */
- static String extractTableFromReadQuery(@Nullable String query) {
+ /** Extract schema and table name from a SELECT statement. Returns null if extraction
fails. */
+ static @Nullable KV<@Nullable String, String>
extractTableFromReadQuery(@Nullable String query) {
if (query == null) {
- return "";
+ return null;
}
Matcher matchRead = READ_STATEMENT_PATTERN.matcher(query);
if (matchRead.find()) {
- String matched = matchRead.group("tableName");
- if (matched != null) {
- return matched;
+ String matchedTable = matchRead.group("tableName");
+ String matchedSchema = matchRead.group("schemaName");
+ System.out.println(matchedSchema);
+ if (matchedTable != null) {
+ return KV.of(matchedSchema, matchedTable);
+ }
+ }
+ return null;
+ }
+
+ static @Nullable KV<@Nullable String, String>
extractTableFromTable(@Nullable String table) {
+ if (table == null) {
+ return null;
+ }
+ Matcher matchRead = TABLE_PATTERN.matcher(table);
+ if (matchRead.find()) {
+ String matchedTable = matchRead.group("tableName");
+ String matchedSchema = matchRead.group("schemaName");
+ if (matchedTable != null) {
+ return KV.of(matchedSchema, matchedTable);
}
}
- return "";
+ return null;
}
/** Extract table name from an INSERT statement. Return empty string if fail
to extract. */
- static String extractTableFromWriteQuery(@Nullable String query) {
+ static @Nullable KV<@Nullable String, String>
extractTableFromWriteQuery(@Nullable String query) {
if (query == null) {
- return "";
+ return null;
}
Matcher matchRead = WRITE_STATEMENT_PATTERN.matcher(query);
if (matchRead.find()) {
- String matched = matchRead.group("tableName");
- if (matched != null) {
- return matched;
+ String matchedTable = matchRead.group("tableName");
+ String matchedSchema = matchRead.group("schemaName");
+ if (matchedTable != null) {
+ return KV.of(matchedSchema, matchedTable);
}
}
- return "";
+ return null;
}
}
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 8725ef4b3f7..39e1a45d7a9 100644
---
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -248,7 +248,9 @@ public class JdbcIOTest implements Serializable {
PipelineResult result = pipeline.run();
assertThat(
Lineage.query(result.metrics(), Lineage.Type.SOURCE),
- hasItem(Lineage.getFqName("derby", ImmutableList.of("memory",
"testDB", READ_TABLE_NAME))));
+ hasItem(
+ Lineage.getFqName(
+ "derby", ImmutableList.of("memory", "testDB", "default",
READ_TABLE_NAME))));
}
@Test
@@ -271,7 +273,9 @@ public class JdbcIOTest implements Serializable {
PipelineResult result = pipeline.run();
assertThat(
Lineage.query(result.metrics(), Lineage.Type.SOURCE),
- hasItem(Lineage.getFqName("derby", ImmutableList.of("memory",
"testDB", READ_TABLE_NAME))));
+ hasItem(
+ Lineage.getFqName(
+ "derby", ImmutableList.of("memory", "testDB", "default",
READ_TABLE_NAME))));
}
@Test
@@ -543,7 +547,9 @@ public class JdbcIOTest implements Serializable {
assertRowCount(DATA_SOURCE, tableName, EXPECTED_ROW_COUNT);
assertThat(
Lineage.query(result.metrics(), Lineage.Type.SINK),
- hasItem(Lineage.getFqName("derby", ImmutableList.of("memory",
"testDB", tableName))));
+ hasItem(
+ Lineage.getFqName(
+ "derby", ImmutableList.of("memory", "testDB", "default",
tableName))));
} finally {
DatabaseTestHelper.deleteTable(DATA_SOURCE, tableName);
}
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java
index 356d6c7f8de..118eaa4df7e 100644
---
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java
+++
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java
@@ -348,20 +348,25 @@ public class JdbcUtilTest {
@Test
public void testExtractTableFromQuery() {
- ImmutableList<KV<String, @Nullable String>> readCases =
+ ImmutableList<KV<String, @Nullable KV<String, String>>> readCases =
ImmutableList.of(
- KV.of("select * from table_1", "table_1"),
- KV.of("SELECT a, b FROM [table-2]", "table-2"),
- KV.of("drop table not-select", ""));
- for (KV<String, @Nullable String> testCase : readCases) {
+ KV.of("select * from table_1", KV.of(null, "table_1")),
+ KV.of("select * from public.table_1", KV.of("public", "table_1")),
+ KV.of("select * from `select`", KV.of(null, "select")),
+ KV.of("select * from `public`.`select`", KV.of("public",
"select")),
+ KV.of("SELECT a, b FROM [table-2]", KV.of(null, "table-2")),
+ KV.of("SELECT a, b FROM [public].[table-2]", KV.of("public",
"table-2")),
+ KV.of("drop table not-select", null));
+ for (KV<String, @Nullable KV<String, String>> testCase : readCases) {
assertEquals(testCase.getValue(),
JdbcUtil.extractTableFromReadQuery(testCase.getKey()));
}
- ImmutableList<KV<String, @Nullable String>> writeCases =
+ ImmutableList<KV<String, @Nullable KV<String, String>>> writeCases =
ImmutableList.of(
- KV.of("insert into table_1 values ...", "table_1"),
- KV.of("INSERT INTO [table-2] values ...", "table-2"),
- KV.of("drop table not-select", ""));
- for (KV<String, @Nullable String> testCase : writeCases) {
+ KV.of("insert into table_1 values ...", KV.of(null, "table_1")),
+ KV.of("INSERT INTO [table-2] values ...", KV.of(null, "table-2")),
+ KV.of("INSERT INTO [foo].[table-2] values ...", KV.of("foo",
"table-2")),
+ KV.of("drop table not-select", null));
+ for (KV<String, @Nullable KV<String, String>> testCase : writeCases) {
assertEquals(testCase.getValue(),
JdbcUtil.extractTableFromWriteQuery(testCase.getKey()));
}
}