Repository: spark
Updated Branches:
refs/heads/branch-2.0 15637f735 -> 977fbbfca
[SPARK-15639] [SPARK-16321] [SQL] Push down filter at RowGroups level for parquet reader
The base class `SpecificParquetRecordReaderBase`, which the vectorized Parquet
reader builds on, tries to obtain pushed-down filters from the given
configuration; those filters are what drive RowGroups-level filtering. However,
we never actually put the pushed-down filters into that configuration, so no
RowGroups-level filtering takes place. This patch fixes that by setting the
pushed-down filters in the configuration handed to the reader.
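The heart of the change, shown in full in the ParquetFileFormat diff below, is simply
to hand the pushed-down predicate to Parquet through the Hadoop configuration of the
task attempt context, so that `SpecificParquetRecordReaderBase` can see it when
deciding which row groups to read. A minimal sketch (`pushed` is the optional Parquet
filter predicate that ParquetFileFormat already computes):

  // Hand the compiled filter predicate to the Parquet reader via the Hadoop
  // configuration; the reader uses it to skip row groups that cannot match.
  if (pushed.isDefined) {
    ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
  }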
A benchmark that excludes the time spent writing the Parquet file:
test("Benchmark for Parquet") {
val N = 500 << 12
withParquetTable((0 until N).map(i => (101, i)), "t") {
val benchmark = new Benchmark("Parquet reader", N)
benchmark.addCase("reading Parquet file", 10) { iter =>
sql("SELECT _1 FROM t where t._1 < 100").collect()
}
benchmark.run()
}
}
By default, `withParquetTable` runs the test against both the vectorized and the
non-vectorized reader; I only ran it with the vectorized reader.
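A minimal sketch of how a run can be pinned to the vectorized reader, assuming the
usual `withSQLConf` test helper rather than a modified `withParquetTable`:

  // Force the vectorized Parquet reader for the benchmark case.
  withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
    sql("SELECT _1 FROM t where t._1 < 100").collect()
  }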
The Parquet block size was set to 1024 so that the file contains multiple row
groups. The benchmark results:
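For reference, one way to arrange the small block size for the write (an assumption on
my part; the property name is Parquet's standard `parquet.block.size`, and the exact
wiring used for this benchmark is not shown in the patch):

  // Ask Parquet for tiny row groups so the written file contains many of them.
  spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 1024)
  (0 until N).map(i => (101, i)).toDF("_1", "_2").write.parquet(path)  // path is a placeholder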
Before this patch:

The retrieved row groups: 8063

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                           825 / 1233          2.5         402.6       1.0X

After this patch:

The retrieved row groups: 0

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                            306 / 503          6.7         149.6       1.0X
Next, I ran the benchmark for the non-pushdown case using the same benchmark code
but with the pushdown configuration disabled. This time the Parquet block size is
the default value.
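Concretely, the only change relative to the earlier runs is the pushdown flag, using
the same SQL config key the patch's test suite toggles:

  // Disable Parquet filter push-down; the Parquet block size stays at its default.
  withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "false") {
    benchmark.run()
  }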
Before this patch:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                            136 / 238         15.0          66.5       1.0X

After this patch:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                            124 / 193         16.5          60.7       1.0X
For the non-pushdown case, the results indicate that this patch does not affect the
normal code path.
I also manually printed `totalRowCount` in `SpecificParquetRecordReaderBase` to
verify that this patch actually filters out row groups. When running the above
benchmark:
After this patch:
`totalRowCount = 0`
Before this patch:
`totalRowCount = 1024000`
Existing tests should pass.
Author: Liang-Chi Hsieh <[email protected]>
Closes #13701 from viirya/vectorized-reader-push-down-filter2.
(cherry picked from commit 19af298bb6d264adcf02f6f84c8dc1542b408507)
Signed-off-by: Davies Liu <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/977fbbfc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/977fbbfc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/977fbbfc
Branch: refs/heads/branch-2.0
Commit: 977fbbfcae705dbdbf203bd0a6e7c75a12156d3f
Parents: 15637f7
Author: Liang-Chi Hsieh <[email protected]>
Authored: Wed Aug 10 10:03:55 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Wed Aug 10 10:05:38 2016 -0700
----------------------------------------------------------------------
.../org/apache/spark/executor/TaskMetrics.scala | 9 +
.../org/apache/spark/util/AccumulatorV2.scala | 12 ++
.../SpecificParquetRecordReaderBase.java | 18 ++
.../datasources/parquet/ParquetFileFormat.scala | 6 +
.../parquet/ParquetFilterSuite.scala | 165 +++++++++++--------
5 files changed, 143 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/977fbbfc/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 5bb505b..dd149a9 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -225,6 +225,15 @@ class TaskMetrics private[spark] () extends Serializable {
}
  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
+
+  /**
+   * Looks for a registered accumulator by accumulator name.
+   */
+  private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
+    accumulators.find { acc =>
+      acc.name.isDefined && acc.name.get == name
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/977fbbfc/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index a9167ce..d130a37 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -23,6 +23,8 @@ import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
+import scala.collection.JavaConverters._
+
import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext}
import org.apache.spark.scheduler.AccumulableInfo
@@ -257,6 +259,16 @@ private[spark] object AccumulatorContext {
originals.clear()
}
+  /**
+   * Looks for a registered accumulator by accumulator name.
+   */
+  private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
+    originals.values().asScala.find { ref =>
+      val acc = ref.get
+      acc != null && acc.name.isDefined && acc.name.get == name
+    }.map(_.get)
+  }
+
// Identifier for distinguishing SQL metrics from other accumulators
private[spark] val SQL_ACCUM_IDENTIFIER = "sql"
}
http://git-wip-us.apache.org/repos/asf/spark/blob/977fbbfc/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 0d624d1..b903aee 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -31,6 +31,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import scala.Option;
+
import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
@@ -59,8 +61,12 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.util.AccumulatorV2;
+import org.apache.spark.util.LongAccumulator;
/**
 * Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -144,6 +150,18 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
}
+
+    // For test purpose.
+    // If the predefined accumulator exists, the row group number to read will be updated
+    // to the accumulator. So we can check if the row groups are filtered or not in test case.
+    TaskContext taskContext = TaskContext$.MODULE$.get();
+    if (taskContext != null) {
+      Option<AccumulatorV2<?, ?>> accu = (Option<AccumulatorV2<?, ?>>) taskContext.taskMetrics()
+        .lookForAccumulatorByName("numRowGroups");
+      if (accu.isDefined()) {
+        ((LongAccumulator)accu.get()).add((long)blocks.size());
+      }
+    }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/977fbbfc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 5397d50..7e819c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -355,6 +356,11 @@ private[sql] class ParquetFileFormat
      val hadoopAttemptContext =
        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+      }
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
vectorizedReader.initialize(split, hadoopAttemptContext)
http://git-wip-us.apache.org/repos/asf/spark/blob/977fbbfc/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 2a89773..ab92500 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
+import org.apache.spark.util.{AccumulatorContext, LongAccumulator}
/**
 * A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -370,73 +371,75 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
import testImplicits._
-
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-      withTempPath { dir =>
-        val pathOne = s"${dir.getCanonicalPath}/table1"
-        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
-        val pathTwo = s"${dir.getCanonicalPath}/table2"
-        (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
-
-        // If the "c = 1" filter gets pushed down, this query will throw an exception which
-        // Parquet emits. This is a Parquet issue (PARQUET-389).
-        val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
-        checkAnswer(
-          df,
-          Row(1, "1", null))
-
-        // The fields "a" and "c" only exist in one Parquet file.
-        assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-        assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-        val pathThree = s"${dir.getCanonicalPath}/table3"
-        df.write.parquet(pathThree)
-
-        // We will remove the temporary metadata when writing Parquet file.
-        val schema = spark.read.parquet(pathThree).schema
-        assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-        val pathFour = s"${dir.getCanonicalPath}/table4"
-        val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
-        dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
-
-        val pathFive = s"${dir.getCanonicalPath}/table5"
-        val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
-        dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
-
-        // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
-        // Parquet emits.
-        val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
-          .selectExpr("s")
-        checkAnswer(dfStruct3, Row(Row(null, 1)))
-
-        // The fields "s.a" and "s.c" only exist in one Parquet file.
-        val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
-        assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-        assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-        val pathSix = s"${dir.getCanonicalPath}/table6"
-        dfStruct3.write.parquet(pathSix)
-
-        // We will remove the temporary metadata when writing Parquet file.
-        val forPathSix = spark.read.parquet(pathSix).schema
-        assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-        // sanity test: make sure optional metadata field is not wrongly set.
-        val pathSeven = s"${dir.getCanonicalPath}/table7"
-        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
-        val pathEight = s"${dir.getCanonicalPath}/table8"
-        (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
-
-        val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
-        checkAnswer(
-          df2,
-          Row(1, "1"))
-
-        // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
-        assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
-        assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
+ Seq("true", "false").map { vectorized =>
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+ SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+ SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+ withTempPath { dir =>
+ val pathOne = s"${dir.getCanonicalPath}/table1"
+ (1 to 3).map(i => (i, i.toString)).toDF("a",
"b").write.parquet(pathOne)
+ val pathTwo = s"${dir.getCanonicalPath}/table2"
+ (1 to 3).map(i => (i, i.toString)).toDF("c",
"b").write.parquet(pathTwo)
+
+ // If the "c = 1" filter gets pushed down, this query will throw an
exception which
+ // Parquet emits. This is a Parquet issue (PARQUET-389).
+ val df = spark.read.parquet(pathOne, pathTwo).filter("c =
1").selectExpr("c", "b", "a")
+ checkAnswer(
+ df,
+ Row(1, "1", null))
+
+ // The fields "a" and "c" only exist in one Parquet file.
+
assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+ val pathThree = s"${dir.getCanonicalPath}/table3"
+ df.write.parquet(pathThree)
+
+ // We will remove the temporary metadata when writing Parquet file.
+ val schema = spark.read.parquet(pathThree).schema
+
assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+ val pathFour = s"${dir.getCanonicalPath}/table4"
+ val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
+ dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
+
+ val pathFive = s"${dir.getCanonicalPath}/table5"
+ val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
+ dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
+
+ // If the "s.c = 1" filter gets pushed down, this query will throw
an exception which
+ // Parquet emits.
+ val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c =
1")
+ .selectExpr("s")
+ checkAnswer(dfStruct3, Row(Row(null, 1)))
+
+ // The fields "s.a" and "s.c" only exist in one Parquet file.
+ val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
+
assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+ val pathSix = s"${dir.getCanonicalPath}/table6"
+ dfStruct3.write.parquet(pathSix)
+
+ // We will remove the temporary metadata when writing Parquet file.
+ val forPathSix = spark.read.parquet(pathSix).schema
+
assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+ // sanity test: make sure optional metadata field is not wrongly set.
+ val pathSeven = s"${dir.getCanonicalPath}/table7"
+ (1 to 3).map(i => (i, i.toString)).toDF("a",
"b").write.parquet(pathSeven)
+ val pathEight = s"${dir.getCanonicalPath}/table8"
+ (4 to 6).map(i => (i, i.toString)).toDF("a",
"b").write.parquet(pathEight)
+
+ val df2 = spark.read.parquet(pathSeven, pathEight).filter("a =
1").selectExpr("a", "b")
+ checkAnswer(
+ df2,
+ Row(1, "1"))
+
+ // The fields "a" and "b" exist in both two Parquet files. No
metadata is set.
+
assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
+
assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
+ }
}
}
}
@@ -559,4 +562,32 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
      assert(df.filter("_1 IS NOT NULL").count() === 4)
    }
  }
+
+  test("Fiters should be pushed down for vectorized Parquet reader at row group level") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
+      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/table"
+        (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
+
+        Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
+          withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) {
+            val accu = new LongAccumulator
+            accu.register(sparkContext, Some("numRowGroups"))
+
+            val df = spark.read.parquet(path).filter("a < 100")
+            df.foreachPartition(_.foreach(v => accu.add(0)))
+            df.collect
+
+            val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups")
+            assert(numRowGroups.isDefined)
+            assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value))
+            AccumulatorContext.remove(accu.id)
+          }
+        }
+      }
+    }
+  }
}