This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a247fdb44 Core: Avoid reading ManifestFile when creating ManifestReader (#5632)
7a247fdb44 is described below

commit 7a247fdb44110363bd9e87d1ace91497fd72ba92
Author: Xianyang Liu <[email protected]>
AuthorDate: Fri Nov 4 08:54:35 2022 +0800

    Core: Avoid reading ManifestFile when creating ManifestReader (#5632)
---
 .../org/apache/iceberg/BaseRewriteManifests.java   |  8 +++++-
 .../main/java/org/apache/iceberg/FastAppend.java   |  1 +
 .../org/apache/iceberg/FileCleanupStrategy.java    |  6 ++++-
 .../java/org/apache/iceberg/ManifestFiles.java     | 12 ++++++---
 .../java/org/apache/iceberg/ManifestReader.java    | 30 ++++++++++++----------
 .../apache/iceberg/MergingSnapshotProducer.java    |  1 +
 6 files changed, 39 insertions(+), 19 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 816bc0c8a7..54bf3c6e44 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -164,7 +164,13 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
     InputFile toCopy = ops.io().newInputFile(manifest.path());
     OutputFile newFile = newManifestOutput();
     return ManifestFiles.copyRewriteManifest(
-        current.formatVersion(), toCopy, specsById, newFile, snapshotId(), 
summaryBuilder);
+        current.formatVersion(),
+        manifest.partitionSpecId(),
+        toCopy,
+        specsById,
+        newFile,
+        snapshotId(),
+        summaryBuilder);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 1979f633f5..5e5e512841 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -133,6 +133,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
     OutputFile newManifestPath = newManifestOutput();
     return ManifestFiles.copyAppendManifest(
         current.formatVersion(),
+        manifest.partitionSpecId(),
         toCopy,
         current.specsById(),
         newManifestPath,
diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
index 9cd002e020..73429a62da 100644
--- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
@@ -54,7 +54,11 @@ abstract class FileCleanupStrategy {
   private static final Schema MANIFEST_PROJECTION =
       ManifestFile.schema()
           .select(
-              "manifest_path", "manifest_length", "added_snapshot_id", 
"deleted_data_files_count");
+              "manifest_path",
+              "manifest_length",
+              "partition_spec_id",
+              "added_snapshot_id",
+              "deleted_data_files_count");
 
   protected CloseableIterable<ManifestFile> readManifests(Snapshot snapshot) {
     if (snapshot.manifestListLocation() != null) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index b4de239c26..0155610af4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -127,7 +127,8 @@ public class ManifestFiles {
         manifest);
     InputFile file = newInputFile(io, manifest.path(), manifest.length());
     InheritableMetadata inheritableMetadata = 
InheritableMetadataFactory.fromManifest(manifest);
-    return new ManifestReader<>(file, specsById, inheritableMetadata, 
FileType.DATA_FILES);
+    return new ManifestReader<>(
+        file, manifest.partitionSpecId(), specsById, inheritableMetadata, 
FileType.DATA_FILES);
   }
 
   /**
@@ -181,7 +182,8 @@ public class ManifestFiles {
         manifest);
     InputFile file = newInputFile(io, manifest.path(), manifest.length());
     InheritableMetadata inheritableMetadata = 
InheritableMetadataFactory.fromManifest(manifest);
-    return new ManifestReader<>(file, specsById, inheritableMetadata, 
FileType.DELETE_FILES);
+    return new ManifestReader<>(
+        file, manifest.partitionSpecId(), specsById, inheritableMetadata, 
FileType.DELETE_FILES);
   }
 
   /**
@@ -248,6 +250,7 @@ public class ManifestFiles {
 
   static ManifestFile copyAppendManifest(
       int formatVersion,
+      int specId,
       InputFile toCopy,
       Map<Integer, PartitionSpec> specsById,
       OutputFile outputFile,
@@ -256,7 +259,7 @@ public class ManifestFiles {
     // use metadata that will add the current snapshot's ID for the rewrite
     InheritableMetadata inheritableMetadata = 
InheritableMetadataFactory.forCopy(snapshotId);
     try (ManifestReader<DataFile> reader =
-        new ManifestReader<>(toCopy, specsById, inheritableMetadata, 
FileType.DATA_FILES)) {
+        new ManifestReader<>(toCopy, specId, specsById, inheritableMetadata, 
FileType.DATA_FILES)) {
       return copyManifestInternal(
           formatVersion,
           reader,
@@ -271,6 +274,7 @@ public class ManifestFiles {
 
   static ManifestFile copyRewriteManifest(
       int formatVersion,
+      int specId,
       InputFile toCopy,
       Map<Integer, PartitionSpec> specsById,
       OutputFile outputFile,
@@ -280,7 +284,7 @@ public class ManifestFiles {
     // exception if it is not
     InheritableMetadata inheritableMetadata = 
InheritableMetadataFactory.empty();
     try (ManifestReader<DataFile> reader =
-        new ManifestReader<>(toCopy, specsById, inheritableMetadata, 
FileType.DATA_FILES)) {
+        new ManifestReader<>(toCopy, specId, specsById, inheritableMetadata, 
FileType.DATA_FILES)) {
       return copyManifestInternal(
           formatVersion,
           reader,
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index aa203c1044..aef8b53546 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -82,7 +82,6 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
   private final InputFile file;
   private final InheritableMetadata inheritableMetadata;
   private final FileType content;
-  private final Map<String, String> metadata;
   private final PartitionSpec spec;
   private final Schema fileSchema;
 
@@ -101,6 +100,7 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
 
   protected ManifestReader(
       InputFile file,
+      int specId,
       Map<Integer, PartitionSpec> specsById,
       InheritableMetadata inheritableMetadata,
       FileType content) {
@@ -108,13 +108,24 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
     this.inheritableMetadata = inheritableMetadata;
     this.content = content;
 
+    if (specsById != null) {
+      this.spec = specsById.get(specId);
+    } else {
+      this.spec = readPartitionSpec(file);
+    }
+
+    this.fileSchema = new 
Schema(DataFile.getType(spec.partitionType()).fields());
+  }
+
+  private <T extends ContentFile<T>> PartitionSpec readPartitionSpec(InputFile 
inputFile) {
+    Map<String, String> metadata;
     try {
-      try (AvroIterable<ManifestEntry<F>> headerReader =
-          Avro.read(file)
+      try (AvroIterable<ManifestEntry<T>> headerReader =
+          Avro.read(inputFile)
               
.project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
               .classLoader(GenericManifestEntry.class.getClassLoader())
               .build()) {
-        this.metadata = headerReader.getMetadata();
+        metadata = headerReader.getMetadata();
       }
     } catch (IOException e) {
       throw new RuntimeIOException(e);
@@ -126,15 +137,8 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
       specId = Integer.parseInt(specProperty);
     }
 
-    if (specsById != null) {
-      this.spec = specsById.get(specId);
-    } else {
-      Schema schema = SchemaParser.fromJson(metadata.get("schema"));
-      this.spec =
-          PartitionSpecParser.fromJsonFields(schema, specId, 
metadata.get("partition-spec"));
-    }
-
-    this.fileSchema = new 
Schema(DataFile.getType(spec.partitionType()).fields());
+    Schema schema = SchemaParser.fromJson(metadata.get("schema"));
+    return PartitionSpecParser.fromJsonFields(schema, specId, 
metadata.get("partition-spec"));
   }
 
   public boolean isDeleteManifestReader() {
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index b82244f071..e0452e0be2 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -260,6 +260,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     OutputFile newManifestPath = newManifestOutput();
     return ManifestFiles.copyAppendManifest(
         current.formatVersion(),
+        manifest.partitionSpecId(),
         toCopy,
         current.specsById(),
         newManifestPath,

Reply via email to