This is an automated email from the ASF dual-hosted git repository.
rymurr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 92a264b Use bulk decryption interface in ArrowReader (#2720)
92a264b is described below
commit 92a264b4406fd933eed5685d25defc9e0dcb8617
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Thu Jun 24 12:11:06 2021 +0200
Use bulk decryption interface in ArrowReader (#2720)
---
.../iceberg/arrow/vectorized/ArrowReader.java | 23 ++++++++++++++++------
.../apache/iceberg/flink/source/DataIterator.java | 4 ++--
.../iceberg/spark/source/BaseDataReader.java | 4 ++--
3 files changed, 21 insertions(+), 10 deletions(-)
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
index 60b79a6..503a273 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -20,12 +20,11 @@
package org.apache.iceberg.arrow.vectorized;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -38,6 +37,7 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
@@ -49,6 +49,7 @@ import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.MessageType;
@@ -210,11 +211,21 @@ public class ArrowReader extends CloseableGroup {
.flatMap(Collection::stream)
.collect(Collectors.toList());
this.fileItr = fileTasks.iterator();
-    this.inputFiles = Collections.unmodifiableMap(fileTasks.stream()
+
+    Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
+    fileTasks.stream()
         .flatMap(fileScanTask ->
             Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
-        .map(file -> EncryptedFiles.encryptedInput(io.newInputFile(file.path().toString()), file.keyMetadata()))
-        .map(encryptionManager::decrypt)
-        .collect(Collectors.toMap(InputFile::location, Function.identity())));
+        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
+
+    Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
+        .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
+
+    // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
+    Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
+
+    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(fileTasks.size());
+    decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+    this.inputFiles = ImmutableMap.copyOf(files);
this.currentIterator = CloseableIterator.empty();
this.expectedSchema = expectedSchema;
this.nameMapping = nameMapping;
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
index f44ea4a..f74a896 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.flink.source;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;
@@ -35,6 +34,7 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
/**
@@ -64,7 +64,7 @@ abstract class DataIterator<T> implements CloseableIterator<T> {
     Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
     decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
-    this.inputFiles = Collections.unmodifiableMap(files);
+    this.inputFiles = ImmutableMap.copyOf(files);
     this.currentIterator = CloseableIterator.empty();
   }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index 566dcab..c8b33dd 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -23,7 +23,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;
@@ -38,6 +37,7 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.ByteBuffers;
@@ -76,7 +76,7 @@ abstract class BaseDataReader<T> implements Closeable {
     Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
     decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
-    this.inputFiles = Collections.unmodifiableMap(files);
+    this.inputFiles = ImmutableMap.copyOf(files);
     this.currentIterator = CloseableIterator.empty();
   }