rdblue commented on a change in pull request #1525:
URL: https://github.com/apache/iceberg/pull/1525#discussion_r516209143



##########
File path: 
spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
+
+public class TestCreateActions extends SparkCatalogTestBase {
+
+  // Only valid for IcebergV2Catalog - Hadoop Catalog does not support Staged 
Tables

Review comment:
       It sounds like creating a snapshot table should just use the new table's 
default location rather than requiring a location. Why did you choose to 
require a location?

##########
File path: 
spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
+
+public class TestCreateActions extends SparkCatalogTestBase {
+
+  // Only valid for IcebergV2Catalog - Hadoop Catalog does not support Staged 
Tables
+  @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"spark_catalog", SparkSessionCatalog.class.getName(), 
ImmutableMap.of(
+            "type", "hive",
+            "default-namespace", "default",
+            "parquet-enabled", "true",
+            "cache-enabled", "false" // Spark will delete tables using v1, 
leaving the cache out of sync
+        )},
+        new Object[] { "testhive", SparkCatalog.class.getName(), 
ImmutableMap.of(
+            "type", "hive",
+            "default-namespace", "default"
+        )}
+    };
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  String baseTableName = "baseTable";
+  File tableDir;
+  String tableLocation;
+  final String implementation;
+  final String type;
+  final TableCatalog catalog;
+
+  public TestCreateActions(
+      String catalogName,
+      String implementation,
+      Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.catalog = (TableCatalog) 
spark.sessionState().catalogManager().catalog(catalogName);
+    this.implementation = implementation;
+    this.type = config.get("type");
+  }
+
+  @Before
+  public void before() {
+    try {
+      this.tableDir = temp.newFolder();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.tableLocation = tableDir.toURI().toString();
+
+    spark.conf().set("hive.exec.dynamic.partition", "true");
+    spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
+
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").orderBy("data").write()
+        .mode("append")
+        .option("path", tableLocation)
+        .saveAsTable(baseTableName);
+  }
+
+  @After
+  public void after() throws IOException {
+    // Drop the hive table.
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
+  }
+
+
+  @Test
+  public void testMigratePartitioned() throws Exception {
+    String dest = uniqueName("iceberg_migrate_partitioned");
+    String source = uniqueName("test_migrate_partitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet PARTITIONED BY (id) location '%s' AS 
SELECT * FROM %s",
+        () -> Actions.migrate(source).as(dest),
+        3);
+  }
+
+  @Test
+  public void testMigrateUnpartitioned() throws Exception {
+    String dest = uniqueName("iceberg_migrate_unpartitioned");
+    String source = uniqueName("test_migrate_unpartitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet location '%s' AS SELECT * FROM %s",
+        () -> Actions.migrate(source).as(dest),
+        2);
+  }
+
+  @Test
+  public void testMigrateReplace() throws Exception {
+    // We can't do a replacement unless we have overridden the spark_catalog
+    if (catalog.name().equals("spark_catalog")) {
+      String source = uniqueName(uniqueName("iceberg_migrate_replace"));
+      testCreate(source,
+          source,
+          "CREATE TABLE %s using parquet PARTITIONED BY (id) location '%s' AS 
SELECT * FROM %s",
+          () -> Actions.migrate(source),
+          3);
+    }
+  }
+
+  @Test
+  public void testSnapshotPartitioned() throws Exception {
+    File location = temp.newFolder();
+    String dest = uniqueName("iceberg_snapshot_partitioned");
+    String source = uniqueName("test_snapshot_partitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet PARTITIONED BY (id) location '%s' AS 
SELECT * FROM %s",
+        () -> Actions.snapshot(source, dest, location.toString()),
+        3);
+    testIsolatedSnapshot(source, dest);
+  }
+
+  @Test
+  public void testSnapshotUnpartitioned() throws Exception {
+    File location = temp.newFolder();
+    String dest = uniqueName("iceberg_snapshot_unpartitioned");
+    String source = uniqueName("test_snapshot_unpartitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet location '%s' AS SELECT * FROM %s",
+        () -> Actions.snapshot(source, dest, location.toString()),
+        2);
+    testIsolatedSnapshot(source, dest);
+  }
+
+  @Test
+  public void testSnapshotHiveTable() throws Exception {
+    File location = temp.newFolder();
+    String dest = uniqueName("iceberg_snapshot_hive_table");
+    String source = uniqueName("snapshot_hive_table");
+    testCreate(source,
+        dest,
+        String.format("CREATE EXTERNAL TABLE %s (id Int, data String) STORED 
AS parquet LOCATION '%s'", source,
+            tableLocation),
+        () -> Actions.snapshot(source, dest, location.toString()),
+        3);
+    testIsolatedSnapshot(source, dest);
+  }
+
+  @Test
+  public void testProperties() throws Exception {
+    String dest = uniqueName("iceberg_properties");
+    String source = uniqueName("test_properties_table");
+    Map<String, String> props = Maps.newHashMap();
+    props.put("city", "New Orleans");
+    props.put("note", "Jazz");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet location '%s' AS SELECT * FROM %s",
+        () -> Actions.migrate(source).as(dest)
+        .withAdditionalProperty("dogs", "sundance")
+        .withAdditionalProperties(props),
+        2);
+
+    SparkTable table = loadTable(dest);
+
+
+    Map<String, String> expectedProps = Maps.newHashMap();
+    expectedProps.putAll(props);
+    expectedProps.put("dogs", "sundance");
+
+    for (Map.Entry<String, String> entry : expectedProps.entrySet()) {
+      Assert.assertTrue(
+          "Created table missing property " + entry.getKey(),
+          table.properties().containsKey(entry.getKey()));
+      Assert.assertEquals("Property value is not the expected value",
+          entry.getValue(), table.properties().get(entry.getKey()));
+    }
+  }
+
+  private SparkTable loadTable(String name) throws NoSuchTableException {
+    return (SparkTable) 
catalog.loadTable(Spark3Util.catalogAndIdentifier(spark, name).identifier());
+  }
+
+  // Creates a table, runs the migration command and checks the results.
+  private void testCreate(String source, String dest, String sqlCreate, 
Supplier<CreateAction> action,
+      long expectedMigratedFiles) throws
+      Exception {
+
+    File location = temp.newFolder();
+
+    spark.sql(String.format(sqlCreate, source, location, baseTableName));
+
+    long migratedFiles = action.get().execute();
+
+    SparkTable table = loadTable(dest);
+
+    Assert.assertEquals("Provider should be iceberg", "iceberg",
+        table.properties().get(TableCatalog.PROP_PROVIDER));
+    Assert.assertEquals("Expected number of migrated files", 
expectedMigratedFiles, migratedFiles);
+    Assert.assertEquals("Expected rows in table ", 3, 
spark.table(dest).count());
+  }
+
+  // Inserts records into the destination, makes sure those records exist and 
source table is unchanged
+  private void testIsolatedSnapshot(String source, String dest) {
+    List<Row> expected = spark.sql(String.format("SELECT * FROM %s", 
source)).collectAsList();
+
+    List<SimpleRecord> extraData = Lists.newArrayList(
+        new SimpleRecord(4, "d")
+    );
+    Dataset<Row> df = spark.createDataFrame(extraData, SimpleRecord.class);
+    df.write().format("iceberg").mode("append").saveAsTable(dest);
+
+    List<Row> result = spark.sql(String.format("SELECT * FROM %s", 
source)).collectAsList();
+    Assert.assertEquals("No additional rows should be added to the original 
table", expected.size(),
+        result.size());
+
+    List<Row> snapshot = spark.sql(String.format("SELECT * FROM %s WHERE id = 
4 AND data = 'd'", dest)).collectAsList();
+    Assert.assertEquals("Added row not found in snapshot", 1, snapshot.size());
+  }
+
+  private String uniqueName(String source) {

Review comment:
       I don't think a `DROP IF EXISTS` should take 2 minutes with the retry. 
That sounds like something in the catalog is broken. The retry should only 
happen if the metadata file is known, but can't be loaded. In that case, the 
table does exist.
   
   Maybe the test before/after methods are dropping the temporary files before 
dropping the table?

##########
File path: spark/src/main/java/org/apache/iceberg/actions/CreateAction.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+
+interface CreateAction extends Action<Long> {
+
+  /**
+   * Adds additional properties to the newly created Iceberg Table. Any 
properties with
+   * the same key name will be overwritten.
+   * @param properties a map of properties to be included
+   * @return this for chaining
+   */
+  CreateAction withAdditionalProperties(Map<String, String> properties);
+
+  /**
+   * Adds an additional property to the newly created Iceberg Table. Any 
properties
+   * with the same key name will be overwritten.
+   * @param key the key of the property to add
+   * @param value the value of the property to add
+   * @return this for chaining
+   */
+  CreateAction withAdditionalProperty(String key, String value);

Review comment:
       I think both migrate and snapshot should copy everything from the 
original table, including properties. That's one reason why I like `set`: it 
makes no guarantees about the other table's properties, only that the given 
key/value will be set in the new table.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to