This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 1cc41e1 Spark: Fix key error with duplicate data files (#1798)
1cc41e1 is described below
commit 1cc41e170719eaeb7177295c0619052123f081d2
Author: Samarth Jain <[email protected]>
AuthorDate: Fri Nov 20 14:55:58 2020 -0800
Spark: Fix key error with duplicate data files (#1798)
---
.../main/java/org/apache/iceberg/flink/source/DataIterator.java | 8 ++++----
.../java/org/apache/iceberg/spark/source/BaseDataReader.java | 9 +++++----
2 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
index f1e1500..f44ea4a 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.flink.source;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;
@@ -34,7 +35,6 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
/**
@@ -62,9 +62,9 @@ abstract class DataIterator<T> implements CloseableIterator<T> {
// decrypt with the batch call to avoid multiple RPCs to a key server, if possible
Iterable<InputFile> decryptedFiles = encryption.decrypt(encrypted::iterator);
- ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
- decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
- this.inputFiles = inputFileBuilder.build();
+ Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
+ decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+ this.inputFiles = Collections.unmodifiableMap(files);
this.currentIterator = CloseableIterator.empty();
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index 60c22e8..b9690c7 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -23,6 +23,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;
@@ -37,7 +38,6 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.ByteBuffers;
@@ -69,9 +69,10 @@ abstract class BaseDataReader<T> implements Closeable {
// decrypt with the batch call to avoid multiple RPCs to a key server, if possible
Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
- ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
- decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
- this.inputFiles = inputFileBuilder.build();
+ Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
+ decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+ this.inputFiles = Collections.unmodifiableMap(files);
+
this.currentIterator = CloseableIterator.empty();
}