RussellSpitzer commented on a change in pull request #1525:
URL: https://github.com/apache/iceberg/pull/1525#discussion_r516124483



##########
File path: spark/src/main/java/org/apache/iceberg/actions/CreateAction.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+
+interface CreateAction extends Action<Long> {
+
+  /**
+   * Adds additional properties to the newly created Iceberg Table. Any properties with
+   * the same key name will be overwritten.
+   * @param properties a map of properties to be included
+   * @return this for chaining
+   */
+  CreateAction withAdditionalProperties(Map<String, String> properties);
+
+  /**
+   * Adds an additional property to the newly created Iceberg Table. Any properties
+   * with the same key name will be overwritten.
+   * @param key the key of the property to add
+   * @param value the value of the property to add
+   * @return this for chaining
+   */
+  CreateAction withAdditionalProperty(String key, String value);

Review comment:
       This was actually part of an earlier discussion I had with @aokolnychyi; we were discussing whether we should preserve the properties set on the origin table for this operation. I used "additional" here because I settled on a behavior we had previously been using internally, which copied over whatever properties the original source table had.
   
   I can go back to "set and setAll", but I would like to hear your opinion on copying properties from the origin table.
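   To make the tradeoff concrete, the merge order I had in mind is roughly the sketch below (`sourceTableProperties` is just a stand-in for whatever the origin table exposes, not an actual field):
   
   ```java
   // Start from the origin table's properties, then let the caller-supplied
   // "additional" entries win on any key conflict.
   Map<String, String> merged = Maps.newHashMap();
   merged.putAll(sourceTableProperties);   // copied from the source table
   merged.putAll(additionalProperties);    // from withAdditionalProperty / withAdditionalProperties
   ```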

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -543,4 +550,63 @@ private static String sqlString(org.apache.iceberg.expressions.Literal<?> lit) {
       }
     }
   }
+
+  public static CatalogAndIdentifier catalogAndIdentifier(SparkSession spark, String name) {
+    try {
+      return catalogAndIdentifier(spark, spark.sessionState().sqlParser().parseMultipartIdentifier(name));
+    } catch (ParseException e) {
+      throw new IllegalArgumentException(String.format("Cannot parse identifier %s", name), e);
+    }
+  }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply.
+   * Attempts to find the catalog and identifier a multipart identifier represents.
+   * @param spark Spark session to use for resolution
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static CatalogAndIdentifier catalogAndIdentifier(SparkSession spark, Seq<String> nameParts) {

Review comment:
       Already upset about that Scala 2.13 change :) Yeah, no problem, I'll stay Java-only in method signatures, with the exception of Spark-specific classes.
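   For example, a Java-friendly overload here could just do the conversion internally; a rough sketch (assuming the usual JavaConverters conversion, not committing to an exact signature):
   
   ```java
   // Hypothetical overload taking a plain Java List and converting it to a Scala Seq
   // before delegating to the existing Seq-based method.
   public static CatalogAndIdentifier catalogAndIdentifier(SparkSession spark, List<String> nameParts) {
     return catalogAndIdentifier(spark, JavaConverters.asScalaBufferConverter(nameParts).asScala().toSeq());
   }
   ```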

##########
File path: spark3/src/main/java/org/apache/iceberg/actions/Spark3CreateAction.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * This action will migrate a known table in a Spark Catalog that is not an Iceberg table into an Iceberg table.
+ * The newly created table will be able to interact with and modify files in the original table.
+ *
+ * There are two main code paths:
+ *   - Creating a brand new Iceberg table or replacing an existing Iceberg table.
+ *   This pathway will use a staged table to stage the creation or replacement, only committing after
+ *   the import has succeeded.
+ *
+ *   - Replacing a table in the Session Catalog with an Iceberg Table with the same name.
+ *   This pathway will create a temporary table with a different name. This replacement table will
+ *   be committed upon a successful import. Then the original session catalog entry will be dropped
+ *   and the new replacement table renamed to take its place.
+ */
+class Spark3CreateAction implements CreateAction {
+  private static final Logger LOG = LoggerFactory.getLogger(Spark3CreateAction.class);
+  private static final Set<String> ALLOWED_SOURCES = ImmutableSet.of("parquet", "avro", "orc", "hive");
+  private static final String ICEBERG_METADATA_FOLDER = "metadata";
+  private static final String REPLACEMENT_NAME = "_REPLACEMENT_";
+
+  private final SparkSession spark;
+
+  // Source Fields
+  private final CatalogTable sourceTable;
+  private final String sourceTableLocation;
+  private final CatalogPlugin sourceCatalog;
+  private final Identifier sourceTableName;
+  private final PartitionSpec sourcePartitionSpec;
+
+  // Destination Fields
+  private final Boolean sessionCatalogReplacement;
+  private final CatalogPlugin destCatalog;
+  private final Identifier destTableName;
+
+  // Optional Parameters for destination
+  private String destDataLocation;
+  private String destMetadataLocation;
+  private Map<String, String> additionalProperties = Maps.newHashMap();
+
+  Spark3CreateAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableName,
+                     CatalogPlugin destCatalog, Identifier destTableName) {
+
+    this.spark = spark;
+    this.sourceCatalog = checkSourceCatalog(sourceCatalog);
+    this.sourceTableName = sourceTableName;
+    this.destCatalog = destCatalog;
+    this.destTableName = destTableName;
+
+    try {
+      String sourceString = String.join(".", sourceTableName.namespace()) + "." + sourceTableName.name();
+      sourcePartitionSpec = SparkSchemaUtil.specForTable(spark, sourceString);
+    } catch (AnalysisException e) {
+      throw new IllegalArgumentException("Cannot determine partitioning of " + sourceTableName.toString(), e);
+    }
+
+    try {
+      this.sourceTable = spark.sessionState().catalog().getTableMetadata(Spark3Util.toTableIdentifier(sourceTableName));
+    } catch (NoSuchTableException | NoSuchDatabaseException e) {
+      throw new IllegalArgumentException(String.format("Could not find source table %s", sourceTableName), e);
+    }
+    validateSourceTable(sourceTable, ALLOWED_SOURCES);
+
+    this.sessionCatalogReplacement = isSessionCatalogReplacement();
+
+    this.sourceTableLocation = CatalogUtils.URIToString(sourceTable.storage().locationUri().get());
+    this.destDataLocation = sourceTableLocation;
+    this.destMetadataLocation = sourceTableLocation + "/" + ICEBERG_METADATA_FOLDER;
+  }
+
+  private boolean isSessionCatalogReplacement() {
+    boolean sourceIceberg = sourceTable.provider().get().toLowerCase(Locale.ROOT).equals("iceberg");
+    boolean sameCatalog = sourceCatalog == destCatalog;
+    boolean sameIdentifier = sourceTableName.name().equals(destTableName.name()) &&
+        Arrays.equals(sourceTableName.namespace(), destTableName.namespace());
+    return !sourceIceberg && sameCatalog && sameIdentifier;
+  }
+
+
+  /**
+   * Creates the Iceberg data and metadata at a given location instead of the source table
+   * location. New metadata and data files will be added to this
+   * new location and further operations will not affect the source table.
+   *
+   * Use this if you would like to experiment with Iceberg without changing
+   * your original files.
+   * @param newLocation the base directory for the new Iceberg Table
+   * @return this for chaining
+   */
+  CreateAction asSnapshotAtLocation(String newLocation) {
+    this.destDataLocation = newLocation;
+    this.destMetadataLocation = newLocation + "/" + ICEBERG_METADATA_FOLDER;
+    return this;
+  }
+
+  @Override
+  public Long execute() {
+    StagingTableCatalog stagingCatalog = checkDestinationCatalog(destCatalog);
+    Map<String, String> newTableProperties = new ImmutableMap.Builder<String, String>()
+        .put(TableCatalog.PROP_PROVIDER, "iceberg")
+        .putAll(JavaConverters.mapAsJavaMapConverter(sourceTable.properties()).asJava())
+        .putAll(extraIcebergTableProps(destDataLocation, destMetadataLocation))
+        .putAll(additionalProperties)
+        .build();
+
+    StagedTable stagedTable;
+    try {
+      if (sessionCatalogReplacement) {
+        /*
+         * Spark Session Catalog cannot stage a replacement of a Session table with an Iceberg Table.
+         * To work around this we create a replacement table which is renamed after it
+         * is successfully constructed.
+         */
+        stagedTable = stagingCatalog.stageCreate(Identifier.of(
+            destTableName.namespace(),
+            destTableName.name() + REPLACEMENT_NAME),
+            sourceTable.schema(),
+            Spark3Util.toTransforms(sourcePartitionSpec), newTableProperties);
+      } else {
+        stagedTable = stagingCatalog.stageCreate(destTableName, sourceTable.schema(),
+            Spark3Util.toTransforms(sourcePartitionSpec), newTableProperties);
+      }
+    } catch (NoSuchNamespaceException e) {
+      throw new IllegalArgumentException("Cannot create a new table in a namespace which does not exist", e);
+    } catch (TableAlreadyExistsException e) {
+      throw new IllegalArgumentException("Destination table already exists", e);
+    }
+
+    String stagingLocation = destMetadataLocation;
+    Table icebergTable = ((SparkTable) stagedTable).table();
+
+    LOG.info("Beginning migration of {} to {}", sourceTableName, 
destTableName);
+    long numMigratedFiles = 0;
+    try {
+      SparkTableUtil.importSparkTable(spark, 
Spark3Util.toTableIdentifier(sourceTableName), icebergTable,
+          stagingLocation);
+
+      Snapshot snapshot = icebergTable.currentSnapshot();
+      numMigratedFiles = 
Long.valueOf(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+
+      stagedTable.commitStagedChanges();
+      LOG.info("Successfully loaded Iceberg metadata for {} files", 
numMigratedFiles);
+    } catch (Exception e) {
+      LOG.error("Error when attempting to commit migration changes, rolling 
back", e);
+      stagedTable.abortStagedChanges();
+      throw e;
+    }
+
+    if (sessionCatalogReplacement) {
+      Identifier replacementTable = Identifier.of(destTableName.namespace(), destTableName.name() + REPLACEMENT_NAME);
+      try {
+        stagingCatalog.dropTable(destTableName);
+        stagingCatalog.renameTable(replacementTable, destTableName);
+      } catch (NoSuchTableException e) {
+        LOG.error("Cannot migrate, replacement table is missing. Attempting to recreate source table", e);
+        try {
+          stagingCatalog.createTable(sourceTableName, sourceTable.schema(),
+              Spark3Util.toTransforms(sourcePartitionSpec), JavaConverters.mapAsJavaMap(sourceTable.properties()));
+        } catch (TableAlreadyExistsException tableAlreadyExistsException) {
+          LOG.error("Cannot recreate source table. Source table has already been recreated", e);
+          throw new RuntimeException(e);
+        } catch (NoSuchNamespaceException noSuchNamespaceException) {
+          LOG.error("Cannot recreate source table. Source namespace has been removed, cannot recreate", e);
+          throw new RuntimeException(e);
+        }
+      } catch (TableAlreadyExistsException e) {
+        LOG.error("Cannot migrate, source table was recreated before replacement could be moved. " +
+            "Attempting to remove replacement table.", e);
+        stagingCatalog.dropTable(replacementTable);
+        stagedTable.abortStagedChanges();
+      }
+    }
+
+    return numMigratedFiles;
+  }
+
+  @Override
+  public CreateAction withAdditionalProperties(Map<String, String> properties) {
+    this.additionalProperties.putAll(properties);
+    return this;
+  }
+
+  @Override
+  public CreateAction withAdditionalProperty(String key, String value) {
+    this.additionalProperties.put(key, value);
+    return this;
+  }
+
+  @Override
+  public CreateAction as(String newName) {
+    Spark3Util.CatalogAndIdentifier newDest = Spark3Util.catalogAndIdentifier(spark, newName);
+    return new Spark3CreateAction(spark, sourceCatalog, sourceTableName, newDest.catalog(), newDest.identifier())
+        .withAdditionalProperties(this.additionalProperties);
+  }
+
+  private static void validateSourceTable(CatalogTable sourceTable, Set<String> supportedSourceTableProviders) {
+    String sourceTableProvider = sourceTable.provider().get().toLowerCase(Locale.ROOT);
+
+    if (!supportedSourceTableProviders.contains(sourceTableProvider)) {
+      throw new IllegalArgumentException(
+          String.format("Cannot create an Iceberg table from source provider: %s", sourceTableProvider));
+    }
+    if (sourceTable.storage().locationUri().isEmpty()) {
+      throw new IllegalArgumentException("Cannot create an Iceberg table from a source without an explicit location");
+    }
+  }
+
+  private static Map<String, String> extraIcebergTableProps(String tableLocation, String metadataLocation) {
+    return ImmutableMap.of(
+        TableProperties.WRITE_METADATA_LOCATION, metadataLocation,
+        TableProperties.WRITE_NEW_DATA_LOCATION, tableLocation,
+        "migrated", "true");
+  }
+
+  private static StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {
+    if (!(catalog instanceof SparkSessionCatalog) && !(catalog instanceof SparkCatalog)) {
+      throw new IllegalArgumentException(String.format("Cannot create Iceberg table in non-Iceberg Catalog. " +
+              "Catalog %s was of class %s but %s or %s are required", catalog.name(), catalog.getClass(),
+          SparkSessionCatalog.class.getName(), SparkCatalog.class.getName()));
+    }
+    return (StagingTableCatalog) catalog;
+  }
+
+  private CatalogPlugin checkSourceCatalog(CatalogPlugin catalog) {
+    // Currently the Import code relies on being able to look up the table in the session code

Review comment:
       The limitation here is based on how the current SparkTableUtil import code is written. It can't handle catalogs or things like that, so I thought this would be the easiest approach for now.
   
   I think what we should do is modify that utility function to be less SparkCatalog-specific, or at least have ways of calling it that describe a table based on partitions (like in your code) and other properties directly, rather than assuming it is a "table name" that can be looked up in the catalog.
   
   The other approach I was thinking about was writing a version of the function that is all in on Spark 3 and uses the CatalogV2 table API. We already have a few discussions going on in issues about how we are going to deal with this for other Actions, which all have similar limitations at the moment.
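   For the utility-function option, the shape I have in mind is roughly the following (just a sketch of a possible entry point, not something that exists in this form today; `SparkPartition` here stands for whatever struct we use to describe a partition's location and format):
   
   ```java
   // The caller discovers the partitions however it likes (session catalog, V2 catalog,
   // a manual listing) and hands them over directly, so nothing here assumes the source
   // is a name that can be looked up in the session catalog.
   public static void importSparkPartitions(
       SparkSession spark,
       List<SparkPartition> partitions,
       Table targetTable,
       PartitionSpec spec,
       String stagingDir) {
     // build DataFiles from the partition locations and append them to targetTable
   }
   ```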

##########
File path: spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
+
+public class TestCreateActions extends SparkCatalogTestBase {
+
+  // Only valid for IcebergV2Catalog - Hadoop Catalog does not support Staged Tables

Review comment:
       Yeah, I can probably allow this and test it better. I was running into issues because the HadoopOps fails if you manually specify a location for metadata or data, since that violates the table structure. Since the current implementation always manually specifies these locations, it breaks with the Hadoop Catalog if you don't pick out the exact right location when snapshotting.
   
   Let me go back and see if I can make that less confusing and support the Hadoop-backed Catalog better.
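   The direction I mean is roughly this sketch: only override the write locations when we are snapshotting to somewhere other than the table's own directory, so a path-based (Hadoop) catalog keeps the layout it expects (`destDataLocation`, `destMetadataLocation`, and `sourceTableLocation` are the fields already in the action):
   
   ```java
   // Leave the location properties alone for an in-place migration; only set them
   // for a snapshot to an explicit new location.
   Map<String, String> locationProps = Maps.newHashMap();
   if (!destDataLocation.equals(sourceTableLocation)) {
     locationProps.put(TableProperties.WRITE_NEW_DATA_LOCATION, destDataLocation);
     locationProps.put(TableProperties.WRITE_METADATA_LOCATION, destMetadataLocation);
   }
   ```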

##########
File path: spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
+
+public class TestCreateActions extends SparkCatalogTestBase {
+
+  // Only valid for IcebergV2Catalog - Hadoop Catalog does not support Staged Tables
+  @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"spark_catalog", SparkSessionCatalog.class.getName(), 
ImmutableMap.of(
+            "type", "hive",
+            "default-namespace", "default",
+            "parquet-enabled", "true",
+            "cache-enabled", "false" // Spark will delete tables using v1, 
leaving the cache out of sync
+        )},
+        new Object[] { "testhive", SparkCatalog.class.getName(), 
ImmutableMap.of(
+            "type", "hive",
+            "default-namespace", "default"
+        )}
+    };
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  String baseTableName = "baseTable";
+  File tableDir;
+  String tableLocation;
+  final String implementation;
+  final String type;
+  final TableCatalog catalog;
+
+  public TestCreateActions(
+      String catalogName,
+      String implementation,
+      Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName);
+    this.implementation = implementation;
+    this.type = config.get("type");
+  }
+
+  @Before
+  public void before() {
+    try {
+      this.tableDir = temp.newFolder();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.tableLocation = tableDir.toURI().toString();
+
+    spark.conf().set("hive.exec.dynamic.partition", "true");
+    spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
+
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").orderBy("data").write()
+        .mode("append")
+        .option("path", tableLocation)
+        .saveAsTable(baseTableName);
+  }
+
+  @After
+  public void after() throws IOException {
+    // Drop the hive table.
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
+  }
+
+
+  @Test
+  public void testMigratePartitioned() throws Exception {
+    String dest = uniqueName("iceberg_migrate_partitioned");
+    String source = uniqueName("test_migrate_partitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet PARTITIONED BY (id) location '%s' AS 
SELECT * FROM %s",
+        () -> Actions.migrate(source).as(dest),
+        3);
+  }
+
+  @Test
+  public void testMigrateUnpartitioned() throws Exception {
+    String dest = uniqueName("iceberg_migrate_unpartitioned");
+    String source = uniqueName("test_migrate_unpartitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet location '%s' AS SELECT * FROM %s",
+        () -> Actions.migrate(source).as(dest),
+        2);
+  }
+
+  @Test
+  public void testMigrateReplace() throws Exception {
+    // We can't do a replacement unless we have overridden the spark_catalog
+    if (catalog.name().equals("spark_catalog")) {

Review comment:
       Yep, just learned about Assume last week. Will implement it here!
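   Roughly (a sketch using org.junit.Assume):
   
   ```java
   // Skip this case instead of silently passing when the session catalog has not
   // been overridden.
   Assume.assumeTrue(
       "Replacement is only possible when spark_catalog has been overridden",
       catalog.name().equals("spark_catalog"));
   ```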

##########
File path: spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
+
+public class TestCreateActions extends SparkCatalogTestBase {
+
+  // Only valid for IcebergV2Catalog - Hadoop Catalog does not support Staged Tables
+  @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"spark_catalog", SparkSessionCatalog.class.getName(), 
ImmutableMap.of(
+            "type", "hive",
+            "default-namespace", "default",
+            "parquet-enabled", "true",
+            "cache-enabled", "false" // Spark will delete tables using v1, 
leaving the cache out of sync
+        )},
+        new Object[] { "testhive", SparkCatalog.class.getName(), 
ImmutableMap.of(
+            "type", "hive",
+            "default-namespace", "default"
+        )}
+    };
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  String baseTableName = "baseTable";
+  File tableDir;
+  String tableLocation;
+  final String implementation;
+  final String type;
+  final TableCatalog catalog;
+
+  public TestCreateActions(
+      String catalogName,
+      String implementation,
+      Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName);
+    this.implementation = implementation;
+    this.type = config.get("type");
+  }
+
+  @Before
+  public void before() {
+    try {
+      this.tableDir = temp.newFolder();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.tableLocation = tableDir.toURI().toString();
+
+    spark.conf().set("hive.exec.dynamic.partition", "true");
+    spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
+
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").orderBy("data").write()
+        .mode("append")
+        .option("path", tableLocation)
+        .saveAsTable(baseTableName);
+  }
+
+  @After
+  public void after() throws IOException {
+    // Drop the hive table.
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
+  }
+
+
+  @Test
+  public void testMigratePartitioned() throws Exception {
+    String dest = uniqueName("iceberg_migrate_partitioned");
+    String source = uniqueName("test_migrate_partitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet PARTITIONED BY (id) location '%s' AS 
SELECT * FROM %s",
+        () -> Actions.migrate(source).as(dest),
+        3);
+  }
+
+  @Test
+  public void testMigrateUnpartitioned() throws Exception {
+    String dest = uniqueName("iceberg_migrate_unpartitioned");
+    String source = uniqueName("test_migrate_unpartitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet location '%s' AS SELECT * FROM %s",
+        () -> Actions.migrate(source).as(dest),
+        2);
+  }
+
+  @Test
+  public void testMigrateReplace() throws Exception {
+    // We can't do a replacement unless we have overridden the spark_catalog
+    if (catalog.name().equals("spark_catalog")) {
+      String source = uniqueName(uniqueName("iceberg_migrate_replace"));
+      testCreate(source,
+          source,
+          "CREATE TABLE %s using parquet PARTITIONED BY (id) location '%s' AS 
SELECT * FROM %s",
+          () -> Actions.migrate(source),
+          3);
+    }
+  }
+
+  @Test
+  public void testSnapshotPartitioned() throws Exception {
+    File location = temp.newFolder();
+    String dest = uniqueName("iceberg_snapshot_partitioned");
+    String source = uniqueName("test_snapshot_partitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet PARTITIONED BY (id) location '%s' AS 
SELECT * FROM %s",
+        () -> Actions.snapshot(source, dest, location.toString()),
+        3);
+    testIsolatedSnapshot(source, dest);
+  }
+
+  @Test
+  public void testSnapshotUnpartitioned() throws Exception {
+    File location = temp.newFolder();
+    String dest = uniqueName("iceberg_snapshot_unpartitioned");
+    String source = uniqueName("test_snapshot_unpartitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet location '%s' AS SELECT * FROM %s",
+        () -> Actions.snapshot(source, dest, location.toString()),
+        2);
+    testIsolatedSnapshot(source, dest);
+  }
+
+  @Test
+  public void testSnapshotHiveTable() throws Exception {
+    File location = temp.newFolder();
+    String dest = uniqueName("iceberg_snapshot_hive_table");
+    String source = uniqueName("snapshot_hive_table");
+    testCreate(source,
+        dest,
+        String.format("CREATE EXTERNAL TABLE %s (id Int, data String) STORED 
AS parquet LOCATION '%s'", source,
+            tableLocation),
+        () -> Actions.snapshot(source, dest, location.toString()),
+        3);
+    testIsolatedSnapshot(source, dest);
+  }
+
+  @Test
+  public void testProperties() throws Exception {
+    String dest = uniqueName("iceberg_properties");
+    String source = uniqueName("test_properties_table");
+    Map<String, String> props = Maps.newHashMap();
+    props.put("city", "New Orleans");
+    props.put("note", "Jazz");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet location '%s' AS SELECT * FROM %s",
+        () -> Actions.migrate(source).as(dest)
+        .withAdditionalProperty("dogs", "sundance")
+        .withAdditionalProperties(props),
+        2);
+
+    SparkTable table = loadTable(dest);
+
+
+    Map<String, String> expectedProps = Maps.newHashMap();
+    expectedProps.putAll(props);
+    expectedProps.put("dogs", "sundance");
+
+    for (Map.Entry<String, String> entry : expectedProps.entrySet()) {
+      Assert.assertTrue(
+          "Created table missing property " + entry.getKey(),
+          table.properties().containsKey(entry.getKey()));
+      Assert.assertEquals("Property value is not the expected value",
+          entry.getValue(), table.properties().get(entry.getKey()));
+    }
+  }
+
+  private SparkTable loadTable(String name) throws NoSuchTableException {
+    return (SparkTable) catalog.loadTable(Spark3Util.catalogAndIdentifier(spark, name).identifier());
+  }
+
+  // Creates a table, runs the migration command and checks the results.
+  private void testCreate(String source, String dest, String sqlCreate, Supplier<CreateAction> action,
+      long expectedMigratedFiles) throws

Review comment:
       Sgtm

##########
File path: spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
+
+public class TestCreateActions extends SparkCatalogTestBase {
+
+  // Only valid for IcebergV2Catalog - Hadoop Catalog does not support Staged Tables
+  @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"spark_catalog", SparkSessionCatalog.class.getName(), 
ImmutableMap.of(
+            "type", "hive",
+            "default-namespace", "default",
+            "parquet-enabled", "true",
+            "cache-enabled", "false" // Spark will delete tables using v1, 
leaving the cache out of sync
+        )},
+        new Object[] { "testhive", SparkCatalog.class.getName(), 
ImmutableMap.of(
+            "type", "hive",
+            "default-namespace", "default"
+        )}
+    };
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  String baseTableName = "baseTable";
+  File tableDir;
+  String tableLocation;
+  final String implementation;
+  final String type;
+  final TableCatalog catalog;
+
+  public TestCreateActions(
+      String catalogName,
+      String implementation,
+      Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName);
+    this.implementation = implementation;
+    this.type = config.get("type");
+  }
+
+  @Before
+  public void before() {
+    try {
+      this.tableDir = temp.newFolder();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.tableLocation = tableDir.toURI().toString();
+
+    spark.conf().set("hive.exec.dynamic.partition", "true");
+    spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
+
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").orderBy("data").write()
+        .mode("append")
+        .option("path", tableLocation)
+        .saveAsTable(baseTableName);
+  }
+
+  @After
+  public void after() throws IOException {
+    // Drop the hive table.
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));
+  }
+
+
+  @Test
+  public void testMigratePartitioned() throws Exception {
+    String dest = uniqueName("iceberg_migrate_partitioned");
+    String source = uniqueName("test_migrate_partitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet PARTITIONED BY (id) location '%s' AS 
SELECT * FROM %s",
+        () -> Actions.migrate(source).as(dest),
+        3);
+  }
+
+  @Test
+  public void testMigrateUnpartitioned() throws Exception {
+    String dest = uniqueName("iceberg_migrate_unpartitioned");
+    String source = uniqueName("test_migrate_unpartitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet location '%s' AS SELECT * FROM %s",
+        () -> Actions.migrate(source).as(dest),
+        2);
+  }
+
+  @Test
+  public void testMigrateReplace() throws Exception {
+    // We can't do a replacement unless we have overridden the spark_catalog
+    if (catalog.name().equals("spark_catalog")) {
+      String source = uniqueName(uniqueName("iceberg_migrate_replace"));
+      testCreate(source,
+          source,
+          "CREATE TABLE %s using parquet PARTITIONED BY (id) location '%s' AS 
SELECT * FROM %s",
+          () -> Actions.migrate(source),
+          3);
+    }
+  }
+
+  @Test
+  public void testSnapshotPartitioned() throws Exception {
+    File location = temp.newFolder();
+    String dest = uniqueName("iceberg_snapshot_partitioned");
+    String source = uniqueName("test_snapshot_partitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet PARTITIONED BY (id) location '%s' AS 
SELECT * FROM %s",
+        () -> Actions.snapshot(source, dest, location.toString()),
+        3);
+    testIsolatedSnapshot(source, dest);
+  }
+
+  @Test
+  public void testSnapshotUnpartitioned() throws Exception {
+    File location = temp.newFolder();
+    String dest = uniqueName("iceberg_snapshot_unpartitioned");
+    String source = uniqueName("test_snapshot_unpartitioned_table");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet location '%s' AS SELECT * FROM %s",
+        () -> Actions.snapshot(source, dest, location.toString()),
+        2);
+    testIsolatedSnapshot(source, dest);
+  }
+
+  @Test
+  public void testSnapshotHiveTable() throws Exception {
+    File location = temp.newFolder();
+    String dest = uniqueName("iceberg_snapshot_hive_table");
+    String source = uniqueName("snapshot_hive_table");
+    testCreate(source,
+        dest,
+        String.format("CREATE EXTERNAL TABLE %s (id Int, data String) STORED 
AS parquet LOCATION '%s'", source,
+            tableLocation),
+        () -> Actions.snapshot(source, dest, location.toString()),
+        3);
+    testIsolatedSnapshot(source, dest);
+  }
+
+  @Test
+  public void testProperties() throws Exception {
+    String dest = uniqueName("iceberg_properties");
+    String source = uniqueName("test_properties_table");
+    Map<String, String> props = Maps.newHashMap();
+    props.put("city", "New Orleans");
+    props.put("note", "Jazz");
+    testCreate(source,
+        dest,
+        "CREATE TABLE %s using parquet location '%s' AS SELECT * FROM %s",
+        () -> Actions.migrate(source).as(dest)
+        .withAdditionalProperty("dogs", "sundance")
+        .withAdditionalProperties(props),
+        2);
+
+    SparkTable table = loadTable(dest);
+
+
+    Map<String, String> expectedProps = Maps.newHashMap();
+    expectedProps.putAll(props);
+    expectedProps.put("dogs", "sundance");
+
+    for (Map.Entry<String, String> entry : expectedProps.entrySet()) {
+      Assert.assertTrue(
+          "Created table missing property " + entry.getKey(),
+          table.properties().containsKey(entry.getKey()));
+      Assert.assertEquals("Property value is not the expected value",
+          entry.getValue(), table.properties().get(entry.getKey()));
+    }
+  }
+
+  private SparkTable loadTable(String name) throws NoSuchTableException {
+    return (SparkTable) catalog.loadTable(Spark3Util.catalogAndIdentifier(spark, name).identifier());
+  }
+
+  // Creates a table, runs the migration command and checks the results.
+  private void testCreate(String source, String dest, String sqlCreate, Supplier<CreateAction> action,
+      long expectedMigratedFiles) throws
+      Exception {
+
+    File location = temp.newFolder();
+
+    spark.sql(String.format(sqlCreate, source, location, baseTableName));
+
+    long migratedFiles = action.get().execute();
+
+    SparkTable table = loadTable(dest);
+
+    Assert.assertEquals("Provider should be iceberg", "iceberg",
+        table.properties().get(TableCatalog.PROP_PROVIDER));
+    Assert.assertEquals("Expected number of migrated files", 
expectedMigratedFiles, migratedFiles);
+    Assert.assertEquals("Expected rows in table ", 3, 
spark.table(dest).count());
+  }
+
+  // Inserts records into the destination, makes sure those records exist and source table is unchanged
+  private void testIsolatedSnapshot(String source, String dest) {
+    List<Row> expected = spark.sql(String.format("SELECT * FROM %s", source)).collectAsList();
+
+    List<SimpleRecord> extraData = Lists.newArrayList(
+        new SimpleRecord(4, "d")
+    );
+    Dataset<Row> df = spark.createDataFrame(extraData, SimpleRecord.class);
+    df.write().format("iceberg").mode("append").saveAsTable(dest);
+
+    List<Row> result = spark.sql(String.format("SELECT * FROM %s", source)).collectAsList();
+    Assert.assertEquals("No additional rows should be added to the original table", expected.size(),
+        result.size());
+
+    List<Row> snapshot = spark.sql(String.format("SELECT * FROM %s WHERE id = 4 AND data = 'd'", dest)).collectAsList();
+    Assert.assertEquals("Added row not found in snapshot", 1, snapshot.size());
+  }
+
+  private String uniqueName(String source) {

Review comment:
       This is mostly because the tests were taking way too long when I ran them locally, due to HiveMetastore retries. When every test had a unique name I didn't have to drop tables, and I didn't have to wait two minutes for the metastore to check whether the table existed when I called DROP IF EXISTS.
   
   This is one of the reasons I brought up the retry length a while back; I just couldn't efficiently test this code on my local machine.
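   For reference, the helper is conceptually just appending a per-run suffix; a sketch (not necessarily the exact implementation, assumes java.util.concurrent.atomic.AtomicInteger):
   
   ```java
   // Give every test its own table name so we never hit the slow DROP IF EXISTS path.
   private static final AtomicInteger COUNTER = new AtomicInteger(0);
   
   private String uniqueName(String name) {
     return name + "_" + COUNTER.incrementAndGet();
   }
   ```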




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
