This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d4830937de JdbcIO - report schema as part of lineage (#33795)
5d4830937de is described below

commit 5d4830937de224248770d66aa9f8da0bc63168ed
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Wed Jan 29 16:21:01 2025 +0100

    JdbcIO - report schema as part of lineage (#33795)
    
    * fix missing schemas
    
    * spotless
    
    * fix tests
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 28 ++++----
 .../java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java | 76 +++++++++++++++++-----
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java    | 12 +++-
 .../org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java  | 25 ++++---
 4 files changed, 97 insertions(+), 44 deletions(-)

diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 39769495beb..a31745754d0 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -1611,7 +1611,7 @@ public class JdbcIO {
 
     private @Nullable DataSource dataSource;
     private @Nullable Connection connection;
-    private @Nullable String reportedLineage;
+    private @Nullable KV<@Nullable String, String> reportedLineage;
 
     private ReadFn(
         SerializableFunction<Void, DataSource> dataSourceProviderFn,
@@ -1641,16 +1641,17 @@ public class JdbcIO {
         this.connection = connection;
 
         // report Lineage if not haven't done so
-        String table = JdbcUtil.extractTableFromReadQuery(query.get());
-        if (!table.equals(reportedLineage)) {
+        KV<@Nullable String, String> schemaWithTable =
+            JdbcUtil.extractTableFromReadQuery(query.get());
+        if (schemaWithTable != null && 
!schemaWithTable.equals(reportedLineage)) {
           JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource);
           if (fqn == null) {
             fqn = JdbcUtil.FQNComponents.of(connection);
           }
           if (fqn != null) {
-            fqn.reportLineage(Lineage.getSources(), table);
+            fqn.reportLineage(Lineage.getSources(), schemaWithTable);
           }
-          reportedLineage = table;
+          reportedLineage = schemaWithTable;
         }
       }
       return connection;
@@ -2665,7 +2666,7 @@ public class JdbcIO {
     private @Nullable DataSource dataSource;
     private @Nullable Connection connection;
     private @Nullable PreparedStatement preparedStatement;
-    private @Nullable String reportedLineage;
+    private @Nullable KV<@Nullable String, String> reportedLineage;
     private static @Nullable FluentBackoff retryBackOff;
 
     public WriteFn(WriteFnSpec<T, V> spec) {
@@ -2705,20 +2706,21 @@ public class JdbcIO {
             
connection.prepareStatement(checkStateNotNull(spec.getStatement()).get());
         this.connection = connection;
 
-        // report Lineage if haven't done so
-        String table = spec.getTable();
-        if (Strings.isNullOrEmpty(table) && spec.getStatement() != null) {
-          table = 
JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get());
+        KV<@Nullable String, String> tableWithSchema;
+        if (Strings.isNullOrEmpty(spec.getTable()) && spec.getStatement() != 
null) {
+          tableWithSchema = 
JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get());
+        } else {
+          tableWithSchema = JdbcUtil.extractTableFromTable(spec.getTable());
         }
-        if (!Objects.equals(table, reportedLineage)) {
+        if (!Objects.equals(tableWithSchema, reportedLineage)) {
           JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource);
           if (fqn == null) {
             fqn = JdbcUtil.FQNComponents.of(connection);
           }
           if (fqn != null) {
-            fqn.reportLineage(Lineage.getSinks(), table);
+            fqn.reportLineage(Lineage.getSinks(), tableWithSchema);
           }
-          reportedLineage = table;
+          reportedLineage = tableWithSchema;
         }
       }
       return connection;
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
index 503b64e4a44..128f21a8109 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
@@ -678,14 +678,29 @@ public class JdbcUtil {
   /** Jdbc fully qualified name components. */
   @AutoValue
   abstract static class FQNComponents {
+
+    static final String DEFAULT_SCHEMA = "default";
+
     abstract String getScheme();
 
     abstract Iterable<String> getSegments();
 
-    void reportLineage(Lineage lineage, @Nullable String table) {
+    void reportLineage(Lineage lineage, @Nullable KV<@Nullable String, String> 
tableWithSchema) {
       ImmutableList.Builder<String> builder = 
ImmutableList.<String>builder().addAll(getSegments());
-      if (table != null && !table.isEmpty()) {
-        builder.add(table);
+      if (tableWithSchema != null) {
+        if (tableWithSchema.getKey() != null && 
!tableWithSchema.getKey().isEmpty()) {
+          builder.add(tableWithSchema.getKey());
+        } else {
+          // Every database engine has the default schema or search path if 
user hasn't provided
+          // one. The name
+          // is specific to db engine. For PostgreSQL it is public, for MSSQL 
it is dbo.
+          // Users can have custom default scheme for the benefit of the user 
but dataflow is unable
+          // to determine that.
+          builder.add(DEFAULT_SCHEMA);
+        }
+        if (!tableWithSchema.getValue().isEmpty()) {
+          builder.add(tableWithSchema.getValue());
+        }
       }
       lineage.add(getScheme(), builder.build());
     }
@@ -792,41 +807,66 @@ public class JdbcUtil {
     }
   }
 
+  private static final Pattern TABLE_PATTERN =
+      Pattern.compile(
+          
"(\\[?`?(?<schemaName>[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?`?(?<tableName>[^\\s\\[\\]`]+)\\]?`?",
+          Pattern.CASE_INSENSITIVE);
+
   private static final Pattern READ_STATEMENT_PATTERN =
       Pattern.compile(
-          "SELECT\\s+.+?\\s+FROM\\s+\\[?(?<tableName>[^\\s\\[\\]]+)\\]?", 
Pattern.CASE_INSENSITIVE);
+          
"SELECT\\s+.+?\\s+FROM\\s+(\\[?`?(?<schemaName>[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?`?(?<tableName>[^\\s\\[\\]`]+)\\]?`?",
+          Pattern.CASE_INSENSITIVE);
 
   private static final Pattern WRITE_STATEMENT_PATTERN =
       Pattern.compile(
-          "INSERT\\s+INTO\\s+\\[?(?<tableName>[^\\s\\[\\]]+)\\]?", 
Pattern.CASE_INSENSITIVE);
+          
"INSERT\\s+INTO\\s+(\\[?`?(?<schemaName>[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?(?<tableName>[^\\s\\[\\]]+)\\]?",
+          Pattern.CASE_INSENSITIVE);
 
-  /** Extract table name a SELECT statement. Return empty string if fail to 
extract. */
-  static String extractTableFromReadQuery(@Nullable String query) {
+  /** Extract schema and table name a SELECT statement. Return null if fail to 
extract. */
+  static @Nullable KV<@Nullable String, String> 
extractTableFromReadQuery(@Nullable String query) {
     if (query == null) {
-      return "";
+      return null;
     }
     Matcher matchRead = READ_STATEMENT_PATTERN.matcher(query);
     if (matchRead.find()) {
-      String matched = matchRead.group("tableName");
-      if (matched != null) {
-        return matched;
+      String matchedTable = matchRead.group("tableName");
+      String matchedSchema = matchRead.group("schemaName");
+      System.out.println(matchedSchema);
+      if (matchedTable != null) {
+        return KV.of(matchedSchema, matchedTable);
+      }
+    }
+    return null;
+  }
+
+  static @Nullable KV<@Nullable String, String> 
extractTableFromTable(@Nullable String table) {
+    if (table == null) {
+      return null;
+    }
+    Matcher matchRead = TABLE_PATTERN.matcher(table);
+    if (matchRead.find()) {
+      String matchedTable = matchRead.group("tableName");
+      String matchedSchema = matchRead.group("schemaName");
+      if (matchedTable != null) {
+        return KV.of(matchedSchema, matchedTable);
       }
     }
-    return "";
+    return null;
   }
 
   /** Extract table name from an INSERT statement. Return empty string if fail 
to extract. */
-  static String extractTableFromWriteQuery(@Nullable String query) {
+  static @Nullable KV<@Nullable String, String> 
extractTableFromWriteQuery(@Nullable String query) {
     if (query == null) {
-      return "";
+      return null;
     }
     Matcher matchRead = WRITE_STATEMENT_PATTERN.matcher(query);
     if (matchRead.find()) {
-      String matched = matchRead.group("tableName");
-      if (matched != null) {
-        return matched;
+      String matchedTable = matchRead.group("tableName");
+      String matchedSchema = matchRead.group("schemaName");
+      if (matchedTable != null) {
+        return KV.of(matchedSchema, matchedTable);
       }
     }
-    return "";
+    return null;
   }
 }
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 8725ef4b3f7..39e1a45d7a9 100644
--- 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -248,7 +248,9 @@ public class JdbcIOTest implements Serializable {
     PipelineResult result = pipeline.run();
     assertThat(
         Lineage.query(result.metrics(), Lineage.Type.SOURCE),
-        hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", 
"testDB", READ_TABLE_NAME))));
+        hasItem(
+            Lineage.getFqName(
+                "derby", ImmutableList.of("memory", "testDB", "default", 
READ_TABLE_NAME))));
   }
 
   @Test
@@ -271,7 +273,9 @@ public class JdbcIOTest implements Serializable {
     PipelineResult result = pipeline.run();
     assertThat(
         Lineage.query(result.metrics(), Lineage.Type.SOURCE),
-        hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", 
"testDB", READ_TABLE_NAME))));
+        hasItem(
+            Lineage.getFqName(
+                "derby", ImmutableList.of("memory", "testDB", "default", 
READ_TABLE_NAME))));
   }
 
   @Test
@@ -543,7 +547,9 @@ public class JdbcIOTest implements Serializable {
       assertRowCount(DATA_SOURCE, tableName, EXPECTED_ROW_COUNT);
       assertThat(
           Lineage.query(result.metrics(), Lineage.Type.SINK),
-          hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", 
"testDB", tableName))));
+          hasItem(
+              Lineage.getFqName(
+                  "derby", ImmutableList.of("memory", "testDB", "default", 
tableName))));
     } finally {
       DatabaseTestHelper.deleteTable(DATA_SOURCE, tableName);
     }
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java
index 356d6c7f8de..118eaa4df7e 100644
--- 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java
@@ -348,20 +348,25 @@ public class JdbcUtilTest {
 
   @Test
   public void testExtractTableFromQuery() {
-    ImmutableList<KV<String, @Nullable String>> readCases =
+    ImmutableList<KV<String, @Nullable KV<String, String>>> readCases =
         ImmutableList.of(
-            KV.of("select * from table_1", "table_1"),
-            KV.of("SELECT a, b FROM [table-2]", "table-2"),
-            KV.of("drop table not-select", ""));
-    for (KV<String, @Nullable String> testCase : readCases) {
+            KV.of("select * from table_1", KV.of(null, "table_1")),
+            KV.of("select * from public.table_1", KV.of("public", "table_1")),
+            KV.of("select * from `select`", KV.of(null, "select")),
+            KV.of("select * from `public`.`select`", KV.of("public", 
"select")),
+            KV.of("SELECT a, b FROM [table-2]", KV.of(null, "table-2")),
+            KV.of("SELECT a, b FROM [public].[table-2]", KV.of("public", 
"table-2")),
+            KV.of("drop table not-select", null));
+    for (KV<String, @Nullable KV<String, String>> testCase : readCases) {
       assertEquals(testCase.getValue(), 
JdbcUtil.extractTableFromReadQuery(testCase.getKey()));
     }
-    ImmutableList<KV<String, @Nullable String>> writeCases =
+    ImmutableList<KV<String, @Nullable KV<String, String>>> writeCases =
         ImmutableList.of(
-            KV.of("insert into table_1 values ...", "table_1"),
-            KV.of("INSERT INTO [table-2] values ...", "table-2"),
-            KV.of("drop table not-select", ""));
-    for (KV<String, @Nullable String> testCase : writeCases) {
+            KV.of("insert into table_1 values ...", KV.of(null, "table_1")),
+            KV.of("INSERT INTO [table-2] values ...", KV.of(null, "table-2")),
+            KV.of("INSERT INTO [foo].[table-2] values ...", KV.of("foo", 
"table-2")),
+            KV.of("drop table not-select", null));
+    for (KV<String, @Nullable KV<String, String>> testCase : writeCases) {
       assertEquals(testCase.getValue(), 
JdbcUtil.extractTableFromWriteQuery(testCase.getKey()));
     }
   }

Reply via email to