This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 97c5700ff5 Core: Fix JDBC Catalog table commit when migrating from schema V0 to V1 (#10111)
97c5700ff5 is described below

commit 97c5700ff53a4ae0a991231add7e80ab2cae3978
Author: JB Onofré <[email protected]>
AuthorDate: Tue Apr 16 07:42:50 2024 +0200

    Core: Fix JDBC Catalog table commit when migrating from schema V0 to V1 (#10111)
---
 .../java/org/apache/iceberg/jdbc/JdbcUtil.java     | 38 +++++++--
 .../org/apache/iceberg/jdbc/TestJdbcCatalog.java   | 62 +++++++++++++-
 .../java/org/apache/iceberg/jdbc/TestJdbcUtil.java | 98 ++++++++++++++++++++++
 3 files changed, 190 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java 
b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
index 077c33321b..8f918b4560 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
@@ -54,7 +54,31 @@ final class JdbcUtil {
   static final String TABLE_RECORD_TYPE = "TABLE";
   static final String VIEW_RECORD_TYPE = "VIEW";
 
-  private static final String V1_DO_COMMIT_SQL =
+  private static final String V1_DO_COMMIT_TABLE_SQL =
+      "UPDATE "
+          + CATALOG_TABLE_VIEW_NAME
+          + " SET "
+          + JdbcTableOperations.METADATA_LOCATION_PROP
+          + " = ? , "
+          + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP
+          + " = ?"
+          + " WHERE "
+          + CATALOG_NAME
+          + " = ? AND "
+          + TABLE_NAMESPACE
+          + " = ? AND "
+          + TABLE_NAME
+          + " = ? AND "
+          + JdbcTableOperations.METADATA_LOCATION_PROP
+          + " = ? AND ("
+          + RECORD_TYPE
+          + " = '"
+          + TABLE_RECORD_TYPE
+          + "'"
+          + " OR "
+          + RECORD_TYPE
+          + " IS NULL)";
+  private static final String V1_DO_COMMIT_VIEW_SQL =
       "UPDATE "
           + CATALOG_TABLE_VIEW_NAME
           + " SET "
@@ -72,7 +96,10 @@ final class JdbcUtil {
           + JdbcTableOperations.METADATA_LOCATION_PROP
           + " = ? AND "
           + RECORD_TYPE
-          + " = ?";
+          + " = "
+          + "'"
+          + VIEW_RECORD_TYPE
+          + "'";
   private static final String V0_DO_COMMIT_SQL =
       "UPDATE "
           + CATALOG_TABLE_VIEW_NAME
@@ -504,7 +531,9 @@ final class JdbcUtil {
         conn -> {
           try (PreparedStatement sql =
               conn.prepareStatement(
-                  (schemaVersion == SchemaVersion.V1) ? V1_DO_COMMIT_SQL : 
V0_DO_COMMIT_SQL)) {
+                  (schemaVersion == SchemaVersion.V1)
+                      ? (isTable ? V1_DO_COMMIT_TABLE_SQL : 
V1_DO_COMMIT_VIEW_SQL)
+                      : V0_DO_COMMIT_SQL)) {
             // UPDATE
             sql.setString(1, newMetadataLocation);
             sql.setString(2, oldMetadataLocation);
@@ -513,9 +542,6 @@ final class JdbcUtil {
             sql.setString(4, namespaceToString(identifier.namespace()));
             sql.setString(5, identifier.name());
             sql.setString(6, oldMetadataLocation);
-            if (schemaVersion == SchemaVersion.V1) {
-              sql.setString(7, isTable ? TABLE_RECORD_TYPE : VIEW_RECORD_TYPE);
-            }
 
             return sql.executeUpdate();
           }
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java 
b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
index d8553a1858..c2f0869d00 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
@@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
@@ -239,6 +240,17 @@ public class TestJdbcCatalog extends 
CatalogTests<JdbcCatalog> {
         .create();
 
     
assertThat(jdbcCatalog.listViews(Namespace.of("namespace1"))).hasSize(1).containsExactly(view);
+
+    TableIdentifier tableThree = TableIdentifier.of("namespace2", "table3");
+    jdbcCatalog.createTable(tableThree, SCHEMA);
+    assertThat(jdbcCatalog.tableExists(tableThree)).isTrue();
+
+    // testing append datafile to check commit, it should not throw an 
exception
+    jdbcCatalog.loadTable(tableOne).newAppend().appendFile(FILE_A).commit();
+    jdbcCatalog.loadTable(tableTwo).newAppend().appendFile(FILE_B).commit();
+
+    assertThat(jdbcCatalog.tableExists(tableOne)).isTrue();
+    assertThat(jdbcCatalog.tableExists(tableTwo)).isTrue();
   }
 
   @ParameterizedTest
@@ -1024,7 +1036,49 @@ public class TestJdbcCatalog extends 
CatalogTests<JdbcCatalog> {
     }
   }
 
+  private String createMetadataLocationViaJdbcCatalog(TableIdentifier 
identifier)
+      throws SQLException {
+    // temporary connection just to actually create a concrete metadata 
location
+    String jdbcUrl = null;
+    try {
+      java.nio.file.Path dbFile = Files.createTempFile("temp", "metadata");
+      jdbcUrl = "jdbc:sqlite:" + dbFile.toAbsolutePath();
+    } catch (IOException e) {
+      throw new SQLException("Error while creating temp data", e);
+    }
+
+    Map<String, String> properties = Maps.newHashMap();
+
+    properties.put(CatalogProperties.URI, jdbcUrl);
+
+    warehouseLocation = this.tableDir.toAbsolutePath().toString();
+    properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+    properties.put("type", "jdbc");
+
+    JdbcCatalog jdbcCatalog =
+        (JdbcCatalog) CatalogUtil.buildIcebergCatalog("TEMP", properties, 
conf);
+    jdbcCatalog.buildTable(identifier, SCHEMA).create();
+
+    SQLiteDataSource dataSource = new SQLiteDataSource();
+    dataSource.setUrl(jdbcUrl);
+
+    try (Connection connection = dataSource.getConnection()) {
+      ResultSet result =
+          connection
+              .prepareStatement("SELECT * FROM " + 
JdbcUtil.CATALOG_TABLE_VIEW_NAME)
+              .executeQuery();
+      result.next();
+      return result.getString(JdbcTableOperations.METADATA_LOCATION_PROP);
+    }
+  }
+
   private void initLegacySchema(String jdbcUrl) throws SQLException {
+    TableIdentifier table1 = TableIdentifier.of(Namespace.of("namespace1"), 
"table1");
+    TableIdentifier table2 = TableIdentifier.of(Namespace.of("namespace2"), 
"table2");
+
+    String table1MetadataLocation = 
createMetadataLocationViaJdbcCatalog(table1);
+    String table2MetadataLocation = 
createMetadataLocationViaJdbcCatalog(table2);
+
     SQLiteDataSource dataSource = new SQLiteDataSource();
     dataSource.setUrl(jdbcUrl);
 
@@ -1045,7 +1099,9 @@ public class TestJdbcCatalog extends 
CatalogTests<JdbcCatalog> {
                   + JdbcTableOperations.METADATA_LOCATION_PROP
                   + ","
                   + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP
-                  + ") VALUES('TEST','namespace1','table1',null,null)")
+                  + ") VALUES('TEST','namespace1','table1','"
+                  + table1MetadataLocation
+                  + "',null)")
           .execute();
       connection
           .prepareStatement(
@@ -1061,7 +1117,9 @@ public class TestJdbcCatalog extends 
CatalogTests<JdbcCatalog> {
                   + JdbcTableOperations.METADATA_LOCATION_PROP
                   + ","
                   + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP
-                  + ") VALUES('TEST','namespace2','table2',null,null)")
+                  + ") VALUES('TEST','namespace2','table2','"
+                  + table2MetadataLocation
+                  + "',null)")
           .execute();
     }
   }
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java 
b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java
index 7dde37d4b5..cfea9740eb 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java
@@ -20,10 +20,16 @@ package org.apache.iceberg.jdbc;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.nio.file.Files;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.util.Map;
 import java.util.Properties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.jupiter.api.Test;
+import org.sqlite.SQLiteDataSource;
 
 public class TestJdbcUtil {
 
@@ -45,4 +51,96 @@ public class TestJdbcUtil {
 
     assertThat(expected).isEqualTo(actual);
   }
+
+  @Test
+  public void testV0toV1SqlStatements() throws Exception {
+    java.nio.file.Path dbFile = Files.createTempFile("icebergSchemaUpdate", 
"db");
+    String jdbcUrl = "jdbc:sqlite:" + dbFile.toAbsolutePath();
+
+    SQLiteDataSource dataSource = new SQLiteDataSource();
+    dataSource.setUrl(jdbcUrl);
+
+    try (JdbcClientPool connections = new JdbcClientPool(jdbcUrl, 
Maps.newHashMap())) {
+      // create "old style" SQL schema
+      
connections.newClient().prepareStatement(JdbcUtil.V0_CREATE_CATALOG_SQL).executeUpdate();
+
+      // inserting tables
+      JdbcUtil.doCommitCreateTable(
+          JdbcUtil.SchemaVersion.V0,
+          connections,
+          "TEST",
+          Namespace.of("namespace1"),
+          TableIdentifier.of(Namespace.of("namespace1"), "table1"),
+          "testLocation");
+      JdbcUtil.doCommitCreateTable(
+          JdbcUtil.SchemaVersion.V0,
+          connections,
+          "TEST",
+          Namespace.of("namespace1"),
+          TableIdentifier.of(Namespace.of("namespace1"), "table2"),
+          "testLocation");
+
+      try (PreparedStatement statement =
+          
connections.newClient().prepareStatement(JdbcUtil.V0_LIST_TABLE_SQL)) {
+        statement.setString(1, "TEST");
+        statement.setString(2, "namespace1");
+        ResultSet tables = statement.executeQuery();
+        tables.next();
+        assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table1");
+        tables.next();
+        assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table2");
+      }
+
+      // updating the schema from V0 to V1
+      
connections.newClient().prepareStatement(JdbcUtil.V1_UPDATE_CATALOG_SQL).execute();
+
+      // trying to add a table on the updated schema
+      JdbcUtil.doCommitCreateTable(
+          JdbcUtil.SchemaVersion.V1,
+          connections,
+          "TEST",
+          Namespace.of("namespace1"),
+          TableIdentifier.of(Namespace.of("namespace1"), "table3"),
+          "testLocation");
+
+      // testing the tables after migration and new table added
+      try (PreparedStatement statement =
+          
connections.newClient().prepareStatement(JdbcUtil.V0_LIST_TABLE_SQL)) {
+        statement.setString(1, "TEST");
+        statement.setString(2, "namespace1");
+        ResultSet tables = statement.executeQuery();
+        tables.next();
+        assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table1");
+        assertThat(tables.getString(JdbcUtil.RECORD_TYPE)).isNull();
+        tables.next();
+        assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table2");
+        assertThat(tables.getString(JdbcUtil.RECORD_TYPE)).isNull();
+        tables.next();
+        assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table3");
+        
assertThat(tables.getString(JdbcUtil.RECORD_TYPE)).isEqualTo(JdbcUtil.TABLE_RECORD_TYPE);
+      }
+
+      // update a table (commit) created on V1 schema
+      int updated =
+          JdbcUtil.updateTable(
+              JdbcUtil.SchemaVersion.V1,
+              connections,
+              "TEST",
+              TableIdentifier.of(Namespace.of("namespace1"), "table3"),
+              "newLocation",
+              "testLocation");
+      assertThat(updated).isEqualTo(1);
+
+      // update a table (commit) migrated from V0 schema
+      updated =
+          JdbcUtil.updateTable(
+              JdbcUtil.SchemaVersion.V1,
+              connections,
+              "TEST",
+              TableIdentifier.of(Namespace.of("namespace1"), "table1"),
+              "newLocation",
+              "testLocation");
+      assertThat(updated).isEqualTo(1);
+    }
+  }
 }

Reply via email to