This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 360ab85 Spark: Fix describing v1 table with void transforms (#2454)
360ab85 is described below
commit 360ab856dc559c3f352d667bd9552f0c7c395a5c
Author: jun-he <[email protected]>
AuthorDate: Wed May 19 17:44:36 2021 -0700
Spark: Fix describing v1 table with void transforms (#2454)
---
.../extensions/TestAlterTablePartitionFields.java | 43 ++++++++++++++++++++++
.../java/org/apache/iceberg/spark/Spark3Util.java | 9 ++++-
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
index b016bb1..af9b837 100644
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
@@ -22,6 +22,10 @@ package org.apache.iceberg.spark.extensions;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.connector.catalog.CatalogManager;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -368,4 +372,43 @@ public class TestAlterTablePartitionFields extends SparkExtensionsTestBase {
.build();
Assert.assertEquals("Should changed from daily to hourly partitioned
field", expected, table.spec());
}
+
+ @Test
+ public void testSparkTableAddDropPartitions() throws Exception {
+ sql("CREATE TABLE %s (id bigint NOT NULL, ts timestamp, data string) USING
iceberg", tableName);
+ Assert.assertEquals("spark table partition should be empty", 0,
sparkTable().partitioning().length);
+
+ sql("ALTER TABLE %s ADD PARTITION FIELD bucket(16, id) AS shard",
tableName);
+ assertPartitioningEquals(sparkTable(), 1, "bucket(16, id)");
+
+ sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName);
+ assertPartitioningEquals(sparkTable(), 2, "truncate(data, 4)");
+
+ sql("ALTER TABLE %s ADD PARTITION FIELD years(ts)", tableName);
+ assertPartitioningEquals(sparkTable(), 3, "years(ts)");
+
+ sql("ALTER TABLE %s DROP PARTITION FIELD years(ts)", tableName);
+ assertPartitioningEquals(sparkTable(), 2, "truncate(data, 4)");
+
+ sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName);
+ assertPartitioningEquals(sparkTable(), 1, "bucket(16, id)");
+
+ sql("ALTER TABLE %s DROP PARTITION FIELD shard", tableName);
+ sql("DESCRIBE %s", tableName);
+ Assert.assertEquals("spark table partition should be empty", 0,
sparkTable().partitioning().length);
+ }
+
+ private void assertPartitioningEquals(SparkTable table, int len, String
transform) {
+ Assert.assertEquals("spark table partition should be " + len, len,
table.partitioning().length);
+ Assert.assertEquals("latest spark table partition transform should match",
+ transform, table.partitioning()[len - 1].toString());
+ }
+
+ private SparkTable sparkTable() throws Exception {
+ validationCatalog.loadTable(tableIdent).refresh();
+ CatalogManager catalogManager = spark.sessionState().catalogManager();
+ TableCatalog catalog = (TableCatalog) catalogManager.catalog(catalogName);
+ Identifier identifier = Identifier.of(tableIdent.namespace().levels(),
tableIdent.name());
+ return (SparkTable) catalog.loadTable(identifier);
+ }
}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index 3f87a79..9a24b3e 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
@@ -293,12 +294,18 @@ public class Spark3Util {
}
@Override
+ public Transform alwaysNull(int fieldId, String sourceName, int
sourceId) {
+ // do nothing for alwaysNull, it doesn't need to be converted to a
transform
+ return null;
+ }
+
+ @Override
public Transform unknown(int fieldId, String sourceName, int
sourceId, String transform) {
return Expressions.apply(transform,
Expressions.column(sourceName));
}
});
- return transforms.toArray(new Transform[0]);
+ return
transforms.stream().filter(Objects::nonNull).toArray(Transform[]::new);
}
public static Distribution
buildRequiredDistribution(org.apache.iceberg.Table table) {