This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4a75dce Bug fixes for Pig (#202)
4a75dce is described below
commit 4a75dcef61f8736430d7b83293a1e7c58edc3a89
Author: Ryan Blue <[email protected]>
AuthorDate: Mon Jun 3 16:23:24 2019 -0700
Bug fixes for Pig (#202)
---
.../main/java/org/apache/iceberg/parquet/Parquet.java | 12 +++++++++++-
.../org/apache/iceberg/pig/IcebergPigInputFormat.java | 17 +++++++++++++----
2 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 7ffc818..3763a28 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -22,9 +22,11 @@ package org.apache.iceberg.parquet;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.IOException;
+import java.util.Collection;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
+import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
@@ -66,6 +68,9 @@ public class Parquet {
private Parquet() {
}
+ private static Collection<String> READ_PROPERTIES_TO_REMOVE = Sets.newHashSet(
+ "parquet.read.filter", "parquet.private.read.filter.predicate", "parquet.read.support.class");
+
public static WriteBuilder write(OutputFile file) {
return new WriteBuilder(file);
}
@@ -338,7 +343,12 @@ public class Parquet {
if (readerFunc != null) {
ParquetReadOptions.Builder optionsBuilder;
if (file instanceof HadoopInputFile) {
- optionsBuilder = HadoopReadOptions.builder(((HadoopInputFile) file).getConf());
+ // remove read properties already set that may conflict with this read
+ Configuration conf = new Configuration(((HadoopInputFile) file).getConf());
+ for (String property : READ_PROPERTIES_TO_REMOVE) {
+ conf.unset(property);
+ }
+ optionsBuilder = HadoopReadOptions.builder(conf);
} else {
optionsBuilder = ParquetReadOptions.builder();
}
diff --git a/pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java b/pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java
index 84195c9..ca2529b 100644
--- a/pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java
+++ b/pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java
@@ -106,6 +106,8 @@ public class IcebergPigInputFormat<T> extends InputFormat<Void, T> {
}
private static class IcebergSplit extends InputSplit implements Writable {
+ private static final String[] ANYWHERE = new String[] { "*" };
+
private CombinedScanTask task;
IcebergSplit(CombinedScanTask task) {
@@ -123,7 +125,7 @@ public class IcebergPigInputFormat<T> extends InputFormat<Void, T> {
@Override
public String[] getLocations() {
- return new String[0];
+ return ANYWHERE;
}
@Override
@@ -164,7 +166,7 @@ public class IcebergPigInputFormat<T> extends InputFormat<Void, T> {
@SuppressWarnings("unchecked")
private boolean advance() throws IOException {
- if(reader != null) {
+ if (reader != null) {
reader.close();
}
@@ -242,11 +244,18 @@ public class IcebergPigInputFormat<T> extends InputFormat<Void, T> {
@Override
public boolean nextKeyValue() throws IOException {
- if (recordIterator.hasNext() || advance()) {
+ if (recordIterator.hasNext()) {
currentRecord = recordIterator.next();
return true;
}
-
+
+ while (advance()) {
+ if (recordIterator.hasNext()) {
+ currentRecord = recordIterator.next();
+ return true;
+ }
+ }
+
return false;
}