This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cc41e1  Spark: Fix key error with duplicate data files (#1798)
1cc41e1 is described below

commit 1cc41e170719eaeb7177295c0619052123f081d2
Author: Samarth Jain <[email protected]>
AuthorDate: Fri Nov 20 14:55:58 2020 -0800

    Spark: Fix key error with duplicate data files (#1798)
---
 .../main/java/org/apache/iceberg/flink/source/DataIterator.java  | 8 ++++----
 .../java/org/apache/iceberg/spark/source/BaseDataReader.java     | 9 +++++----
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
index f1e1500..f44ea4a 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.flink.source;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -34,7 +35,6 @@ import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 /**
@@ -62,9 +62,9 @@ abstract class DataIterator<T> implements CloseableIterator<T> {
     // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
     Iterable<InputFile> decryptedFiles = encryption.decrypt(encrypted::iterator);
 
-    ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
-    decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
-    this.inputFiles = inputFileBuilder.build();
+    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
+    decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+    this.inputFiles = Collections.unmodifiableMap(files);
 
     this.currentIterator = CloseableIterator.empty();
   }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index 60c22e8..b9690c7 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -23,6 +23,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -37,7 +38,6 @@ import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.util.ByteBuffers;
@@ -69,9 +69,10 @@ abstract class BaseDataReader<T> implements Closeable {
     // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
     Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
 
-    ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
-    decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
-    this.inputFiles = inputFileBuilder.build();
+    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
+    decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+    this.inputFiles = Collections.unmodifiableMap(files);
+
     this.currentIterator = CloseableIterator.empty();
   }
 

Reply via email to