This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d8f97cde8 [parquet] Introduce filter push down for parquet (#3487)
d8f97cde8 is described below
commit d8f97cde82627dcca9137df491ac1be4a7686cda
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 17 17:17:44 2024 +0800
[parquet] Introduce filter push down for parquet (#3487)
---
.../apache/paimon/format/FormatReaderFactory.java | 3 +-
.../apache/paimon/format/avro/AvroBulkFormat.java | 2 -
.../apache/paimon/format/orc/OrcReaderFactory.java | 9 +-
.../paimon/format/parquet/ParquetFileFormat.java | 4 +-
.../format/parquet/ParquetReaderFactory.java | 9 +-
.../parquet/filter2/predicate/ParquetFilters.java | 291 +++++++++++++++++++++
.../paimon/format/parquet/ParquetFiltersTest.java | 111 ++++++++
.../format/parquet/ParquetReadWriteTest.java | 21 +-
8 files changed, 425 insertions(+), 25 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
index ce92bb751..d2fc91501 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
@@ -24,10 +24,9 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
import java.io.IOException;
-import java.io.Serializable;
/** A factory to create {@link RecordReader} for file. */
-public interface FormatReaderFactory extends Serializable {
+public interface FormatReaderFactory {
RecordReader<InternalRow> createReader(Context context) throws IOException;
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index 39bba1a08..eda5650b3 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -40,8 +40,6 @@ import java.util.Iterator;
/** Provides a {@link FormatReaderFactory} for Avro records. */
public class AvroBulkFormat implements FormatReaderFactory {
- private static final long serialVersionUID = 1L;
-
protected final RowType projectedRowType;
public AvroBulkFormat(RowType projectedRowType) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 4657caebe..5093a5010 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -36,6 +36,7 @@ import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Pool;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
@@ -57,9 +58,7 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
/** An ORC reader that produces a stream of {@link ColumnarRow} records. */
public class OrcReaderFactory implements FormatReaderFactory {
- private static final long serialVersionUID = 1L;
-
- protected final SerializableHadoopConfigWrapper hadoopConfigWrapper;
+ protected final Configuration hadoopConfig;
protected final TypeDescription schema;
@@ -79,7 +78,7 @@ public class OrcReaderFactory implements FormatReaderFactory {
final RowType readType,
final List<OrcFilters.Predicate> conjunctPredicates,
final int batchSize) {
- this.hadoopConfigWrapper = new
SerializableHadoopConfigWrapper(checkNotNull(hadoopConfig));
+ this.hadoopConfig = checkNotNull(hadoopConfig);
this.schema = toOrcType(readType);
this.tableType = readType;
this.conjunctPredicates = checkNotNull(conjunctPredicates);
@@ -99,7 +98,7 @@ public class OrcReaderFactory implements FormatReaderFactory {
RecordReader orcReader =
createRecordReader(
- hadoopConfigWrapper.getHadoopConfig(),
+ hadoopConfig,
schema,
conjunctPredicates,
context.fileIO(),
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index ffeb97909..c8ce5ccf5 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -31,6 +31,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;
+import org.apache.parquet.filter2.predicate.ParquetFilters;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import java.util.List;
@@ -59,7 +60,8 @@ public class ParquetFileFormat extends FileFormat {
return new ParquetReaderFactory(
getParquetConfiguration(formatContext),
projectedRowType,
- formatContext.readBatchSize());
+ formatContext.readBatchSize(),
+ ParquetFilters.convert(filters));
}
@Override
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 64e1e2296..2b2c651a8 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -39,6 +39,7 @@ import org.apache.paimon.utils.Pool;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.schema.GroupType;
@@ -68,21 +69,22 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
private static final Logger LOG =
LoggerFactory.getLogger(ParquetReaderFactory.class);
- private static final long serialVersionUID = 1L;
-
private static final String ALLOCATION_SIZE =
"parquet.read.allocation.size";
private final Options conf;
private final String[] projectedFields;
private final DataType[] projectedTypes;
private final int batchSize;
+ private final FilterCompat.Filter filter;
private final Set<Integer> unknownFieldsIndices = new HashSet<>();
- public ParquetReaderFactory(Options conf, RowType projectedType, int
batchSize) {
+ public ParquetReaderFactory(
+ Options conf, RowType projectedType, int batchSize,
FilterCompat.Filter filter) {
this.conf = conf;
this.projectedFields = projectedType.getFieldNames().toArray(new
String[0]);
this.projectedTypes = projectedType.getFieldTypes().toArray(new
DataType[0]);
this.batchSize = batchSize;
+ this.filter = filter;
}
@Override
@@ -124,6 +126,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
if (badRecordThresh != null) {
builder.set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
}
+ builder.withRecordFilter(filter);
}
/** Clips `parquetSchema` according to `fieldNames`. */
diff --git
a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
new file mode 100644
index 000000000..ef36cc6f8
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.filter2.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FunctionVisitor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.io.api.Binary;
+
+import java.util.List;
+
+/** Convert {@link Predicate} to {@link FilterCompat.Filter}. */
+public class ParquetFilters {
+
+ private static final ConvertFilterToParquet CONVERTER = new
ConvertFilterToParquet();
+
+ private ParquetFilters() {}
+
+ public static FilterCompat.Filter convert(List<Predicate> predicates) {
+ FilterPredicate result = null;
+ if (predicates != null) {
+ for (Predicate predicate : predicates) {
+ try {
+ FilterPredicate parquetFilter = predicate.visit(CONVERTER);
+ if (result == null) {
+ result = parquetFilter;
+ } else {
+ result = FilterApi.and(result, parquetFilter);
+ }
+ } catch (UnsupportedOperationException ignore) {
+ }
+ }
+ }
+
+ return result != null ? FilterCompat.get(result) : FilterCompat.NOOP;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private static class ConvertFilterToParquet implements
FunctionVisitor<FilterPredicate> {
+
+ @Override
+ public FilterPredicate visitIsNotNull(FieldRef fieldRef) {
+ return new Operators.NotEq<>(toParquetColumn(fieldRef), null);
+ }
+
+ @Override
+ public FilterPredicate visitIsNull(FieldRef fieldRef) {
+ return new Operators.Eq<>(toParquetColumn(fieldRef), null);
+ }
+
+ @Override
+ public FilterPredicate visitLessThan(FieldRef fieldRef, Object
literal) {
+ return new Operators.Lt(toParquetColumn(fieldRef),
toParquetObject(literal));
+ }
+
+ @Override
+ public FilterPredicate visitGreaterOrEqual(FieldRef fieldRef, Object
literal) {
+ return new Operators.GtEq(toParquetColumn(fieldRef),
toParquetObject(literal));
+ }
+
+ @Override
+ public FilterPredicate visitNotEqual(FieldRef fieldRef, Object
literal) {
+ return new Operators.NotEq(toParquetColumn(fieldRef),
toParquetObject(literal));
+ }
+
+ @Override
+ public FilterPredicate visitLessOrEqual(FieldRef fieldRef, Object
literal) {
+ return new Operators.LtEq(toParquetColumn(fieldRef),
toParquetObject(literal));
+ }
+
+ @Override
+ public FilterPredicate visitEqual(FieldRef fieldRef, Object literal) {
+ return new Operators.Eq(toParquetColumn(fieldRef),
toParquetObject(literal));
+ }
+
+ @Override
+ public FilterPredicate visitGreaterThan(FieldRef fieldRef, Object
literal) {
+ return new Operators.Gt(toParquetColumn(fieldRef),
toParquetObject(literal));
+ }
+
+ @Override
+ public FilterPredicate visitAnd(List<FilterPredicate> children) {
+ if (children.size() != 2) {
+ throw new RuntimeException("Illegal and children: " +
children.size());
+ }
+
+ return FilterApi.and(children.get(0), children.get(1));
+ }
+
+ @Override
+ public FilterPredicate visitOr(List<FilterPredicate> children) {
+ if (children.size() != 2) {
+                throw new RuntimeException("Illegal or children: " +
children.size());
+ }
+
+ return FilterApi.or(children.get(0), children.get(1));
+ }
+
+ @Override
+ public FilterPredicate visitStartsWith(FieldRef fieldRef, Object
literal) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FilterPredicate visitIn(FieldRef fieldRef, List<Object>
literals) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FilterPredicate visitNotIn(FieldRef fieldRef, List<Object>
literals) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static Operators.Column<?> toParquetColumn(FieldRef fieldRef) {
+ return fieldRef.type().accept(new
ConvertToColumnTypeVisitor(fieldRef.name()));
+ }
+
+ private static Comparable<?> toParquetObject(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Number) {
+ return (Comparable<?>) value;
+ } else if (value instanceof String) {
+ return Binary.fromString((String) value);
+ } else if (value instanceof BinaryString) {
+ return Binary.fromString(value.toString());
+ } else if (value instanceof byte[]) {
+ return Binary.fromReusedByteArray((byte[]) value);
+ }
+
+ // TODO Support Decimal and Timestamp
+ throw new UnsupportedOperationException();
+ }
+
+ private static class ConvertToColumnTypeVisitor
+ implements DataTypeVisitor<Operators.Column<?>> {
+
+ private final String name;
+
+ public ConvertToColumnTypeVisitor(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public Operators.Column<?> visit(CharType charType) {
+ return FilterApi.binaryColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(VarCharType varCharType) {
+ return FilterApi.binaryColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(BooleanType booleanType) {
+ return FilterApi.booleanColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(BinaryType binaryType) {
+ return FilterApi.binaryColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(VarBinaryType varBinaryType) {
+ return FilterApi.binaryColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(TinyIntType tinyIntType) {
+ return FilterApi.intColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(SmallIntType smallIntType) {
+ return FilterApi.intColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(IntType intType) {
+ return FilterApi.intColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(BigIntType bigIntType) {
+ return FilterApi.longColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(FloatType floatType) {
+ return FilterApi.floatColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(DoubleType doubleType) {
+ return FilterApi.doubleColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(DateType dateType) {
+ return FilterApi.intColumn(name);
+ }
+
+ @Override
+ public Operators.Column<?> visit(TimeType timeType) {
+ return FilterApi.intColumn(name);
+ }
+
+ // TODO we can support decimal and timestamp
+
+ @Override
+ public Operators.Column<?> visit(DecimalType decimalType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Operators.Column<?> visit(TimestampType timestampType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Operators.Column<?> visit(LocalZonedTimestampType
localZonedTimestampType) {
+ throw new UnsupportedOperationException();
+ }
+
+ // ===================== can not support =========================
+
+ @Override
+ public Operators.Column<?> visit(ArrayType arrayType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Operators.Column<?> visit(MultisetType multisetType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Operators.Column<?> visit(MapType mapType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Operators.Column<?> visit(RowType rowType) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java
new file mode 100644
index 000000000..28b165df6
--- /dev/null
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet;
+
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import org.apache.parquet.filter2.predicate.ParquetFilters;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ParquetFiltersTest {
+
+ @Test
+ public void testLong() {
+ PredicateBuilder builder =
+ new PredicateBuilder(
+ new RowType(
+ Collections.singletonList(
+ new DataField(0, "long1", new
BigIntType()))));
+
+ test(builder.isNull(0), "eq(long1, null)", true);
+
+ test(builder.isNotNull(0), "noteq(long1, null)", true);
+
+ test(builder.lessThan(0, 5L), "lt(long1, 5)", true);
+
+ test(builder.greaterThan(0, 5L), "gt(long1, 5)", true);
+
+ test(
+ builder.in(0, Arrays.asList(1L, 2L, 3L)),
+ "or(or(eq(long1, 1), eq(long1, 2)), eq(long1, 3))",
+ true);
+
+ test(builder.between(0, 1L, 3L), "and(gteq(long1, 1), lteq(long1,
3))", true);
+
+ test(
+ builder.notIn(0, Arrays.asList(1L, 2L, 3L)),
+ "and(and(noteq(long1, 1), noteq(long1, 2)), noteq(long1, 3))",
+ true);
+
+ test(
+ builder.in(0, LongStream.range(1L,
22L).boxed().collect(Collectors.toList())),
+ "",
+ false);
+
+ test(
+ builder.notIn(0, LongStream.range(1L,
22L).boxed().collect(Collectors.toList())),
+ "",
+ false);
+ }
+
+ @Test
+ public void testString() {
+ PredicateBuilder builder =
+ new PredicateBuilder(
+ new RowType(
+ Collections.singletonList(
+ new DataField(0, "string1", new
VarCharType()))));
+ test(builder.isNull(0), "eq(string1, null)", true);
+
+ test(builder.isNotNull(0), "noteq(string1, null)", true);
+
+ test(
+ builder.in(0, Arrays.asList("1", "2", "3")),
+ "or(or(eq(string1, Binary{\"1\"}), eq(string1,
Binary{\"2\"})), eq(string1, Binary{\"3\"}))",
+ true);
+ test(
+ builder.notIn(0, Arrays.asList("1", "2", "3")),
+ "and(and(noteq(string1, Binary{\"1\"}), noteq(string1,
Binary{\"2\"})), noteq(string1, Binary{\"3\"}))",
+ true);
+ }
+
+ private void test(Predicate predicate, String expected, boolean
canPushDown) {
+ FilterCompat.Filter filter =
ParquetFilters.convert(PredicateBuilder.splitAnd(predicate));
+ if (canPushDown) {
+ FilterPredicateCompat compat = (FilterPredicateCompat) filter;
+
assertThat(compat.getFilterPredicate().toString()).isEqualTo(expected);
+ } else {
+ assertThat(filter).isEqualTo(FilterCompat.NOOP);
+ }
+ }
+}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index d56edea59..4457db5e8 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -47,8 +47,8 @@ import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.InstantiationUtil;
+import org.apache.parquet.filter2.compat.FilterCompat;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -230,7 +230,8 @@ public class ParquetReadWriteTest {
RowType.builder()
.fields(fieldTypes, new String[] {"f7", "f2",
"f4"})
.build(),
- 500);
+ 500,
+ FilterCompat.NOOP);
AtomicInteger cnt = new AtomicInteger(0);
RecordReader<InternalRow> reader =
@@ -273,7 +274,8 @@ public class ParquetReadWriteTest {
RowType.builder()
.fields(fieldTypes, new String[] {"f7", "f2",
"f4", "f99"})
.build(),
- 500);
+ 500,
+ FilterCompat.NOOP);
AtomicInteger cnt = new AtomicInteger(0);
RecordReader<InternalRow> reader =
@@ -311,7 +313,8 @@ public class ParquetReadWriteTest {
new ParquetReaderFactory(
new Options(),
RowType.builder().fields(fieldTypes, new String[]
{"f7"}).build(),
- batchSize);
+ batchSize,
+ FilterCompat.NOOP);
AtomicInteger cnt = new AtomicInteger(0);
try (RecordReader<InternalRow> reader =
@@ -360,14 +363,8 @@ public class ParquetReadWriteTest {
}
private int testReadingFile(List<Integer> expected, Path path) throws
IOException {
- ParquetReaderFactory format = new ParquetReaderFactory(new Options(),
ROW_TYPE, 500);
-
- // validate java serialization
- try {
- InstantiationUtil.clone(format);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
+ ParquetReaderFactory format =
+ new ParquetReaderFactory(new Options(), ROW_TYPE, 500,
FilterCompat.NOOP);
RecordReader<InternalRow> reader =
format.createReader(