This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 2a3f87927b Spark: Fix stats in rewrite metadata action (#5691)
2a3f87927b is described below
commit 2a3f87927be9abfddcd5835621fe7c516a4877c4
Author: Ryan Blue <[email protected]>
AuthorDate: Thu Sep 1 23:20:57 2022 -0700
Spark: Fix stats in rewrite metadata action (#5691)
* Core: Don't show dropped fields from the partition spec
* Use projection instead
* Use StructProjection in SparkDataFile.
Co-authored-by: Fokko Driesprong <[email protected]>
---
.../extensions/TestRewriteManifestsProcedure.java | 29 +++++++++++
.../org/apache/iceberg/spark/SparkDataFile.java | 21 +++++++-
.../spark/actions/RewriteManifestsSparkAction.java | 59 +++++++++++++++++++---
.../extensions/TestRewriteManifestsProcedure.java | 29 +++++++++++
.../org/apache/iceberg/spark/SparkDataFile.java | 21 +++++++-
.../spark/actions/RewriteManifestsSparkAction.java | 59 +++++++++++++++++++---
6 files changed, 200 insertions(+), 18 deletions(-)
diff --git
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 7c5ec1f5cf..4578820290 100644
---
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -20,6 +20,8 @@ package org.apache.iceberg.spark.extensions;
import static
org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
+import java.sql.Date;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
@@ -193,4 +195,31 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
"Cannot handle an empty identifier",
() -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
}
+
+  @Test
+  public void testReplacePartitionField() {
+    sql(
+        "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+        tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+        tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+    sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+  }
}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 87e8318724..efef23be55 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -52,13 +53,29 @@ public class SparkDataFile implements DataFile {
private final Type keyMetadataType;
private final SparkStructLike wrappedPartition;
+ private final StructLike partitionProjection;
private Row wrapped;
public SparkDataFile(Types.StructType type, StructType sparkType) {
+ this(type, null, sparkType);
+ }
+
+  public SparkDataFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType("lower_bounds");
     this.upperBoundsType = type.fieldType("upper_bounds");
     this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+    Types.StructType partitionType = type.fieldType("partition").asStructType();
+    this.wrappedPartition = new SparkStructLike(partitionType);
+
+    if (projectedType != null) {
+      Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+      this.partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+    } else {
+      this.partitionProjection = wrappedPartition;
+    }
Map<String, Integer> positions = Maps.newHashMap();
type.fields()
@@ -115,7 +132,7 @@ public class SparkDataFile implements DataFile {
@Override
public StructLike partition() {
- return wrappedPartition;
+ return partitionProjection;
}
@Override
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 1e0034eb30..43476d21da 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -209,6 +210,7 @@ public class RewriteManifestsSparkAction
Dataset<Row> manifestEntryDF, int numManifests) {
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we rely only on the target number of manifests for unpartitioned tables
// as we should not worry about having too much metadata per partition
@@ -217,7 +219,14 @@ public class RewriteManifestsSparkAction
return manifestEntryDF
.repartition(numManifests)
.mapPartitions(
- toManifests(io, maxNumManifestEntries, stagingLocation,
formatVersion, spec, sparkType),
+ toManifests(
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
manifestEncoder)
.collectAsList();
}
@@ -227,6 +236,7 @@ public class RewriteManifestsSparkAction
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we allow the actual size of manifests to be 10% higher if the
estimation is not precise
// enough
@@ -240,7 +250,13 @@ public class RewriteManifestsSparkAction
.sortWithinPartitions(partitionColumn)
.mapPartitions(
toManifests(
- io, maxNumManifestEntries, stagingLocation,
formatVersion, spec, sparkType),
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
manifestEncoder)
.collectAsList();
});
@@ -337,6 +353,7 @@ public class RewriteManifestsSparkAction
Broadcast<FileIO> io,
String location,
int format,
+ Types.StructType combinedPartitionType,
PartitionSpec spec,
StructType sparkType)
throws IOException {
@@ -346,8 +363,9 @@ public class RewriteManifestsSparkAction
     OutputFile outputFile =
         io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
@@ -371,6 +389,7 @@ public class RewriteManifestsSparkAction
long maxNumManifestEntries,
String location,
int format,
+ Types.StructType combinedPartitionType,
PartitionSpec spec,
StructType sparkType) {
@@ -384,14 +403,40 @@ public class RewriteManifestsSparkAction
List<ManifestFile> manifests = Lists.newArrayList();
if (rowsAsList.size() <= maxNumManifestEntries) {
manifests.add(
- writeManifest(rowsAsList, 0, rowsAsList.size(), io, location,
format, spec, sparkType));
+ writeManifest(
+ rowsAsList,
+ 0,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
} else {
int midIndex = rowsAsList.size() / 2;
manifests.add(
- writeManifest(rowsAsList, 0, midIndex, io, location, format, spec,
sparkType));
+ writeManifest(
+ rowsAsList,
+ 0,
+ midIndex,
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
manifests.add(
writeManifest(
- rowsAsList, midIndex, rowsAsList.size(), io, location, format,
spec, sparkType));
+ rowsAsList,
+ midIndex,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
}
return manifests.iterator();
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 7c5ec1f5cf..4578820290 100644
---
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -20,6 +20,8 @@ package org.apache.iceberg.spark.extensions;
import static
org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
+import java.sql.Date;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
@@ -193,4 +195,31 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
"Cannot handle an empty identifier",
() -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
}
+
+  @Test
+  public void testReplacePartitionField() {
+    sql(
+        "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+        tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+        tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+    sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+  }
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 87e8318724..efef23be55 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -52,13 +53,29 @@ public class SparkDataFile implements DataFile {
private final Type keyMetadataType;
private final SparkStructLike wrappedPartition;
+ private final StructLike partitionProjection;
private Row wrapped;
public SparkDataFile(Types.StructType type, StructType sparkType) {
+ this(type, null, sparkType);
+ }
+
+  public SparkDataFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType("lower_bounds");
     this.upperBoundsType = type.fieldType("upper_bounds");
     this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+    Types.StructType partitionType = type.fieldType("partition").asStructType();
+    this.wrappedPartition = new SparkStructLike(partitionType);
+
+    if (projectedType != null) {
+      Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+      this.partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+    } else {
+      this.partitionProjection = wrappedPartition;
+    }
Map<String, Integer> positions = Maps.newHashMap();
type.fields()
@@ -115,7 +132,7 @@ public class SparkDataFile implements DataFile {
@Override
public StructLike partition() {
- return wrappedPartition;
+ return partitionProjection;
}
@Override
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 1e0034eb30..43476d21da 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -209,6 +210,7 @@ public class RewriteManifestsSparkAction
Dataset<Row> manifestEntryDF, int numManifests) {
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we rely only on the target number of manifests for unpartitioned tables
// as we should not worry about having too much metadata per partition
@@ -217,7 +219,14 @@ public class RewriteManifestsSparkAction
return manifestEntryDF
.repartition(numManifests)
.mapPartitions(
- toManifests(io, maxNumManifestEntries, stagingLocation,
formatVersion, spec, sparkType),
+ toManifests(
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
manifestEncoder)
.collectAsList();
}
@@ -227,6 +236,7 @@ public class RewriteManifestsSparkAction
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
+ Types.StructType combinedPartitionType = Partitioning.partitionType(table);
// we allow the actual size of manifests to be 10% higher if the
estimation is not precise
// enough
@@ -240,7 +250,13 @@ public class RewriteManifestsSparkAction
.sortWithinPartitions(partitionColumn)
.mapPartitions(
toManifests(
- io, maxNumManifestEntries, stagingLocation,
formatVersion, spec, sparkType),
+ io,
+ maxNumManifestEntries,
+ stagingLocation,
+ formatVersion,
+ combinedPartitionType,
+ spec,
+ sparkType),
manifestEncoder)
.collectAsList();
});
@@ -337,6 +353,7 @@ public class RewriteManifestsSparkAction
Broadcast<FileIO> io,
String location,
int format,
+ Types.StructType combinedPartitionType,
PartitionSpec spec,
StructType sparkType)
throws IOException {
@@ -346,8 +363,9 @@ public class RewriteManifestsSparkAction
     OutputFile outputFile =
         io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
@@ -371,6 +389,7 @@ public class RewriteManifestsSparkAction
long maxNumManifestEntries,
String location,
int format,
+ Types.StructType combinedPartitionType,
PartitionSpec spec,
StructType sparkType) {
@@ -384,14 +403,40 @@ public class RewriteManifestsSparkAction
List<ManifestFile> manifests = Lists.newArrayList();
if (rowsAsList.size() <= maxNumManifestEntries) {
manifests.add(
- writeManifest(rowsAsList, 0, rowsAsList.size(), io, location,
format, spec, sparkType));
+ writeManifest(
+ rowsAsList,
+ 0,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
} else {
int midIndex = rowsAsList.size() / 2;
manifests.add(
- writeManifest(rowsAsList, 0, midIndex, io, location, format, spec,
sparkType));
+ writeManifest(
+ rowsAsList,
+ 0,
+ midIndex,
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
manifests.add(
writeManifest(
- rowsAsList, midIndex, rowsAsList.size(), io, location, format,
spec, sparkType));
+ rowsAsList,
+ midIndex,
+ rowsAsList.size(),
+ io,
+ location,
+ format,
+ combinedPartitionType,
+ spec,
+ sparkType));
}
return manifests.iterator();