This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7a247fdb44 Core: Avoid reading ManifestFile when creating ManifestReader (#5632)
7a247fdb44 is described below
commit 7a247fdb44110363bd9e87d1ace91497fd72ba92
Author: Xianyang Liu <[email protected]>
AuthorDate: Fri Nov 4 08:54:35 2022 +0800
Core: Avoid reading ManifestFile when creating ManifestReader (#5632)
---
.../org/apache/iceberg/BaseRewriteManifests.java | 8 +++++-
.../main/java/org/apache/iceberg/FastAppend.java | 1 +
.../org/apache/iceberg/FileCleanupStrategy.java | 6 ++++-
.../java/org/apache/iceberg/ManifestFiles.java | 12 ++++++---
.../java/org/apache/iceberg/ManifestReader.java | 30 ++++++++++++----------
.../apache/iceberg/MergingSnapshotProducer.java | 1 +
6 files changed, 39 insertions(+), 19 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 816bc0c8a7..54bf3c6e44 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -164,7 +164,13 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
InputFile toCopy = ops.io().newInputFile(manifest.path());
OutputFile newFile = newManifestOutput();
return ManifestFiles.copyRewriteManifest(
- current.formatVersion(), toCopy, specsById, newFile, snapshotId(), summaryBuilder);
+ current.formatVersion(),
+ manifest.partitionSpecId(),
+ toCopy,
+ specsById,
+ newFile,
+ snapshotId(),
+ summaryBuilder);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 1979f633f5..5e5e512841 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -133,6 +133,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
OutputFile newManifestPath = newManifestOutput();
return ManifestFiles.copyAppendManifest(
current.formatVersion(),
+ manifest.partitionSpecId(),
toCopy,
current.specsById(),
newManifestPath,
diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
index 9cd002e020..73429a62da 100644
--- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
@@ -54,7 +54,11 @@ abstract class FileCleanupStrategy {
private static final Schema MANIFEST_PROJECTION =
ManifestFile.schema()
.select(
- "manifest_path", "manifest_length", "added_snapshot_id", "deleted_data_files_count");
+ "manifest_path",
+ "manifest_length",
+ "partition_spec_id",
+ "added_snapshot_id",
+ "deleted_data_files_count");
protected CloseableIterable<ManifestFile> readManifests(Snapshot snapshot) {
if (snapshot.manifestListLocation() != null) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index b4de239c26..0155610af4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -127,7 +127,8 @@ public class ManifestFiles {
manifest);
InputFile file = newInputFile(io, manifest.path(), manifest.length());
InheritableMetadata inheritableMetadata =
InheritableMetadataFactory.fromManifest(manifest);
- return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DATA_FILES);
+ return new ManifestReader<>(
+ file, manifest.partitionSpecId(), specsById, inheritableMetadata, FileType.DATA_FILES);
}
/**
@@ -181,7 +182,8 @@ public class ManifestFiles {
manifest);
InputFile file = newInputFile(io, manifest.path(), manifest.length());
InheritableMetadata inheritableMetadata =
InheritableMetadataFactory.fromManifest(manifest);
- return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES);
+ return new ManifestReader<>(
+ file, manifest.partitionSpecId(), specsById, inheritableMetadata, FileType.DELETE_FILES);
}
/**
@@ -248,6 +250,7 @@ public class ManifestFiles {
static ManifestFile copyAppendManifest(
int formatVersion,
+ int specId,
InputFile toCopy,
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
@@ -256,7 +259,7 @@ public class ManifestFiles {
// use metadata that will add the current snapshot's ID for the rewrite
InheritableMetadata inheritableMetadata =
InheritableMetadataFactory.forCopy(snapshotId);
try (ManifestReader<DataFile> reader =
- new ManifestReader<>(toCopy, specsById, inheritableMetadata, FileType.DATA_FILES)) {
+ new ManifestReader<>(toCopy, specId, specsById, inheritableMetadata, FileType.DATA_FILES)) {
return copyManifestInternal(
formatVersion,
reader,
@@ -271,6 +274,7 @@ public class ManifestFiles {
static ManifestFile copyRewriteManifest(
int formatVersion,
+ int specId,
InputFile toCopy,
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
@@ -280,7 +284,7 @@ public class ManifestFiles {
// exception if it is not
InheritableMetadata inheritableMetadata =
InheritableMetadataFactory.empty();
try (ManifestReader<DataFile> reader =
- new ManifestReader<>(toCopy, specsById, inheritableMetadata, FileType.DATA_FILES)) {
+ new ManifestReader<>(toCopy, specId, specsById, inheritableMetadata, FileType.DATA_FILES)) {
return copyManifestInternal(
formatVersion,
reader,
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index aa203c1044..aef8b53546 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -82,7 +82,6 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
private final InputFile file;
private final InheritableMetadata inheritableMetadata;
private final FileType content;
- private final Map<String, String> metadata;
private final PartitionSpec spec;
private final Schema fileSchema;
@@ -101,6 +100,7 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
protected ManifestReader(
InputFile file,
+ int specId,
Map<Integer, PartitionSpec> specsById,
InheritableMetadata inheritableMetadata,
FileType content) {
@@ -108,13 +108,24 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
this.inheritableMetadata = inheritableMetadata;
this.content = content;
+ if (specsById != null) {
+ this.spec = specsById.get(specId);
+ } else {
+ this.spec = readPartitionSpec(file);
+ }
+
+ this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
+ }
+
+ private <T extends ContentFile<T>> PartitionSpec readPartitionSpec(InputFile inputFile) {
+ Map<String, String> metadata;
try {
- try (AvroIterable<ManifestEntry<F>> headerReader =
- Avro.read(file)
+ try (AvroIterable<ManifestEntry<T>> headerReader =
+ Avro.read(inputFile)
.project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
.classLoader(GenericManifestEntry.class.getClassLoader())
.build()) {
- this.metadata = headerReader.getMetadata();
+ metadata = headerReader.getMetadata();
}
} catch (IOException e) {
throw new RuntimeIOException(e);
@@ -126,15 +137,8 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
specId = Integer.parseInt(specProperty);
}
- if (specsById != null) {
- this.spec = specsById.get(specId);
- } else {
- Schema schema = SchemaParser.fromJson(metadata.get("schema"));
- this.spec =
- PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
- }
-
- this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
+ Schema schema = SchemaParser.fromJson(metadata.get("schema"));
+ return PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
}
public boolean isDeleteManifestReader() {
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index b82244f071..e0452e0be2 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -260,6 +260,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
OutputFile newManifestPath = newManifestOutput();
return ManifestFiles.copyAppendManifest(
current.formatVersion(),
+ manifest.partitionSpecId(),
toCopy,
current.specsById(),
newManifestPath,