This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 97c5700ff5 Core: Fix JDBC Catalog table commit when migrating from
schema V0 to V1 (#10111)
97c5700ff5 is described below
commit 97c5700ff53a4ae0a991231add7e80ab2cae3978
Author: JB Onofré <[email protected]>
AuthorDate: Tue Apr 16 07:42:50 2024 +0200
Core: Fix JDBC Catalog table commit when migrating from schema V0 to V1
(#10111)
---
.../java/org/apache/iceberg/jdbc/JdbcUtil.java | 38 +++++++--
.../org/apache/iceberg/jdbc/TestJdbcCatalog.java | 62 +++++++++++++-
.../java/org/apache/iceberg/jdbc/TestJdbcUtil.java | 98 ++++++++++++++++++++++
3 files changed, 190 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
index 077c33321b..8f918b4560 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
@@ -54,7 +54,31 @@ final class JdbcUtil {
static final String TABLE_RECORD_TYPE = "TABLE";
static final String VIEW_RECORD_TYPE = "VIEW";
- private static final String V1_DO_COMMIT_SQL =
+ private static final String V1_DO_COMMIT_TABLE_SQL =
+ "UPDATE "
+ + CATALOG_TABLE_VIEW_NAME
+ + " SET "
+ + JdbcTableOperations.METADATA_LOCATION_PROP
+ + " = ? , "
+ + JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP
+ + " = ?"
+ + " WHERE "
+ + CATALOG_NAME
+ + " = ? AND "
+ + TABLE_NAMESPACE
+ + " = ? AND "
+ + TABLE_NAME
+ + " = ? AND "
+ + JdbcTableOperations.METADATA_LOCATION_PROP
+ + " = ? AND ("
+ + RECORD_TYPE
+ + " = '"
+ + TABLE_RECORD_TYPE
+ + "'"
+ + " OR "
+ + RECORD_TYPE
+ + " IS NULL)";
+ private static final String V1_DO_COMMIT_VIEW_SQL =
"UPDATE "
+ CATALOG_TABLE_VIEW_NAME
+ " SET "
@@ -72,7 +96,10 @@ final class JdbcUtil {
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ " = ? AND "
+ RECORD_TYPE
- + " = ?";
+ + " = "
+ + "'"
+ + VIEW_RECORD_TYPE
+ + "'";
private static final String V0_DO_COMMIT_SQL =
"UPDATE "
+ CATALOG_TABLE_VIEW_NAME
@@ -504,7 +531,9 @@ final class JdbcUtil {
conn -> {
try (PreparedStatement sql =
conn.prepareStatement(
- (schemaVersion == SchemaVersion.V1) ? V1_DO_COMMIT_SQL : V0_DO_COMMIT_SQL)) {
+ (schemaVersion == SchemaVersion.V1)
+ ? (isTable ? V1_DO_COMMIT_TABLE_SQL : V1_DO_COMMIT_VIEW_SQL)
+ : V0_DO_COMMIT_SQL)) {
// UPDATE
sql.setString(1, newMetadataLocation);
sql.setString(2, oldMetadataLocation);
@@ -513,9 +542,6 @@ final class JdbcUtil {
sql.setString(4, namespaceToString(identifier.namespace()));
sql.setString(5, identifier.name());
sql.setString(6, oldMetadataLocation);
- if (schemaVersion == SchemaVersion.V1) {
- sql.setString(7, isTable ? TABLE_RECORD_TYPE : VIEW_RECORD_TYPE);
- }
return sql.executeUpdate();
}
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
index d8553a1858..c2f0869d00 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
@@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@@ -239,6 +240,17 @@ public class TestJdbcCatalog extends CatalogTests<JdbcCatalog> {
.create();
assertThat(jdbcCatalog.listViews(Namespace.of("namespace1"))).hasSize(1).containsExactly(view);
+
+ TableIdentifier tableThree = TableIdentifier.of("namespace2", "table3");
+ jdbcCatalog.createTable(tableThree, SCHEMA);
+ assertThat(jdbcCatalog.tableExists(tableThree)).isTrue();
+
+ // testing append datafile to check commit, it should not throw an exception
+ jdbcCatalog.loadTable(tableOne).newAppend().appendFile(FILE_A).commit();
+ jdbcCatalog.loadTable(tableTwo).newAppend().appendFile(FILE_B).commit();
+
+ assertThat(jdbcCatalog.tableExists(tableOne)).isTrue();
+ assertThat(jdbcCatalog.tableExists(tableTwo)).isTrue();
}
@ParameterizedTest
@@ -1024,7 +1036,49 @@ public class TestJdbcCatalog extends CatalogTests<JdbcCatalog> {
}
}
+ private String createMetadataLocationViaJdbcCatalog(TableIdentifier identifier)
+ throws SQLException {
+ // temporary connection just to actually create a concrete metadata location
+ String jdbcUrl = null;
+ try {
+ java.nio.file.Path dbFile = Files.createTempFile("temp", "metadata");
+ jdbcUrl = "jdbc:sqlite:" + dbFile.toAbsolutePath();
+ } catch (IOException e) {
+ throw new SQLException("Error while creating temp data", e);
+ }
+
+ Map<String, String> properties = Maps.newHashMap();
+
+ properties.put(CatalogProperties.URI, jdbcUrl);
+
+ warehouseLocation = this.tableDir.toAbsolutePath().toString();
+ properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+ properties.put("type", "jdbc");
+
+ JdbcCatalog jdbcCatalog =
+ (JdbcCatalog) CatalogUtil.buildIcebergCatalog("TEMP", properties, conf);
+ jdbcCatalog.buildTable(identifier, SCHEMA).create();
+
+ SQLiteDataSource dataSource = new SQLiteDataSource();
+ dataSource.setUrl(jdbcUrl);
+
+ try (Connection connection = dataSource.getConnection()) {
+ ResultSet result =
+ connection
+ .prepareStatement("SELECT * FROM " + JdbcUtil.CATALOG_TABLE_VIEW_NAME)
+ .executeQuery();
+ result.next();
+ return result.getString(JdbcTableOperations.METADATA_LOCATION_PROP);
+ }
+ }
+
private void initLegacySchema(String jdbcUrl) throws SQLException {
+ TableIdentifier table1 = TableIdentifier.of(Namespace.of("namespace1"), "table1");
+ TableIdentifier table2 = TableIdentifier.of(Namespace.of("namespace2"), "table2");
+
+ String table1MetadataLocation = createMetadataLocationViaJdbcCatalog(table1);
+ String table2MetadataLocation = createMetadataLocationViaJdbcCatalog(table2);
+
SQLiteDataSource dataSource = new SQLiteDataSource();
dataSource.setUrl(jdbcUrl);
@@ -1045,7 +1099,9 @@ public class TestJdbcCatalog extends CatalogTests<JdbcCatalog> {
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ ","
+ JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP
- + ") VALUES('TEST','namespace1','table1',null,null)")
+ + ") VALUES('TEST','namespace1','table1','"
+ + table1MetadataLocation
+ + "',null)")
.execute();
connection
.prepareStatement(
@@ -1061,7 +1117,9 @@ public class TestJdbcCatalog extends CatalogTests<JdbcCatalog> {
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ ","
+ JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP
- + ") VALUES('TEST','namespace2','table2',null,null)")
+ + ") VALUES('TEST','namespace2','table2','"
+ + table2MetadataLocation
+ + "',null)")
.execute();
}
}
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java
index 7dde37d4b5..cfea9740eb 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java
@@ -20,10 +20,16 @@ package org.apache.iceberg.jdbc;
import static org.assertj.core.api.Assertions.assertThat;
+import java.nio.file.Files;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.util.Map;
import java.util.Properties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.Test;
+import org.sqlite.SQLiteDataSource;
public class TestJdbcUtil {
@@ -45,4 +51,96 @@ public class TestJdbcUtil {
assertThat(expected).isEqualTo(actual);
}
+
+ @Test
+ public void testV0toV1SqlStatements() throws Exception {
+ java.nio.file.Path dbFile = Files.createTempFile("icebergSchemaUpdate", "db");
+ String jdbcUrl = "jdbc:sqlite:" + dbFile.toAbsolutePath();
+
+ SQLiteDataSource dataSource = new SQLiteDataSource();
+ dataSource.setUrl(jdbcUrl);
+
+ try (JdbcClientPool connections = new JdbcClientPool(jdbcUrl, Maps.newHashMap())) {
+ // create "old style" SQL schema
+ connections.newClient().prepareStatement(JdbcUtil.V0_CREATE_CATALOG_SQL).executeUpdate();
+
+ // inserting tables
+ JdbcUtil.doCommitCreateTable(
+ JdbcUtil.SchemaVersion.V0,
+ connections,
+ "TEST",
+ Namespace.of("namespace1"),
+ TableIdentifier.of(Namespace.of("namespace1"), "table1"),
+ "testLocation");
+ JdbcUtil.doCommitCreateTable(
+ JdbcUtil.SchemaVersion.V0,
+ connections,
+ "TEST",
+ Namespace.of("namespace1"),
+ TableIdentifier.of(Namespace.of("namespace1"), "table2"),
+ "testLocation");
+
+ try (PreparedStatement statement =
+ connections.newClient().prepareStatement(JdbcUtil.V0_LIST_TABLE_SQL)) {
+ statement.setString(1, "TEST");
+ statement.setString(2, "namespace1");
+ ResultSet tables = statement.executeQuery();
+ tables.next();
+ assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table1");
+ tables.next();
+ assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table2");
+ }
+
+ // updating the schema from V0 to V1
+ connections.newClient().prepareStatement(JdbcUtil.V1_UPDATE_CATALOG_SQL).execute();
+
+ // trying to add a table on the updated schema
+ JdbcUtil.doCommitCreateTable(
+ JdbcUtil.SchemaVersion.V1,
+ connections,
+ "TEST",
+ Namespace.of("namespace1"),
+ TableIdentifier.of(Namespace.of("namespace1"), "table3"),
+ "testLocation");
+
+ // testing the tables after migration and new table added
+ try (PreparedStatement statement =
+ connections.newClient().prepareStatement(JdbcUtil.V0_LIST_TABLE_SQL)) {
+ statement.setString(1, "TEST");
+ statement.setString(2, "namespace1");
+ ResultSet tables = statement.executeQuery();
+ tables.next();
+ assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table1");
+ assertThat(tables.getString(JdbcUtil.RECORD_TYPE)).isNull();
+ tables.next();
+ assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table2");
+ assertThat(tables.getString(JdbcUtil.RECORD_TYPE)).isNull();
+ tables.next();
+ assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table3");
+ assertThat(tables.getString(JdbcUtil.RECORD_TYPE)).isEqualTo(JdbcUtil.TABLE_RECORD_TYPE);
+ }
+
+ // update a table (commit) created on V1 schema
+ int updated =
+ JdbcUtil.updateTable(
+ JdbcUtil.SchemaVersion.V1,
+ connections,
+ "TEST",
+ TableIdentifier.of(Namespace.of("namespace1"), "table3"),
+ "newLocation",
+ "testLocation");
+ assertThat(updated).isEqualTo(1);
+
+ // update a table (commit) migrated from V0 schema
+ updated =
+ JdbcUtil.updateTable(
+ JdbcUtil.SchemaVersion.V1,
+ connections,
+ "TEST",
+ TableIdentifier.of(Namespace.of("namespace1"), "table1"),
+ "newLocation",
+ "testLocation");
+ assertThat(updated).isEqualTo(1);
+ }
+ }
}