This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5c85b2f5f Core: Add length arg to FileIO.newInputFile (#5207)
5c85b2f5f is described below
commit 5c85b2f5f1837e0e5a89434f656c6168653823c5
Author: Ryan Blue <[email protected]>
AuthorDate: Wed Jul 6 07:02:06 2022 -0700
Core: Add length arg to FileIO.newInputFile (#5207)
---
api/src/main/java/org/apache/iceberg/io/FileIO.java | 7 +++++++
.../main/java/org/apache/iceberg/aws/s3/S3FileIO.java | 5 +++++
.../java/org/apache/iceberg/aws/s3/S3InputFile.java | 18 +++++++++++++++---
.../java/org/apache/iceberg/aws/s3/S3OutputFile.java | 2 +-
.../main/java/org/apache/iceberg/ManifestFiles.java | 4 ++--
.../main/java/org/apache/iceberg/RemoveSnapshots.java | 2 +-
.../java/org/apache/iceberg/hadoop/HadoopFileIO.java | 5 +++++
.../org/apache/iceberg/hadoop/HadoopInputFile.java | 6 +++++-
.../java/org/apache/iceberg/io/ResolvingFileIO.java | 5 +++++
.../iceberg/hadoop/TestCatalogUtilDropTable.java | 4 ++++
.../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java | 5 +++++
.../java/org/apache/iceberg/gcp/gcs/GCSInputFile.java | 18 +++++++++++++++---
.../java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java | 2 +-
13 files changed, 71 insertions(+), 12 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java
b/api/src/main/java/org/apache/iceberg/io/FileIO.java
index 187b53fa2..7bf725ff7 100644
--- a/api/src/main/java/org/apache/iceberg/io/FileIO.java
+++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java
@@ -37,6 +37,13 @@ public interface FileIO extends Serializable, Closeable {
*/
InputFile newInputFile(String path);
+  /**
+   * Get a {@link InputFile} instance to read bytes from the file at the given path, with a known
+   * file length.
+   */
+  default InputFile newInputFile(String path, long length) {
+    return newInputFile(path);
+  }
+
/**
* Get a {@link OutputFile} instance to write bytes to the file at the given
path.
*/
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index f09eb948c..5d5d88eaf 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -119,6 +119,11 @@ public class S3FileIO implements FileIO,
SupportsBulkOperations, SupportsPrefixO
return S3InputFile.fromLocation(path, client(), awsProperties, metrics);
}
+  @Override
+  public InputFile newInputFile(String path, long length) {
+    return S3InputFile.fromLocation(path, length, client(), awsProperties, metrics);
+  }
+
@Override
public OutputFile newOutputFile(String path) {
return S3OutputFile.fromLocation(path, client(), awsProperties, metrics);
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
index 8e8a81e7b..e4862e0dc 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
@@ -29,15 +29,23 @@ import software.amazon.awssdk.services.s3.S3Client;
public class S3InputFile extends BaseS3File implements InputFile,
NativelyEncryptedFile {
private NativeFileCryptoParameters nativeDecryptionParameters;
+ private Long length;
public static S3InputFile fromLocation(String location, S3Client client,
AwsProperties awsProperties,
MetricsContext metrics) {
- return new S3InputFile(client, new S3URI(location,
awsProperties.s3BucketToAccessPointMapping()),
+ return new S3InputFile(client, new S3URI(location,
awsProperties.s3BucketToAccessPointMapping()), null,
awsProperties, metrics);
}
- S3InputFile(S3Client client, S3URI uri, AwsProperties awsProperties,
MetricsContext metrics) {
+ public static S3InputFile fromLocation(String location, long length,
S3Client client, AwsProperties awsProperties,
+ MetricsContext metrics) {
+ return new S3InputFile(client, new S3URI(location,
awsProperties.s3BucketToAccessPointMapping()),
+ length > 0 ? length : null, awsProperties, metrics);
+ }
+
+ S3InputFile(S3Client client, S3URI uri, Long length, AwsProperties
awsProperties, MetricsContext metrics) {
super(client, uri, awsProperties, metrics);
+ this.length = length;
}
/**
@@ -47,7 +55,11 @@ public class S3InputFile extends BaseS3File implements
InputFile, NativelyEncryp
*/
@Override
public long getLength() {
- return getObjectMetadata().contentLength();
+ if (length == null) {
+ this.length = getObjectMetadata().contentLength();
+ }
+
+ return length;
}
@Override
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
index 7c29097d6..f2af425f1 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
@@ -70,7 +70,7 @@ public class S3OutputFile extends BaseS3File implements
OutputFile, NativelyEncr
@Override
public InputFile toInputFile() {
- return new S3InputFile(client(), uri(), awsProperties(), metrics());
+ return new S3InputFile(client(), uri(), null, awsProperties(), metrics());
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 73f2f6840..6e2c10f4c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -82,7 +82,7 @@ public class ManifestFiles {
public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO
io, Map<Integer, PartitionSpec> specsById) {
Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
"Cannot read a delete manifest with a ManifestReader: %s", manifest);
- InputFile file = io.newInputFile(manifest.path());
+ InputFile file = io.newInputFile(manifest.path(), manifest.length());
InheritableMetadata inheritableMetadata =
InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader<>(file, specsById, inheritableMetadata,
FileType.DATA_FILES);
}
@@ -133,7 +133,7 @@ public class ManifestFiles {
Map<Integer,
PartitionSpec> specsById) {
Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
"Cannot read a data manifest with a DeleteManifestReader: %s",
manifest);
- InputFile file = io.newInputFile(manifest.path());
+ InputFile file = io.newInputFile(manifest.path(), manifest.length());
InheritableMetadata inheritableMetadata =
InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader<>(file, specsById, inheritableMetadata,
FileType.DELETE_FILES);
}
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index cfad2a4b3..dd96b3659 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -563,7 +563,7 @@ class RemoveSnapshots implements ExpireSnapshots {
}
   private static final Schema MANIFEST_PROJECTION = ManifestFile.schema()
-      .select("manifest_path", "added_snapshot_id", "deleted_data_files_count");
+      .select("manifest_path", "manifest_length", "added_snapshot_id", "deleted_data_files_count");
private CloseableIterable<ManifestFile> readManifestFiles(Snapshot snapshot)
{
if (snapshot.manifestListLocation() != null) {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index fd207eb2f..2ba2cbf26 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -67,6 +67,11 @@ public class HadoopFileIO implements FileIO,
HadoopConfigurable, SupportsPrefixO
return HadoopInputFile.fromLocation(path, hadoopConf.get());
}
+ @Override
+ public InputFile newInputFile(String path, long length) {
+ return HadoopInputFile.fromLocation(path, length, hadoopConf.get());
+ }
+
@Override
public OutputFile newOutputFile(String path) {
return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get());
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
index 7393c91ce..8e39dcaf0 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
@@ -61,7 +61,11 @@ public class HadoopInputFile implements InputFile,
NativelyEncryptedFile {
public static HadoopInputFile fromLocation(CharSequence location, long
length,
Configuration conf) {
FileSystem fs = Util.getFs(new Path(location.toString()), conf);
- return new HadoopInputFile(fs, location.toString(), length, conf);
+ if (length > 0) {
+ return new HadoopInputFile(fs, location.toString(), length, conf);
+ } else {
+ return new HadoopInputFile(fs, location.toString(), conf);
+ }
}
public static HadoopInputFile fromLocation(CharSequence location, FileSystem
fs) {
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 3b6d9725f..3815d5da5 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -63,6 +63,11 @@ public class ResolvingFileIO implements FileIO,
HadoopConfigurable {
return io(location).newInputFile(location);
}
+ @Override
+ public InputFile newInputFile(String location, long length) {
+ return io(location).newInputFile(location, length);
+ }
+
@Override
public OutputFile newOutputFile(String location) {
return io(location).newOutputFile(location);
diff --git
a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
index 4c114a280..449445a91 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
@@ -60,6 +60,8 @@ public class TestCatalogUtilDropTable extends
HadoopTableTestBase {
FileIO fileIO = Mockito.mock(FileIO.class);
Mockito.when(fileIO.newInputFile(Mockito.anyString()))
.thenAnswer(invocation ->
table.io().newInputFile(invocation.getArgument(0)));
+    Mockito.when(fileIO.newInputFile(Mockito.anyString(), Mockito.anyLong()))
+        .thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0), invocation.getArgument(1)));
CatalogUtil.dropTableData(fileIO, tableMetadata);
ArgumentCaptor<String> argumentCaptor =
ArgumentCaptor.forClass(String.class);
@@ -90,6 +92,8 @@ public class TestCatalogUtilDropTable extends
HadoopTableTestBase {
FileIO fileIO = Mockito.mock(FileIO.class);
Mockito.when(fileIO.newInputFile(Mockito.anyString()))
.thenAnswer(invocation ->
table.io().newInputFile(invocation.getArgument(0)));
+    Mockito.when(fileIO.newInputFile(Mockito.anyString(), Mockito.anyLong()))
+        .thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0), invocation.getArgument(1)));
Mockito.doThrow(new
RuntimeException()).when(fileIO).deleteFile(ArgumentMatchers.anyString());
CatalogUtil.dropTableData(fileIO, tableMetadata);
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 24e37e23d..ecb520f1d 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -80,6 +80,11 @@ public class GCSFileIO implements FileIO {
return GCSInputFile.fromLocation(path, client(), gcpProperties, metrics);
}
+  @Override
+  public InputFile newInputFile(String path, long length) {
+    return GCSInputFile.fromLocation(path, length, client(), gcpProperties, metrics);
+  }
+
@Override
public OutputFile newOutputFile(String path) {
return GCSOutputFile.fromLocation(path, client(), gcpProperties, metrics);
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
index 82b6f10ae..c220615e4 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
@@ -27,19 +27,31 @@ import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.MetricsContext;
class GCSInputFile extends BaseGCSFile implements InputFile {
+ private Long length;
static GCSInputFile fromLocation(String location, Storage storage,
+ GCPProperties gcpProperties, MetricsContext
metrics) {
+ return new GCSInputFile(storage, BlobId.fromGsUtilUri(location), null,
gcpProperties, metrics);
+ }
+
+ static GCSInputFile fromLocation(String location, long length, Storage
storage,
GCPProperties gcpProperties, MetricsContext metrics) {
- return new GCSInputFile(storage, BlobId.fromGsUtilUri(location),
gcpProperties, metrics);
+ return new GCSInputFile(
+ storage, BlobId.fromGsUtilUri(location), length > 0 ? length : null,
gcpProperties, metrics);
}
- GCSInputFile(Storage storage, BlobId blobId, GCPProperties gcpProperties,
MetricsContext metrics) {
+ GCSInputFile(Storage storage, BlobId blobId, Long length, GCPProperties
gcpProperties, MetricsContext metrics) {
super(storage, blobId, gcpProperties, metrics);
+ this.length = length;
}
@Override
public long getLength() {
- return getBlob().getSize();
+ if (length == null) {
+ this.length = getBlob().getSize();
+ }
+
+ return length;
}
@Override
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
index 9c01b5dae..a8f5d6080 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
@@ -67,6 +67,6 @@ class GCSOutputFile extends BaseGCSFile implements OutputFile
{
@Override
public InputFile toInputFile() {
- return new GCSInputFile(storage(), blobId(), gcpProperties(), metrics());
+ return new GCSInputFile(storage(), blobId(), null, gcpProperties(),
metrics());
}
}