singhpk234 commented on code in PR #8931:
URL: https://github.com/apache/iceberg/pull/8931#discussion_r1374769692


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java:
##########
@@ -108,6 +109,23 @@ public MigrateTableSparkAction backupTableName(String 
tableName) {
     return this;
   }
 
+  @Override
+  public MigrateTableSparkAction destCatalogName(String catalogName) {
+    CatalogManager catalogManager = spark().sessionState().catalogManager();
+
+    CatalogPlugin catalogPlugin;
+    if (catalogManager.isCatalogRegistered(catalogName)) {
+      catalogPlugin = catalogManager.catalog(catalogName);
+    } else {
+      LOG.warn(
+          "{} doesn't exist in SparkSession. " + "Fallback to current 
SparkSession catalog.",
+          catalogName);
+      catalogPlugin = catalogManager.currentCatalog();
+    }
+    this.destCatalog = checkDestinationCatalog(catalogPlugin);

Review Comment:
   QQ: earlier we use to use sourceCatalog as destCatalog too, was this a 
problem ? can you please add more comments as to why sourceCatalog was picked 
as `spark_catalog` rather than the `glue_catalog` ?? since we were calling the 
migrate procedure from glue_catalog ? 



##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java:
##########
@@ -142,6 +142,48 @@ public void testMigrateWithBackupTableName() throws 
IOException {
     Assertions.assertThat(spark.catalog().tableExists(dbName + "." + 
backupTableName)).isTrue();
   }
 
+  @Test
+  public void testMigrateWithDestCatalogName() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+
+    spark
+        .conf()
+        .set("spark.sql.catalog.spark_catalog", 
"org.apache.iceberg.spark.SparkSessionCatalog");
+
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    Object result =
+        scalarSql(
+            "CALL %s.system.migrate(table => '%s', drop_backup => false, 
dest_catalog_name => '%s')",
+            catalogName, tableName, catalogName);
+    Assertions.assertThat(result).isEqualTo(1L);
+    Assertions.assertThat(spark.catalog().tableExists(tableName + 
"_BACKUP_")).isTrue();
+  }
+
+  @Test
+  public void testMigrateWithDestCatalogNameWithNonExistingCatalog() throws 
IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+
+    String destCatalogName = "non_existing_catalog";
+
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    Object result =
+        scalarSql(
+            "CALL %s.system.migrate(table => '%s', drop_backup => false, 
dest_catalog_name => '%s')",
+            catalogName, tableName, destCatalogName);
+    Assertions.assertThat(result).isEqualTo(1L);
+    Assertions.assertThat(spark.catalog().tableExists(tableName + 
"_BACKUP_")).isTrue();

Review Comment:
   should check the warning log in this case, was wondering if giving incorrect 
destCatalogName should be thrown as Invalid input



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to