eric-maynard commented on code in PR #1427:
URL: https://github.com/apache/polaris/pull/1427#discussion_r2056888184
##########
plugins/spark/README.md:
##########
@@ -37,4 +37,54 @@ client tests for both Scala versions as well.
 
 The Jar can also be built alone with a specific version using target `:polaris-spark-3.5_<scala_version>`. For example:
 - `./gradlew :polaris-spark-3.5_2.12:createPolarisSparkJar` - Build a jar for the Polaris Spark plugin with scala version 2.12.
-The result jar is located at plugins/spark/build/<scala_version>/libs after the build.
+The result jar is located at plugins/spark/v3.5/build/<scala_version>/libs after the build.
+
+# Start Spark with Local Polaris Service using built Jar
+Once the jar is built, we can manually test it with Spark and a local Polaris service.
+
+Following command starts a Polaris server for local testing, it runs on localhost:8181 with default
+realm `POLARIS` and root credentials `root:secret`.

Review Comment:
```suggestion
The following command starts a Polaris server for local testing and runs on localhost:8181 with default
realm `POLARIS` and root credentials `root:secret`:
```

##########
plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java:
##########
@@ -258,4 +260,94 @@ void testListViews() throws Exception {
     viewCatalog.dropView(Identifier.of(l1ns, view1Name));
     namespaceCatalog.dropNamespace(l1ns, true);
   }
+
+  @Test
+  void testIcebergTableViewMix() throws Exception {
+    // initiate two namespaces with nesting
+    String[] l1ns = new String[] {"ns"};
+    namespaceCatalog.createNamespace(l1ns, Maps.newHashMap());

Review Comment:
   I noticed that this test uses this format, but other tests use something like `Namespace.of("ns", "l1ns");`. Can we do the same here?
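For illustration only, a minimal sketch of the style the reviewer points at, assuming the other tests use Iceberg's `Namespace` helper (an assumption; adapt to whatever helper those tests actually use). `Namespace.levels()` yields the `String[]` that Spark's `SupportsNamespaces.createNamespace` expects:

```java
// Hypothetical rewrite of the setup above, not from the PR:
// build the namespace with Namespace.of(...), then pass its levels()
// (a String[]) to the Spark namespace catalog API.
import org.apache.iceberg.catalog.Namespace;

Namespace l1ns = Namespace.of("ns");
namespaceCatalog.createNamespace(l1ns.levels(), Maps.newHashMap());
```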
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -151,13 +151,19 @@ public Table createTable(
     String provider = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
     if (PolarisCatalogUtils.useIceberg(provider)) {
       return this.icebergsSparkCatalog.createTable(ident, schema, transforms, properties);
-    } else if (PolarisCatalogUtils.useDelta(provider)) {
-      // For delta table, we load the delta catalog to help dealing with the
-      // delta log creation.
-      TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
-      return deltaCatalog.createTable(ident, schema, transforms, properties);
     } else {
-      return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
+      if (PolarisCatalogUtils.isTableWithSparkManagedLocation(properties)) {
+        throw new UnsupportedOperationException(
+            "Table with spark managed location is currently not supported by Polaris. Please provide location or path to the table.");
+      }
+      if (PolarisCatalogUtils.useDelta(provider)) {
+        // For delta table, we load the delta catalog to help dealing with the

Review Comment:
```suggestion
        // For delta tables, we load the delta catalog to help deal with the
```

##########
plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkDeltaIT.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.quarkus.it;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.delta.DeltaAnalysisException;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+@QuarkusIntegrationTest
+public class SparkDeltaIT extends SparkIntegrationBase {
+  private static final Random RANDOM = new Random();
+  private static final String defaultNs = "delta_ns_" + RANDOM.nextInt(0, 100000);
+  private String tableRootDir;
+
+  private String getTableLocation(String tableName) {
+    return String.format("%s/%s", tableRootDir, tableName);
+  }
+
+  private String getTableNameWithRandomSuffix() {
+    return "deltatb" + RANDOM.nextInt(0, 1000);
+  }
+
+  @BeforeEach
+  public void createDefaultResources(@TempDir Path tempDir) {
+    // create a default namespace
+    sql("CREATE NAMESPACE %s", defaultNs);
+    sql("USE NAMESPACE %s", defaultNs);
+    tableRootDir =
+        IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
+  }
+
+  @AfterEach
+  public void cleanupDeltaData() {
+    File dirToDelete = new File(tableRootDir);
+    deleteDirectory(dirToDelete);
+  }
+
+  @Test
+  public void testBasicTableOperations() {
+    // create a regular delta table
+    String deltatb1 = "deltatb1";
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'",
+        deltatb1, getTableLocation(deltatb1));
+    sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", deltatb1);
+    List<Object[]> results = sql("SELECT * FROM %s ORDER BY id DESC", deltatb1);
+    assertThat(results.size()).isEqualTo(2);
+    assertThat(results.get(0)[1]).isEqualTo("bob");
+    assertThat(results.get(1)[1]).isEqualTo("anna");
+
+    // create a detla table with partition
+    String deltatb2 = "deltatb2";
+    sql(
+        "CREATE TABLE %s (name String, age INT, country STRING) USING DELTA PARTITIONED BY (country) LOCATION '%s'",
+        deltatb2, getTableLocation(deltatb2));
+    sql(
+        "INSERT INTO %s VALUES ('david', 10, 'US'), ('james', 32, 'US'), ('yan', 16, 'CHINA')",
+        deltatb2);
+    results = sql("SELECT name, country FROM %s ORDER BY age", deltatb2);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results.get(0)).isEqualTo(new Object[] {"david", "US"});
+    assertThat(results.get(1)).isEqualTo(new Object[] {"yan", "CHINA"});
+    assertThat(results.get(2)).isEqualTo(new Object[] {"james", "US"});
+
+    // verify the partition dir is created
+    List<String> subDirs = listDirectory(getTableLocation(deltatb2));
+    assertThat(subDirs).contains("_delta_log", "country=CHINA", "country=US");
+
+    // test listTables
+    List<Object[]> tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(2);
+    assertThat(tables)
+        .contains(
+            new Object[] {defaultNs, deltatb1, false}, new Object[] {defaultNs, deltatb2, false});
+
+    sql("DROP TABLE %s", deltatb1);
+    sql("DROP TABLE %s", deltatb2);
+    tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(0);
+  }
+
+  @Test
+  public void testAlterColumnsOperations() {
+    String deltatb = getTableNameWithRandomSuffix();
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'",
+        deltatb, getTableLocation(deltatb));
+    sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", deltatb);
+
+    // add two new columns to the table
+    sql("Alter TABLE %s ADD COLUMNS (city STRING, age INT)", deltatb);
+    // add one more row to the table
+    sql("INSERT INTO %s VALUES (3, 'john', 'SFO', 20)", deltatb);
+    // verify the table now have 4 columns with correct result
+    List<Object[]> results = sql("SELECT * FROM %s ORDER BY id", deltatb);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results).contains(new Object[] {1, "anna", null, null});
+    assertThat(results).contains(new Object[] {2, "bob", null, null});
+    assertThat(results).contains(new Object[] {3, "john", "SFO", 20});
+
+    // drop and rename column require set the delta.columnMapping property
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')", deltatb);
+    // drop column age
+    sql("Alter TABLE %s DROP COLUMN age", deltatb);
+    // verify the table now have 3 columns with correct result
+    results = sql("SELECT * FROM %s ORDER BY id", deltatb);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results).contains(new Object[] {1, "anna", null});
+    assertThat(results).contains(new Object[] {2, "bob", null});
+    assertThat(results).contains(new Object[] {3, "john", "SFO"});
+
+    // rename column city to address
+    sql("Alter TABLE %s RENAME COLUMN city TO address", deltatb);
+    // verify column address exists
+    results = sql("SELECT id, address FROM %s ORDER BY id", deltatb);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results).contains(new Object[] {1, null});
+    assertThat(results).contains(new Object[] {2, null});
+    assertThat(results).contains(new Object[] {3, "SFO"});
+  }
+
+  @Test
+  public void testAlterTableProperties() {
+    String deltatb = getTableNameWithRandomSuffix();
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'",
+        deltatb, getTableLocation(deltatb));
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('description' = 'people table', 'test-owner' = 'test-user')",
+        deltatb);
+    List<Object[]> tableInfo = sql("DESCRIBE TABLE EXTENDED %s", deltatb);
+    // find the table properties result
+    String properties = null;
+    for (Object[] info : tableInfo) {
+      if (info[0].equals("Table Properties")) {
+        properties = (String) info[1];
+        break;
+      }
+    }
+    assertThat(properties).contains("description=people table,test-owner=test-user");
+  }
+
+  @Test
+  public void testUnsupportedAlterOperations() {
+    String deltatb = getTableNameWithRandomSuffix();
+    sql(
+        "CREATE TABLE %s (name String, age INT, country STRING) USING DELTA PARTITIONED BY (country) LOCATION '%s'",
+        deltatb, getTableLocation(deltatb));
+
+    // test rename generic table throws unsupported operation error
+    assertThatThrownBy(() -> sql("ALTER TABLE %s RENAME TO new_delta", deltatb))
+        .isInstanceOf(UnsupportedOperationException.class);
+
+    // test table relocation fails with error
+    assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION '/tmp/new/path'", deltatb))
+        .isInstanceOf(DeltaAnalysisException.class);
+
+    // test set table format fails
+    assertThatThrownBy(() -> sql("ALTER TABLE %s SET FILEFORMAT 'csv'", deltatb))
+        .isInstanceOf(ParseException.class);
+
+    // partition management is not supported for delta
+    assertThatThrownBy(() -> sql("ALTER TABLE %s ADD PARTITION (country='US')", deltatb))
+        .isInstanceOf(AnalysisException.class);

Review Comment:
   These are great, can you actually carve these out into their own tests?
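For illustration, a minimal sketch of the split the reviewer asks for, reusing the helpers already defined in this file (`sql`, `getTableNameWithRandomSuffix`, `getTableLocation`); the method names are hypothetical:

```java
// Hypothetical carve-out: one focused test per unsupported operation,
// each creating its own table so the cases stay independent.
@Test
public void testRenameDeltaTableFails() {
  String deltatb = getTableNameWithRandomSuffix();
  sql(
      "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'",
      deltatb, getTableLocation(deltatb));
  // renaming a generic table is expected to be rejected by Polaris
  assertThatThrownBy(() -> sql("ALTER TABLE %s RENAME TO new_delta", deltatb))
      .isInstanceOf(UnsupportedOperationException.class);
}

@Test
public void testRelocateDeltaTableFails() {
  String deltatb = getTableNameWithRandomSuffix();
  sql(
      "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'",
      deltatb, getTableLocation(deltatb));
  // relocating the table is expected to fail inside Delta
  assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION '/tmp/new/path'", deltatb))
      .isInstanceOf(DeltaAnalysisException.class);
}
```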
Object[] {"james", "US"}); + + // verify the partition dir is created + List<String> subDirs = listDirectory(getTableLocation(deltatb2)); + assertThat(subDirs).contains("_delta_log", "country=CHINA", "country=US"); + + // test listTables + List<Object[]> tables = sql("SHOW TABLES"); + assertThat(tables.size()).isEqualTo(2); + assertThat(tables) + .contains( + new Object[] {defaultNs, deltatb1, false}, new Object[] {defaultNs, deltatb2, false}); + + sql("DROP TABLE %s", deltatb1); + sql("DROP TABLE %s", deltatb2); + tables = sql("SHOW TABLES"); + assertThat(tables.size()).isEqualTo(0); + } + + @Test + public void testAlterColumnsOperations() { + String deltatb = getTableNameWithRandomSuffix(); + sql( + "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'", + deltatb, getTableLocation(deltatb)); + sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", deltatb); + + // add two new columns to the table + sql("Alter TABLE %s ADD COLUMNS (city STRING, age INT)", deltatb); + // add one more row to the table + sql("INSERT INTO %s VALUES (3, 'john', 'SFO', 20)", deltatb); + // verify the table now have 4 columns with correct result + List<Object[]> results = sql("SELECT * FROM %s ORDER BY id", deltatb); + assertThat(results.size()).isEqualTo(3); + assertThat(results).contains(new Object[] {1, "anna", null, null}); + assertThat(results).contains(new Object[] {2, "bob", null, null}); + assertThat(results).contains(new Object[] {3, "john", "SFO", 20}); + + // drop and rename column require set the delta.columnMapping property + sql("ALTER TABLE %s SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')", deltatb); + // drop column age + sql("Alter TABLE %s DROP COLUMN age", deltatb); + // verify the table now have 3 columns with correct result + results = sql("SELECT * FROM %s ORDER BY id", deltatb); + assertThat(results.size()).isEqualTo(3); + assertThat(results).contains(new Object[] {1, "anna", null}); + assertThat(results).contains(new Object[] {2, "bob", null}); + assertThat(results).contains(new Object[] {3, "john", "SFO"}); + + // rename column city to address + sql("Alter TABLE %s RENAME COLUMN city TO address", deltatb); + // verify column address exists + results = sql("SELECT id, address FROM %s ORDER BY id", deltatb); + assertThat(results.size()).isEqualTo(3); + assertThat(results).contains(new Object[] {1, null}); + assertThat(results).contains(new Object[] {2, null}); + assertThat(results).contains(new Object[] {3, "SFO"}); + } + + @Test + public void testAlterTableProperties() { + String deltatb = getTableNameWithRandomSuffix(); + sql( + "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'", + deltatb, getTableLocation(deltatb)); + + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('description' = 'people table', 'test-owner' = 'test-user')", + deltatb); + List<Object[]> tableInfo = sql("DESCRIBE TABLE EXTENDED %s", deltatb); + // find the table properties result + String properties = null; + for (Object[] info : tableInfo) { + if (info[0].equals("Table Properties")) { + properties = (String) info[1]; + break; + } + } + assertThat(properties).contains("description=people table,test-owner=test-user"); + } + + @Test + public void testUnsupportedAlterOperations() { + String deltatb = getTableNameWithRandomSuffix(); + sql( + "CREATE TABLE %s (name String, age INT, country STRING) USING DELTA PARTITIONED BY (country) LOCATION '%s'", + deltatb, getTableLocation(deltatb)); + + // test rename generic table throws unsupported operation error + assertThatThrownBy(() -> sql("ALTER 
##########
plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java:
##########
@@ -45,4 +55,20 @@ protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String
             String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret")
         .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2");
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"delta", "csv"})
+  public void testNoneIcebergTableOperationsFails(String provider) throws Exception {

Review Comment:
```suggestion
  public void testNonIcebergTableOperationsFails(String provider) throws Exception {
```

##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -50,6 +50,16 @@ public static boolean useDelta(String provider) {
     return "delta".equalsIgnoreCase(provider);
   }
 
+  /**
+   * For tables whose location is manged by Spark Session Catalog, there will be no location or path
+   * in the properties.
+   */
+  public static boolean isTableWithSparkManagedLocation(Map<String, String> properties) {
+    boolean hasLocationClause = properties.get(TableCatalog.PROP_LOCATION) != null;
+    boolean hasPathClause = properties.get(TABLE_PATH_KEY) != null;

Review Comment:
   Can we just use `containsKey`?
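A minimal sketch of the `containsKey` version; the return statement is an assumption based on the javadoc, since the quoted hunk cuts off before it. Note that `containsKey` and `get(...) != null` differ only when a key is mapped to an explicit null value:

```java
// Hypothetical rewrite, not from the PR:
public static boolean isTableWithSparkManagedLocation(Map<String, String> properties) {
  boolean hasLocationClause = properties.containsKey(TableCatalog.PROP_LOCATION);
  boolean hasPathClause = properties.containsKey(TABLE_PATH_KEY);
  // assumed per the javadoc: Spark manages the location when neither key is present
  return !hasLocationClause && !hasPathClause;
}
```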
##########
plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java:
##########
@@ -108,4 +92,29 @@ private Object[] toJava(Row row) {
             })
         .toArray(Object[]::new);
   }
+
+  /** Delete the file directory recursively. */
+  protected void deleteDirectory(File directory) {
+    if (directory.exists()) {
+      File[] files = directory.listFiles();
+      if (files != null) {
+        for (File file : files) {
+          if (file.isDirectory()) {
+            deleteDirectory(file);
+          } else {
+            file.delete();
+          }
+        }
+      }
+      directory.delete();
+    }
+  }
+
+  protected List<String> listDirectory(String directoryPath) {

Review Comment:
   `FileUtils.listFiles` -- also, is this meant to be recursive?
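One wrinkle with this pointer, for what it's worth: `FileUtils.listFiles` returns only regular files, while the partition-directory assertion in `SparkDeltaIT` checks for child directory names like `country=US`, so `FileUtils.listFilesAndDirs` or plain `File.list()` may be closer. A minimal non-recursive sketch, assuming immediate child names are the intended semantics:

```java
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

protected List<String> listDirectory(String directoryPath) {
  // immediate children only (both files and directories), by name
  String[] children = new File(directoryPath).list();
  return children == null ? Collections.emptyList() : Arrays.asList(children);
}
```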
##########
plugins/spark/README.md:
##########
@@ -37,4 +37,54 @@ client tests for both Scala versions as well.
 
 The Jar can also be built alone with a specific version using target `:polaris-spark-3.5_<scala_version>`. For example:
 - `./gradlew :polaris-spark-3.5_2.12:createPolarisSparkJar` - Build a jar for the Polaris Spark plugin with scala version 2.12.
-The result jar is located at plugins/spark/build/<scala_version>/libs after the build.
+The result jar is located at plugins/spark/v3.5/build/<scala_version>/libs after the build.
+
+# Start Spark with Local Polaris Service using built Jar
+Once the jar is built, we can manually test it with Spark and a local Polaris service.
+
+Following command starts a Polaris server for local testing, it runs on localhost:8181 with default
+realm `POLARIS` and root credentials `root:secret`.
+```shell
+./gradlew run
+```
+
+Once the local server is running, following command can be used to start the spark-shell with the built Spark client
+jar, and uses the local Polaris server as Catalog server.

Review Comment:
```suggestion
Once the local server is running, the following command can be used to start the spark-shell with the built Spark client
jar, and to use the local Polaris server as a Catalog:
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.