singhpk234 commented on code in PR #8931:
URL: https://github.com/apache/iceberg/pull/8931#discussion_r1374769692
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java:
##########
@@ -108,6 +109,23 @@ public MigrateTableSparkAction backupTableName(String tableName) {
return this;
}
+ @Override
+ public MigrateTableSparkAction destCatalogName(String catalogName) {
+ CatalogManager catalogManager = spark().sessionState().catalogManager();
+
+ CatalogPlugin catalogPlugin;
+ if (catalogManager.isCatalogRegistered(catalogName)) {
+ catalogPlugin = catalogManager.catalog(catalogName);
+ } else {
+ LOG.warn(
+ "{} doesn't exist in SparkSession. " + "Fallback to current SparkSession catalog.",
+ catalogName);
+ catalogPlugin = catalogManager.currentCatalog();
+ }
+ this.destCatalog = checkDestinationCatalog(catalogPlugin);
Review Comment:
QQ: earlier we used to use sourceCatalog as destCatalog too; was that a
problem? Can you please add more comments on why sourceCatalog was picked
as `spark_catalog` rather than `glue_catalog`, given that we were calling the
migrate procedure from glue_catalog?
##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java:
##########
@@ -142,6 +142,48 @@ public void testMigrateWithBackupTableName() throws IOException {
Assertions.assertThat(spark.catalog().tableExists(dbName + "." + backupTableName)).isTrue();
}
+ @Test
+ public void testMigrateWithDestCatalogName() throws IOException {
+ Assume.assumeTrue(catalogName.equals("spark_catalog"));
+
+ spark
+ .conf()
+ .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog");
+
+ String location = temp.newFolder().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+ Object result =
+ scalarSql(
+ "CALL %s.system.migrate(table => '%s', drop_backup => false, dest_catalog_name => '%s')",
+ catalogName, tableName, catalogName);
+ Assertions.assertThat(result).isEqualTo(1L);
+ Assertions.assertThat(spark.catalog().tableExists(tableName + "_BACKUP_")).isTrue();
+ }
+
+ @Test
+ public void testMigrateWithDestCatalogNameWithNonExistingCatalog() throws IOException {
+ Assume.assumeTrue(catalogName.equals("spark_catalog"));
+
+ String destCatalogName = "non_existing_catalog";
+
+ String location = temp.newFolder().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+ Object result =
+ scalarSql(
+ "CALL %s.system.migrate(table => '%s', drop_backup => false, dest_catalog_name => '%s')",
+ catalogName, tableName, destCatalogName);
+ Assertions.assertThat(result).isEqualTo(1L);
+ Assertions.assertThat(spark.catalog().tableExists(tableName + "_BACKUP_")).isTrue();
Review Comment:
We should check the warning log in this case. I was also wondering whether
an incorrect destCatalogName should instead be rejected as invalid input.
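To make the two options concrete, here is a minimal, self-contained sketch (not the PR's code) that contrasts warn-and-fall-back with a strict variant that rejects an unknown name. The class `DestCatalogResolution`, both method names, the exception message, and the plain `Set<String>` standing in for Spark's `CatalogManager` are all hypothetical and used only for illustration:

```java
import java.util.Set;

// Hypothetical illustration only: a Set of names stands in for Spark's CatalogManager.
final class DestCatalogResolution {

  // Lenient variant, mirroring the behavior in this diff:
  // warn and fall back to the current catalog when the name is unknown.
  static String resolveLenient(Set<String> registered, String current, String requested) {
    if (registered.contains(requested)) {
      return requested;
    }
    System.err.printf(
        "%s doesn't exist in SparkSession. Fallback to current SparkSession catalog.%n",
        requested);
    return current;
  }

  // Strict variant (assumed alternative): treat an unknown name as invalid input.
  static String resolveStrict(Set<String> registered, String requested) {
    if (!registered.contains(requested)) {
      throw new IllegalArgumentException(
          String.format("Cannot use non-existing destination catalog: %s", requested));
    }
    return requested;
  }

  public static void main(String[] args) {
    Set<String> registered = Set.of("spark_catalog", "glue_catalog");

    // Unknown name silently resolves to the current catalog.
    System.out.println(resolveLenient(registered, "spark_catalog", "non_existing_catalog"));

    // Known name resolves to itself in both variants.
    System.out.println(resolveStrict(registered, "glue_catalog"));

    // Unknown name fails fast in the strict variant.
    try {
      resolveStrict(registered, "non_existing_catalog");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With the strict variant, a typo in dest_catalog_name fails the procedure call instead of migrating the table into a catalog the caller did not ask for, and the test could assert on the exception rather than on a log line.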
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]