aokolnychyi commented on a change in pull request #2087:
URL: https://github.com/apache/iceberg/pull/2087#discussion_r558968616
##########
File path:
spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -70,6 +79,10 @@
private static final String NAMESPACE = "default";
+ // Error message
+ private static final String OUTPUT_DIFFERENT_ERROR_MSG =
Review comment:
nit: I think it is alright to inline the error message. To stay on
one line, we can simplify the message to something like `Output must match`
or anything appropriate.
##########
File path:
spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -157,6 +170,192 @@ public void testMigratePartitioned() throws Exception {
assertMigratedFileCount(Actions.migrate(source), source, dest);
}
+ @Test
+ public void testPartitionedTableWithUnRecoveredPartitions() throws Exception
{
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_unrecovered_partitions");
+ String dest = source;
+ File location = temp.newFolder();
+ sql(CREATE_PARTITIONED_PARQUET, source, location);
+
+ // Data generation and partition addition
+ spark.range(5)
+ .selectExpr("id", "cast(id as STRING) as data")
+ .write()
+ .partitionBy("id").mode(SaveMode.Overwrite)
+ .parquet(location.toURI().toString());
+ sql("ALTER TABLE %s ADD PARTITION(id=0)", source);
+
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testPartitionedTableWithCustomPartitions() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_custom_parts");
+ String dest = source;
+ File tblLocation = temp.newFolder();
+ File partitionDataLoc = temp.newFolder();
+
+ // Data generation and partition addition
+ spark.sql(String.format(CREATE_PARTITIONED_PARQUET, source, tblLocation));
Review comment:
nit: spark.sql -> sql
##########
File path:
spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -157,6 +170,192 @@ public void testMigratePartitioned() throws Exception {
assertMigratedFileCount(Actions.migrate(source), source, dest);
}
+ @Test
+ public void testPartitionedTableWithUnRecoveredPartitions() throws Exception
{
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_unrecovered_partitions");
+ String dest = source;
+ File location = temp.newFolder();
+ sql(CREATE_PARTITIONED_PARQUET, source, location);
+
+ // Data generation and partition addition
+ spark.range(5)
+ .selectExpr("id", "cast(id as STRING) as data")
+ .write()
+ .partitionBy("id").mode(SaveMode.Overwrite)
+ .parquet(location.toURI().toString());
+ sql("ALTER TABLE %s ADD PARTITION(id=0)", source);
+
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testPartitionedTableWithCustomPartitions() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_custom_parts");
+ String dest = source;
+ File tblLocation = temp.newFolder();
+ File partitionDataLoc = temp.newFolder();
+
+ // Data generation and partition addition
+ spark.sql(String.format(CREATE_PARTITIONED_PARQUET, source, tblLocation));
+ spark.range(10)
+ .selectExpr("cast(id as STRING) as data")
+ .write()
+ .mode(SaveMode.Overwrite).parquet(partitionDataLoc.toURI().toString());
+ spark.sql(String.format("ALTER TABLE %s ADD PARTITION(id=0) LOCATION
'%s'", source,
+ partitionDataLoc.toURI().toString()));
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testAddColumnOnMigratedTable() throws Exception {
+ testAddColumnOnMigratedTableAtEnd();
+ testAddColumnOnMigratedTableAtMiddle();
+ }
+
+ private void testAddColumnOnMigratedTableAtEnd() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_add_column_migrated_table");
+ String dest = source;
+ createSourceTable(CREATE_PARQUET, source);
+ List<Object[]> expected1 = sql("select *, null from %s order by id", dest);
Review comment:
I think it would be more readable if this queried `source`, even though
they are identical, as we are trying to ensure that the output before and
after migration matches.
##########
File path:
spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -157,6 +170,192 @@ public void testMigratePartitioned() throws Exception {
assertMigratedFileCount(Actions.migrate(source), source, dest);
}
+ @Test
+ public void testPartitionedTableWithUnRecoveredPartitions() throws Exception
{
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_unrecovered_partitions");
+ String dest = source;
+ File location = temp.newFolder();
+ sql(CREATE_PARTITIONED_PARQUET, source, location);
+
+ // Data generation and partition addition
+ spark.range(5)
+ .selectExpr("id", "cast(id as STRING) as data")
+ .write()
+ .partitionBy("id").mode(SaveMode.Overwrite)
+ .parquet(location.toURI().toString());
+ sql("ALTER TABLE %s ADD PARTITION(id=0)", source);
+
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testPartitionedTableWithCustomPartitions() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_custom_parts");
+ String dest = source;
+ File tblLocation = temp.newFolder();
+ File partitionDataLoc = temp.newFolder();
+
+ // Data generation and partition addition
+ spark.sql(String.format(CREATE_PARTITIONED_PARQUET, source, tblLocation));
+ spark.range(10)
+ .selectExpr("cast(id as STRING) as data")
+ .write()
+ .mode(SaveMode.Overwrite).parquet(partitionDataLoc.toURI().toString());
+ spark.sql(String.format("ALTER TABLE %s ADD PARTITION(id=0) LOCATION
'%s'", source,
+ partitionDataLoc.toURI().toString()));
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testAddColumnOnMigratedTable() throws Exception {
+ testAddColumnOnMigratedTableAtEnd();
+ testAddColumnOnMigratedTableAtMiddle();
+ }
+
+ private void testAddColumnOnMigratedTableAtEnd() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_add_column_migrated_table");
+ String dest = source;
+ createSourceTable(CREATE_PARQUET, source);
+ List<Object[]> expected1 = sql("select *, null from %s order by id", dest);
+ List<Object[]> expected2 = sql("select *, null, null from %s order by id",
dest);
+
+ // migrate table
+ Actions.migrate(source).execute();
Review comment:
@karuppayya, am I correct that the source table is empty when we call
the migrate action?
##########
File path:
spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -157,6 +170,192 @@ public void testMigratePartitioned() throws Exception {
assertMigratedFileCount(Actions.migrate(source), source, dest);
}
+ @Test
+ public void testPartitionedTableWithUnRecoveredPartitions() throws Exception
{
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_unrecovered_partitions");
+ String dest = source;
+ File location = temp.newFolder();
+ sql(CREATE_PARTITIONED_PARQUET, source, location);
+
+ // Data generation and partition addition
+ spark.range(5)
+ .selectExpr("id", "cast(id as STRING) as data")
+ .write()
+ .partitionBy("id").mode(SaveMode.Overwrite)
+ .parquet(location.toURI().toString());
+ sql("ALTER TABLE %s ADD PARTITION(id=0)", source);
+
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testPartitionedTableWithCustomPartitions() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_custom_parts");
+ String dest = source;
+ File tblLocation = temp.newFolder();
+ File partitionDataLoc = temp.newFolder();
+
+ // Data generation and partition addition
+ spark.sql(String.format(CREATE_PARTITIONED_PARQUET, source, tblLocation));
+ spark.range(10)
+ .selectExpr("cast(id as STRING) as data")
+ .write()
+ .mode(SaveMode.Overwrite).parquet(partitionDataLoc.toURI().toString());
+ spark.sql(String.format("ALTER TABLE %s ADD PARTITION(id=0) LOCATION
'%s'", source,
Review comment:
nit: spark.sql -> sql
##########
File path:
spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -157,6 +170,192 @@ public void testMigratePartitioned() throws Exception {
assertMigratedFileCount(Actions.migrate(source), source, dest);
}
+ @Test
+ public void testPartitionedTableWithUnRecoveredPartitions() throws Exception
{
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_unrecovered_partitions");
+ String dest = source;
+ File location = temp.newFolder();
+ sql(CREATE_PARTITIONED_PARQUET, source, location);
+
+ // Data generation and partition addition
+ spark.range(5)
+ .selectExpr("id", "cast(id as STRING) as data")
+ .write()
+ .partitionBy("id").mode(SaveMode.Overwrite)
+ .parquet(location.toURI().toString());
+ sql("ALTER TABLE %s ADD PARTITION(id=0)", source);
+
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testPartitionedTableWithCustomPartitions() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_custom_parts");
+ String dest = source;
+ File tblLocation = temp.newFolder();
+ File partitionDataLoc = temp.newFolder();
+
+ // Data generation and partition addition
+ spark.sql(String.format(CREATE_PARTITIONED_PARQUET, source, tblLocation));
+ spark.range(10)
+ .selectExpr("cast(id as STRING) as data")
+ .write()
+ .mode(SaveMode.Overwrite).parquet(partitionDataLoc.toURI().toString());
+ spark.sql(String.format("ALTER TABLE %s ADD PARTITION(id=0) LOCATION
'%s'", source,
+ partitionDataLoc.toURI().toString()));
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testAddColumnOnMigratedTable() throws Exception {
+ testAddColumnOnMigratedTableAtEnd();
+ testAddColumnOnMigratedTableAtMiddle();
+ }
+
+ private void testAddColumnOnMigratedTableAtEnd() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_add_column_migrated_table");
+ String dest = source;
+ createSourceTable(CREATE_PARQUET, source);
+ List<Object[]> expected1 = sql("select *, null from %s order by id", dest);
+ List<Object[]> expected2 = sql("select *, null, null from %s order by id",
dest);
+
+ // migrate table
+ Actions.migrate(source).execute();
+ SparkTable sparkTable = loadTable(dest);
+ Table table = sparkTable.table();
+
+ // test column addition on migrated table
+ Schema beforeSchema = table.schema();
+ String newCol1 = "newCol1";
+ sparkTable.table().updateSchema().addColumn(newCol1,
Types.IntegerType.get()).commit();
+ Schema afterSchema = table.schema();
+ Assert.assertNull(beforeSchema.findField(newCol1));
+ Assert.assertNotNull(afterSchema.findField(newCol1));
+
+ // reads should succeed without any exceptions
+ List<Object[]> results1 = sql("select * from %s order by id", dest);
+ Assert.assertTrue(results1.size() > 0);
+ assertEquals(OUTPUT_DIFFERENT_ERROR_MSG, results1, expected1);
+
+ String newCol2 = "newCol2";
+ sql("ALTER TABLE %s ADD COLUMN %s INT", dest, newCol2);
+ StructType schema = spark.table(dest).schema();
+ Assert.assertTrue(Arrays.asList(schema.fieldNames()).contains(newCol2));
+
+ // reads should succeed without any exceptions
+ List<Object[]> results2 = sql("select * from %s order by id", dest);
+ Assert.assertTrue(results2.size() > 0);
+ assertEquals(OUTPUT_DIFFERENT_ERROR_MSG, results2, expected2);
+ }
+
+ private void testAddColumnOnMigratedTableAtMiddle() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_add_column_migrated_table_middle");
+ String dest = source;
+ createSourceTable(CREATE_PARQUET, source);
+
+ // migrate table
+ Actions.migrate(source).execute();
+ SparkTable sparkTable = loadTable(dest);
+ Table table = sparkTable.table();
+ List<Object[]> expected = sql("select id, null, data from %s order by id",
dest);
+
+ // test column addition on migrated table
+ Schema beforeSchema = table.schema();
+ String newCol1 = "newCol";
+ sparkTable.table().updateSchema().addColumn("newCol",
Types.IntegerType.get())
+ .moveAfter(newCol1, "id")
+ .commit();
+ Schema afterSchema = table.schema();
+ Assert.assertNull(beforeSchema.findField(newCol1));
+ Assert.assertNotNull(afterSchema.findField(newCol1));
+
+ // reads should succeed
+ List<Object[]> results = sql("select * from %s order by id", dest);
+ Assert.assertTrue(results.size() > 0);
+ assertEquals(OUTPUT_DIFFERENT_ERROR_MSG, results, expected);
+ }
+
+ @Test
+ public void testRemoveColumnOnMigratedTable() throws Exception {
+ removeColumnsAtEnd();
Review comment:
Same here: deserves two tests
##########
File path:
spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -157,6 +170,192 @@ public void testMigratePartitioned() throws Exception {
assertMigratedFileCount(Actions.migrate(source), source, dest);
}
+ @Test
+ public void testPartitionedTableWithUnRecoveredPartitions() throws Exception
{
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_unrecovered_partitions");
+ String dest = source;
+ File location = temp.newFolder();
+ sql(CREATE_PARTITIONED_PARQUET, source, location);
+
+ // Data generation and partition addition
+ spark.range(5)
+ .selectExpr("id", "cast(id as STRING) as data")
+ .write()
+ .partitionBy("id").mode(SaveMode.Overwrite)
+ .parquet(location.toURI().toString());
+ sql("ALTER TABLE %s ADD PARTITION(id=0)", source);
+
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testPartitionedTableWithCustomPartitions() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_custom_parts");
+ String dest = source;
+ File tblLocation = temp.newFolder();
+ File partitionDataLoc = temp.newFolder();
+
+ // Data generation and partition addition
+ spark.sql(String.format(CREATE_PARTITIONED_PARQUET, source, tblLocation));
+ spark.range(10)
+ .selectExpr("cast(id as STRING) as data")
+ .write()
+ .mode(SaveMode.Overwrite).parquet(partitionDataLoc.toURI().toString());
+ spark.sql(String.format("ALTER TABLE %s ADD PARTITION(id=0) LOCATION
'%s'", source,
+ partitionDataLoc.toURI().toString()));
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testAddColumnOnMigratedTable() throws Exception {
+ testAddColumnOnMigratedTableAtEnd();
+ testAddColumnOnMigratedTableAtMiddle();
+ }
+
+ private void testAddColumnOnMigratedTableAtEnd() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_add_column_migrated_table");
+ String dest = source;
+ createSourceTable(CREATE_PARQUET, source);
+ List<Object[]> expected1 = sql("select *, null from %s order by id", dest);
+ List<Object[]> expected2 = sql("select *, null, null from %s order by id",
dest);
+
+ // migrate table
+ Actions.migrate(source).execute();
+ SparkTable sparkTable = loadTable(dest);
+ Table table = sparkTable.table();
+
+ // test column addition on migrated table
+ Schema beforeSchema = table.schema();
+ String newCol1 = "newCol1";
+ sparkTable.table().updateSchema().addColumn(newCol1,
Types.IntegerType.get()).commit();
+ Schema afterSchema = table.schema();
+ Assert.assertNull(beforeSchema.findField(newCol1));
+ Assert.assertNotNull(afterSchema.findField(newCol1));
+
+ // reads should succeed without any exceptions
+ List<Object[]> results1 = sql("select * from %s order by id", dest);
+ Assert.assertTrue(results1.size() > 0);
+ assertEquals(OUTPUT_DIFFERENT_ERROR_MSG, results1, expected1);
+
+ String newCol2 = "newCol2";
+ sql("ALTER TABLE %s ADD COLUMN %s INT", dest, newCol2);
+ StructType schema = spark.table(dest).schema();
+ Assert.assertTrue(Arrays.asList(schema.fieldNames()).contains(newCol2));
+
+ // reads should succeed without any exceptions
+ List<Object[]> results2 = sql("select * from %s order by id", dest);
+ Assert.assertTrue(results2.size() > 0);
+ assertEquals(OUTPUT_DIFFERENT_ERROR_MSG, results2, expected2);
+ }
+
+ private void testAddColumnOnMigratedTableAtMiddle() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_add_column_migrated_table_middle");
+ String dest = source;
+ createSourceTable(CREATE_PARQUET, source);
+
+ // migrate table
+ Actions.migrate(source).execute();
+ SparkTable sparkTable = loadTable(dest);
+ Table table = sparkTable.table();
+ List<Object[]> expected = sql("select id, null, data from %s order by id",
dest);
+
+ // test column addition on migrated table
+ Schema beforeSchema = table.schema();
+ String newCol1 = "newCol";
+ sparkTable.table().updateSchema().addColumn("newCol",
Types.IntegerType.get())
+ .moveAfter(newCol1, "id")
+ .commit();
+ Schema afterSchema = table.schema();
+ Assert.assertNull(beforeSchema.findField(newCol1));
+ Assert.assertNotNull(afterSchema.findField(newCol1));
+
+ // reads should succeed
+ List<Object[]> results = sql("select * from %s order by id", dest);
+ Assert.assertTrue(results.size() > 0);
+ assertEquals(OUTPUT_DIFFERENT_ERROR_MSG, results, expected);
+ }
+
+ @Test
+ public void testRemoveColumnOnMigratedTable() throws Exception {
+ removeColumnsAtEnd();
+ removeColumnFromMiddle();
+ }
+
+ private void removeColumnsAtEnd() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_remove_column_migrated_table");
+ String dest = source;
+
+ String colName1 = "newCol1";
+ String colName2 = "newCol2";
+ File location = temp.newFolder();
+ spark.range(10).selectExpr("cast(id as INT)", "CAST(id as INT) " +
colName1, "CAST(id as INT) " + colName2)
+ .write()
+ .mode(SaveMode.Overwrite).saveAsTable(dest);
+ List<Object[]> expected1 = sql("select id, %s from %s order by id",
colName1, dest);
Review comment:
Same here: we'd better query `source` so that it is easier to follow.
##########
File path:
spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -157,6 +168,105 @@ public void testMigratePartitioned() throws Exception {
assertMigratedFileCount(Actions.migrate(source), source, dest);
}
+ @Test
+ public void testPartitionedTableWithUnRecoveredPartitions() throws Exception
{
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_unrecovered_partitions");
+ String dest = source;
+ File location = temp.newFolder();
+ spark.sql(String.format(CREATE_PARTITIONED_PARQUET, source, location));
+ spark.range(5)
+ .selectExpr("id", "cast(id as STRING) as data")
+ .write()
+ .partitionBy("id").mode("overwrite")
+ .parquet(location.toURI().toString());
+ spark.sql(String.format("alter table %s add partition(id=0)", source));
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testPartitionedTableWithCustomPartitions() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_custom_parts");
+ String dest = source;
+ File tblLocation = temp.newFolder();
+ File partitionDataLoc = temp.newFolder();
+ spark.sql(String.format(CREATE_PARTITIONED_PARQUET, source, tblLocation));
+ spark.range(10)
+ .selectExpr("cast(id as STRING) as data")
+ .write()
+ .mode("overwrite").parquet(partitionDataLoc.toURI().toString());
+ spark.sql(String.format("alter table %s add partition(id=0) LOCATION
'%s'", source,
+ partitionDataLoc.toURI().toString()));
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+
+ @Test
+ public void testAddColumnOnMigratedTable() throws Exception {
Review comment:
Since the methods are independent, I think we'd better have two tests here.
What do you think, @karuppayya?
##########
File path:
spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
##########
@@ -157,6 +170,192 @@ public void testMigratePartitioned() throws Exception {
assertMigratedFileCount(Actions.migrate(source), source, dest);
}
+ @Test
+ public void testPartitionedTableWithUnRecoveredPartitions() throws Exception
{
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_unrecovered_partitions");
+ String dest = source;
+ File location = temp.newFolder();
+ sql(CREATE_PARTITIONED_PARQUET, source, location);
+
+ // Data generation and partition addition
+ spark.range(5)
+ .selectExpr("id", "cast(id as STRING) as data")
+ .write()
+ .partitionBy("id").mode(SaveMode.Overwrite)
+ .parquet(location.toURI().toString());
+ sql("ALTER TABLE %s ADD PARTITION(id=0)", source);
+
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testPartitionedTableWithCustomPartitions() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_custom_parts");
+ String dest = source;
+ File tblLocation = temp.newFolder();
+ File partitionDataLoc = temp.newFolder();
+
+ // Data generation and partition addition
+ spark.sql(String.format(CREATE_PARTITIONED_PARQUET, source, tblLocation));
+ spark.range(10)
+ .selectExpr("cast(id as STRING) as data")
+ .write()
+ .mode(SaveMode.Overwrite).parquet(partitionDataLoc.toURI().toString());
+ spark.sql(String.format("ALTER TABLE %s ADD PARTITION(id=0) LOCATION
'%s'", source,
+ partitionDataLoc.toURI().toString()));
+ assertMigratedFileCount(Actions.migrate(source), source, dest);
+ }
+
+ @Test
+ public void testAddColumnOnMigratedTable() throws Exception {
+ testAddColumnOnMigratedTableAtEnd();
+ testAddColumnOnMigratedTableAtMiddle();
+ }
+
+ private void testAddColumnOnMigratedTableAtEnd() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_add_column_migrated_table");
+ String dest = source;
+ createSourceTable(CREATE_PARQUET, source);
+ List<Object[]> expected1 = sql("select *, null from %s order by id", dest);
+ List<Object[]> expected2 = sql("select *, null, null from %s order by id",
dest);
+
+ // migrate table
+ Actions.migrate(source).execute();
+ SparkTable sparkTable = loadTable(dest);
+ Table table = sparkTable.table();
+
+ // test column addition on migrated table
+ Schema beforeSchema = table.schema();
+ String newCol1 = "newCol1";
+ sparkTable.table().updateSchema().addColumn(newCol1,
Types.IntegerType.get()).commit();
+ Schema afterSchema = table.schema();
+ Assert.assertNull(beforeSchema.findField(newCol1));
+ Assert.assertNotNull(afterSchema.findField(newCol1));
+
+ // reads should succeed without any exceptions
+ List<Object[]> results1 = sql("select * from %s order by id", dest);
+ Assert.assertTrue(results1.size() > 0);
+ assertEquals(OUTPUT_DIFFERENT_ERROR_MSG, results1, expected1);
+
+ String newCol2 = "newCol2";
+ sql("ALTER TABLE %s ADD COLUMN %s INT", dest, newCol2);
+ StructType schema = spark.table(dest).schema();
+ Assert.assertTrue(Arrays.asList(schema.fieldNames()).contains(newCol2));
+
+ // reads should succeed without any exceptions
+ List<Object[]> results2 = sql("select * from %s order by id", dest);
+ Assert.assertTrue(results2.size() > 0);
+ assertEquals(OUTPUT_DIFFERENT_ERROR_MSG, results2, expected2);
+ }
+
+ private void testAddColumnOnMigratedTableAtMiddle() throws Exception {
+ Assume.assumeTrue("Cannot migrate to a hadoop based catalog",
!type.equals("hadoop"));
+ Assume.assumeTrue("Can only migrate from Spark Session Catalog",
catalog.name().equals("spark_catalog"));
+ String source = sourceName("test_add_column_migrated_table_middle");
+ String dest = source;
+ createSourceTable(CREATE_PARQUET, source);
+
+ // migrate table
+ Actions.migrate(source).execute();
Review comment:
Same here: the source seems empty
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]