This is an automated email from the ASF dual-hosted git repository.

rymurr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 92a264b  Use bulk decryption interface in ArrowReader (#2720)
92a264b is described below

commit 92a264b4406fd933eed5685d25defc9e0dcb8617
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Thu Jun 24 12:11:06 2021 +0200

    Use bulk decryption interface in ArrowReader (#2720)
---
 .../iceberg/arrow/vectorized/ArrowReader.java      | 23 ++++++++++++++++------
 .../apache/iceberg/flink/source/DataIterator.java  |  4 ++--
 .../iceberg/spark/source/BaseDataReader.java       |  4 ++--
 3 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
index 60b79a6..503a273 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -20,12 +20,11 @@
 package org.apache.iceberg.arrow.vectorized;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -38,6 +37,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
@@ -49,6 +49,7 @@ import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.schema.MessageType;
 
@@ -210,11 +211,21 @@ public class ArrowReader extends CloseableGroup {
           .flatMap(Collection::stream)
           .collect(Collectors.toList());
       this.fileItr = fileTasks.iterator();
-      this.inputFiles = Collections.unmodifiableMap(fileTasks.stream()
+
+      Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
+      fileTasks.stream()
           .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
-          .map(file -> EncryptedFiles.encryptedInput(io.newInputFile(file.path().toString()), file.keyMetadata()))
-          .map(encryptionManager::decrypt)
-          .collect(Collectors.toMap(InputFile::location, Function.identity())));
+          .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
+
+      Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
+          .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
+
+      // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
+      Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
+
+      Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(fileTasks.size());
+      decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+      this.inputFiles = ImmutableMap.copyOf(files);
       this.currentIterator = CloseableIterator.empty();
       this.expectedSchema = expectedSchema;
       this.nameMapping = nameMapping;
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
index f44ea4a..f74a896 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.flink.source;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -35,6 +34,7 @@ import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 /**
@@ -64,7 +64,7 @@ abstract class DataIterator<T> implements CloseableIterator<T> {
 
     Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
     decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
-    this.inputFiles = Collections.unmodifiableMap(files);
+    this.inputFiles = ImmutableMap.copyOf(files);
 
     this.currentIterator = CloseableIterator.empty();
   }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index 566dcab..c8b33dd 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -23,7 +23,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -38,6 +37,7 @@ import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.util.ByteBuffers;
@@ -76,7 +76,7 @@ abstract class BaseDataReader<T> implements Closeable {
 
     Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
     decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
-    this.inputFiles = Collections.unmodifiableMap(files);
+    this.inputFiles = ImmutableMap.copyOf(files);
 
     this.currentIterator = CloseableIterator.empty();
   }

Reply via email to