This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c85b2f5f Core: Add length arg to FileIO.newInputFile (#5207)
5c85b2f5f is described below

commit 5c85b2f5f1837e0e5a89434f656c6168653823c5
Author: Ryan Blue <[email protected]>
AuthorDate: Wed Jul 6 07:02:06 2022 -0700

    Core: Add length arg to FileIO.newInputFile (#5207)
---
 api/src/main/java/org/apache/iceberg/io/FileIO.java    |  7 +++++++
 .../main/java/org/apache/iceberg/aws/s3/S3FileIO.java  |  5 +++++
 .../java/org/apache/iceberg/aws/s3/S3InputFile.java    | 18 +++++++++++++++---
 .../java/org/apache/iceberg/aws/s3/S3OutputFile.java   |  2 +-
 .../main/java/org/apache/iceberg/ManifestFiles.java    |  4 ++--
 .../main/java/org/apache/iceberg/RemoveSnapshots.java  |  2 +-
 .../java/org/apache/iceberg/hadoop/HadoopFileIO.java   |  5 +++++
 .../org/apache/iceberg/hadoop/HadoopInputFile.java     |  6 +++++-
 .../java/org/apache/iceberg/io/ResolvingFileIO.java    |  5 +++++
 .../iceberg/hadoop/TestCatalogUtilDropTable.java       |  4 ++++
 .../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java     |  5 +++++
 .../java/org/apache/iceberg/gcp/gcs/GCSInputFile.java  | 18 +++++++++++++++---
 .../java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java |  2 +-
 13 files changed, 71 insertions(+), 12 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java
index 187b53fa2..7bf725ff7 100644
--- a/api/src/main/java/org/apache/iceberg/io/FileIO.java
+++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java
@@ -37,6 +37,13 @@ public interface FileIO extends Serializable, Closeable {
    */
   InputFile newInputFile(String path);
 
+  /**
+   * Get a {@link InputFile} instance to read bytes from the file at the given 
path, with a known file length.
+   */
+  default InputFile newInputFile(String path, long length) {
+    return newInputFile(path);
+  }
+
   /**
    * Get a {@link OutputFile} instance to write bytes to the file at the given 
path.
    */
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index f09eb948c..5d5d88eaf 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -119,6 +119,11 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
     return S3InputFile.fromLocation(path, client(), awsProperties, metrics);
   }
 
+  @Override
+  public InputFile newInputFile(String path, long length) {
+    return S3InputFile.fromLocation(path, length, client(), awsProperties, 
metrics);
+  }
+
   @Override
   public OutputFile newOutputFile(String path) {
     return S3OutputFile.fromLocation(path, client(), awsProperties, metrics);
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
index 8e8a81e7b..e4862e0dc 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
@@ -29,15 +29,23 @@ import software.amazon.awssdk.services.s3.S3Client;
 
 public class S3InputFile extends BaseS3File implements InputFile, 
NativelyEncryptedFile {
   private NativeFileCryptoParameters nativeDecryptionParameters;
+  private Long length;
 
   public static S3InputFile fromLocation(String location, S3Client client, 
AwsProperties awsProperties,
       MetricsContext metrics) {
-    return new S3InputFile(client, new S3URI(location, 
awsProperties.s3BucketToAccessPointMapping()),
+    return new S3InputFile(client, new S3URI(location, 
awsProperties.s3BucketToAccessPointMapping()), null,
         awsProperties, metrics);
   }
 
-  S3InputFile(S3Client client, S3URI uri, AwsProperties awsProperties, 
MetricsContext metrics) {
+  public static S3InputFile fromLocation(String location, long length, 
S3Client client, AwsProperties awsProperties,
+                                         MetricsContext metrics) {
+    return new S3InputFile(client, new S3URI(location, 
awsProperties.s3BucketToAccessPointMapping()),
+        length > 0 ? length : null, awsProperties, metrics);
+  }
+
+  S3InputFile(S3Client client, S3URI uri, Long length, AwsProperties 
awsProperties, MetricsContext metrics) {
     super(client, uri, awsProperties, metrics);
+    this.length = length;
   }
 
   /**
@@ -47,7 +55,11 @@ public class S3InputFile extends BaseS3File implements InputFile, NativelyEncryp
    */
   @Override
   public long getLength() {
-    return getObjectMetadata().contentLength();
+    if (length == null) {
+      this.length = getObjectMetadata().contentLength();
+    }
+
+    return length;
   }
 
   @Override
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
index 7c29097d6..f2af425f1 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
@@ -70,7 +70,7 @@ public class S3OutputFile extends BaseS3File implements OutputFile, NativelyEncr
 
   @Override
   public InputFile toInputFile() {
-    return new S3InputFile(client(), uri(), awsProperties(), metrics());
+    return new S3InputFile(client(), uri(), null, awsProperties(), metrics());
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 73f2f6840..6e2c10f4c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -82,7 +82,7 @@ public class ManifestFiles {
   public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO 
io, Map<Integer, PartitionSpec> specsById) {
     Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
         "Cannot read a delete manifest with a ManifestReader: %s", manifest);
-    InputFile file = io.newInputFile(manifest.path());
+    InputFile file = io.newInputFile(manifest.path(), manifest.length());
     InheritableMetadata inheritableMetadata = 
InheritableMetadataFactory.fromManifest(manifest);
     return new ManifestReader<>(file, specsById, inheritableMetadata, 
FileType.DATA_FILES);
   }
@@ -133,7 +133,7 @@ public class ManifestFiles {
                                                               Map<Integer, 
PartitionSpec> specsById) {
     Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
         "Cannot read a data manifest with a DeleteManifestReader: %s", 
manifest);
-    InputFile file = io.newInputFile(manifest.path());
+    InputFile file = io.newInputFile(manifest.path(), manifest.length());
     InheritableMetadata inheritableMetadata = 
InheritableMetadataFactory.fromManifest(manifest);
     return new ManifestReader<>(file, specsById, inheritableMetadata, 
FileType.DELETE_FILES);
   }
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index cfad2a4b3..dd96b3659 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -563,7 +563,7 @@ class RemoveSnapshots implements ExpireSnapshots {
   }
 
   private static final Schema MANIFEST_PROJECTION = ManifestFile.schema()
-      .select("manifest_path", "added_snapshot_id", 
"deleted_data_files_count");
+      .select("manifest_path", "manifest_length", "added_snapshot_id", 
"deleted_data_files_count");
 
   private CloseableIterable<ManifestFile> readManifestFiles(Snapshot snapshot) 
{
     if (snapshot.manifestListLocation() != null) {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index fd207eb2f..2ba2cbf26 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -67,6 +67,11 @@ public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixO
     return HadoopInputFile.fromLocation(path, hadoopConf.get());
   }
 
+  @Override
+  public InputFile newInputFile(String path, long length) {
+    return HadoopInputFile.fromLocation(path, length, hadoopConf.get());
+  }
+
   @Override
   public OutputFile newOutputFile(String path) {
     return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get());
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
index 7393c91ce..8e39dcaf0 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
@@ -61,7 +61,11 @@ public class HadoopInputFile implements InputFile, NativelyEncryptedFile {
   public static HadoopInputFile fromLocation(CharSequence location, long 
length,
                                              Configuration conf) {
     FileSystem fs = Util.getFs(new Path(location.toString()), conf);
-    return new HadoopInputFile(fs, location.toString(), length, conf);
+    if (length > 0) {
+      return new HadoopInputFile(fs, location.toString(), length, conf);
+    } else {
+      return new HadoopInputFile(fs, location.toString(), conf);
+    }
   }
 
   public static HadoopInputFile fromLocation(CharSequence location, FileSystem 
fs) {
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 3b6d9725f..3815d5da5 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -63,6 +63,11 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable {
     return io(location).newInputFile(location);
   }
 
+  @Override
+  public InputFile newInputFile(String location, long length) {
+    return io(location).newInputFile(location, length);
+  }
+
   @Override
   public OutputFile newOutputFile(String location) {
     return io(location).newOutputFile(location);
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
index 4c114a280..449445a91 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
@@ -60,6 +60,8 @@ public class TestCatalogUtilDropTable extends HadoopTableTestBase {
     FileIO fileIO = Mockito.mock(FileIO.class);
     Mockito.when(fileIO.newInputFile(Mockito.anyString()))
         .thenAnswer(invocation -> 
table.io().newInputFile(invocation.getArgument(0)));
+    Mockito.when(fileIO.newInputFile(Mockito.anyString(), Mockito.anyLong()))
+        .thenAnswer(invocation -> 
table.io().newInputFile(invocation.getArgument(0), invocation.getArgument(1)));
 
     CatalogUtil.dropTableData(fileIO, tableMetadata);
     ArgumentCaptor<String> argumentCaptor = 
ArgumentCaptor.forClass(String.class);
@@ -90,6 +92,8 @@ public class TestCatalogUtilDropTable extends HadoopTableTestBase {
     FileIO fileIO = Mockito.mock(FileIO.class);
     Mockito.when(fileIO.newInputFile(Mockito.anyString()))
         .thenAnswer(invocation -> 
table.io().newInputFile(invocation.getArgument(0)));
+    Mockito.when(fileIO.newInputFile(Mockito.anyString(), Mockito.anyLong()))
+        .thenAnswer(invocation -> 
table.io().newInputFile(invocation.getArgument(0), invocation.getArgument(1)));
     Mockito.doThrow(new 
RuntimeException()).when(fileIO).deleteFile(ArgumentMatchers.anyString());
 
     CatalogUtil.dropTableData(fileIO, tableMetadata);
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 24e37e23d..ecb520f1d 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -80,6 +80,11 @@ public class GCSFileIO implements FileIO {
     return GCSInputFile.fromLocation(path, client(), gcpProperties, metrics);
   }
 
+  @Override
+  public InputFile newInputFile(String path, long length) {
+    return GCSInputFile.fromLocation(path, length, client(), gcpProperties, 
metrics);
+  }
+
   @Override
   public OutputFile newOutputFile(String path) {
     return GCSOutputFile.fromLocation(path, client(), gcpProperties, metrics);
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
index 82b6f10ae..c220615e4 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
@@ -27,19 +27,31 @@ import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.metrics.MetricsContext;
 
 class GCSInputFile extends BaseGCSFile implements InputFile {
+  private Long length;
 
   static GCSInputFile fromLocation(String location, Storage storage,
+                                   GCPProperties gcpProperties, MetricsContext 
metrics) {
+    return new GCSInputFile(storage, BlobId.fromGsUtilUri(location), null, 
gcpProperties, metrics);
+  }
+
+  static GCSInputFile fromLocation(String location, long length, Storage 
storage,
       GCPProperties gcpProperties, MetricsContext metrics) {
-    return new GCSInputFile(storage, BlobId.fromGsUtilUri(location), 
gcpProperties, metrics);
+    return new GCSInputFile(
+        storage, BlobId.fromGsUtilUri(location), length > 0 ? length : null, 
gcpProperties, metrics);
   }
 
-  GCSInputFile(Storage storage, BlobId blobId, GCPProperties gcpProperties, 
MetricsContext metrics) {
+  GCSInputFile(Storage storage, BlobId blobId, Long length, GCPProperties 
gcpProperties, MetricsContext metrics) {
     super(storage, blobId, gcpProperties, metrics);
+    this.length = length;
   }
 
   @Override
   public long getLength() {
-    return getBlob().getSize();
+    if (length == null) {
+      this.length = getBlob().getSize();
+    }
+
+    return length;
   }
 
   @Override
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
index 9c01b5dae..a8f5d6080 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
@@ -67,6 +67,6 @@ class GCSOutputFile extends BaseGCSFile implements OutputFile {
 
   @Override
   public InputFile toInputFile() {
-    return new GCSInputFile(storage(), blobId(), gcpProperties(), metrics());
+    return new GCSInputFile(storage(), blobId(), null, gcpProperties(), 
metrics());
   }
 }

Reply via email to