This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 25c909be90 API: Fix default FileIO#newInputFile ManifestFile, DataFile and DeleteFile implementations (#9953)
25c909be90 is described below
commit 25c909be90d448ebbd070a20726ee2e63f48c717
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Thu Apr 4 14:02:50 2024 -0600
API: Fix default FileIO#newInputFile ManifestFile, DataFile and DeleteFile implementations (#9953)
---
.../main/java/org/apache/iceberg/io/FileIO.java | 12 ++---
.../org/apache/iceberg/aws/s3/TestS3FileIO.java | 58 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 6 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java
index fc6a53367f..a521cbf79d 100644
--- a/api/src/main/java/org/apache/iceberg/io/FileIO.java
+++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java
@@ -49,25 +49,25 @@ public interface FileIO extends Serializable, Closeable {
default InputFile newInputFile(DataFile file) {
Preconditions.checkArgument(
file.keyMetadata() == null,
- "Cannot decrypt data file: {} (use EncryptingFileIO)",
+ "Cannot decrypt data file: %s (use EncryptingFileIO)",
file.path());
- return newInputFile(file.path().toString());
+ return newInputFile(file.path().toString(), file.fileSizeInBytes());
}
default InputFile newInputFile(DeleteFile file) {
Preconditions.checkArgument(
file.keyMetadata() == null,
- "Cannot decrypt delete file: {} (use EncryptingFileIO)",
+ "Cannot decrypt delete file: %s (use EncryptingFileIO)",
file.path());
- return newInputFile(file.path().toString());
+ return newInputFile(file.path().toString(), file.fileSizeInBytes());
}
default InputFile newInputFile(ManifestFile manifest) {
Preconditions.checkArgument(
manifest.keyMetadata() == null,
- "Cannot decrypt manifest: {} (use EncryptingFileIO)",
+ "Cannot decrypt manifest: %s (use EncryptingFileIO)",
manifest.path());
- return newInputFile(manifest.path());
+ return newInputFile(manifest.path(), manifest.length());
}
/** Get a {@link OutputFile} instance to write bytes to the file at the given path. */
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index a74e574c97..26c9bc133b 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -22,6 +22,8 @@ import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -38,6 +40,13 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.aws.AwsProperties;
@@ -74,6 +83,7 @@ import software.amazon.awssdk.services.s3.model.BucketAlreadyExistsException;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
@ExtendWith(S3MockExtension.class)
@@ -377,6 +387,54 @@ public class TestS3FileIO {
Assertions.assertThat(result).isInstanceOf(S3FileIO.class);
}
+ @Test
+ public void testInputFileWithDataFile() throws IOException {
+ String location = "s3://bucket/path/to/data-file.parquet";
+ DataFile dataFile =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(location)
+ .withFileSizeInBytes(123L)
+ .withFormat(FileFormat.PARQUET)
+ .withRecordCount(123L)
+ .build();
+ OutputStream outputStream = s3FileIO.newOutputFile(location).create();
+ byte[] data = "testing".getBytes();
+ outputStream.write(data);
+ outputStream.close();
+
+ InputFile inputFile = s3FileIO.newInputFile(dataFile);
+ reset(s3mock);
+
+ Assertions.assertThat(inputFile.getLength())
+ .as("Data file length should be determined from the file size stats")
+ .isEqualTo(123L);
+ verify(s3mock, never()).headObject(any(HeadObjectRequest.class));
+ }
+
+ @Test
+ public void testInputFileWithManifest() throws IOException {
+ String dataFileLocation = "s3://bucket/path/to/data-file-2.parquet";
+ DataFile dataFile =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(dataFileLocation)
+ .withFileSizeInBytes(123L)
+ .withFormat(FileFormat.PARQUET)
+ .withRecordCount(123L)
+ .build();
+ String manifestLocation = "s3://bucket/path/to/manifest.avro";
+ OutputFile outputFile = s3FileIO.newOutputFile(manifestLocation);
+ ManifestWriter<DataFile> writer =
+ ManifestFiles.write(PartitionSpec.unpartitioned(), outputFile);
+ writer.add(dataFile);
+ writer.close();
+ ManifestFile manifest = writer.toManifestFile();
+ InputFile inputFile = s3FileIO.newInputFile(manifest);
+ reset(s3mock);
+
+ Assertions.assertThat(inputFile.getLength()).isEqualTo(manifest.length());
+ verify(s3mock, never()).headObject(any(HeadObjectRequest.class));
+ }
+
private void createRandomObjects(String prefix, int count) {
S3URI s3URI = new S3URI(prefix);