This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e40fce9  [SPARK-34562][SQL] Add test and doc for Parquet Bloom filter push down
e40fce9 is described below

commit e40fce919ab77f5faeb0bbd34dc86c56c04adbaa
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Mon Apr 12 17:07:35 2021 +0300

    [SPARK-34562][SQL] Add test and doc for Parquet Bloom filter push down
    
    ### What changes were proposed in this pull request?
    
    This PR adds a test and documentation for Parquet Bloom filter push down.
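
    For orientation, a minimal sketch (not part of this patch) of the write and read
    paths being documented. The write-side option keys come from the diff below; the
    read-side key is assumed to be parquet-mr's `ParquetInputFormat.BLOOM_FILTERING_ENABLED`
    ("parquet.filter.bloom.enabled"), which the new test toggles through the SQL conf:

    ```scala
    // Sketch only. Assumes an active SparkSession `spark` launched from the Spark
    // source tree (for the sample data path); the output path is illustrative.
    val usersDF = spark.read.parquet("examples/src/main/resources/users.parquet")

    // Write: build a Bloom filter for favorite_color, sized for ~1M distinct values.
    usersDF.write.format("parquet")
      .option("parquet.bloom.filter.enabled#favorite_color", "true")
      .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
      .save("users_with_options.parquet")

    // Read: with read-side Bloom filtering enabled (set here via the SQL conf, as
    // the new test does), row groups whose filter proves a value absent are skipped.
    spark.conf.set("parquet.filter.bloom.enabled", "true")
    spark.read.parquet("users_with_options.parquet")
      .filter("favorite_color = 'red'")
      .show()
    ```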
    
    ### Why are the changes needed?
    
    Improve the documentation.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Generating docs:
    
    ![image](https://user-images.githubusercontent.com/5399861/114327472-c131bb80-9b6b-11eb-87a0-6f9a74eb1097.png)
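
    The patch also adds a unit test (see the diff below) that writes a file with a
    Bloom filter on `id` and uses a row-group-counting accumulator to check that a
    lookup for an absent value reads zero row groups when Bloom filtering is on.
    One way to run it locally, assuming the standard Spark sbt workflow:

    ```
    build/sbt "sql/testOnly *ParquetFilterSuite -- -z SPARK-34562"
    ```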
    
    Closes #32123 from wangyum/SPARK-34562.
    
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 docs/sql-data-sources-load-save-functions.md       | 46 +++++++++++++++++++++-
 .../examples/sql/JavaSQLDataSourceExample.java     |  8 ++++
 examples/src/main/python/sql/datasource.py         | 10 +++++
 examples/src/main/r/RSparkSQLExample.R             |  5 +++
 .../spark/examples/sql/SQLDataSourceExample.scala  |  8 ++++
 .../datasources/parquet/ParquetFilterSuite.scala   | 29 ++++++++++++++
 6 files changed, 104 insertions(+), 2 deletions(-)

diff --git a/docs/sql-data-sources-load-save-functions.md b/docs/sql-data-sources-load-save-functions.md
index 0866f37..25df34e 100644
--- a/docs/sql-data-sources-load-save-functions.md
+++ b/docs/sql-data-sources-load-save-functions.md
@@ -105,9 +105,11 @@ To load a CSV file you can use:
 The extra options are also used during write operations.
 For example, you can control bloom filters and dictionary encodings for ORC data sources.
 The following ORC example will create a bloom filter and use dictionary encoding only for `favorite_color`.
-For Parquet, there exists `parquet.enable.dictionary`, too.
+For Parquet, there exist `parquet.bloom.filter.enabled` and `parquet.enable.dictionary`, too.
 To find more detailed information about the extra ORC/Parquet options,
-visit the official Apache ORC/Parquet websites.
+visit the official Apache [ORC](https://orc.apache.org/docs/spark-config.html) / [Parquet](https://github.com/apache/parquet-mr/tree/master/parquet-hadoop) websites.
+
+ORC data source:
 
 <div class="codetabs">
 
@@ -146,6 +148,46 @@ OPTIONS (
 
 </div>
 
+Parquet data source:
+
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+{% include_example manual_save_options_parquet scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java"  markdown="1">
+{% include_example manual_save_options_parquet java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example manual_save_options_parquet python/sql/datasource.py %}
+</div>
+
+<div data-lang="r"  markdown="1">
+{% include_example manual_save_options_parquet r/RSparkSQLExample.R %}
+</div>
+
+<div data-lang="SQL"  markdown="1">
+
+{% highlight sql %}
+CREATE TABLE users_with_options (
+  name STRING,
+  favorite_color STRING,
+  favorite_numbers array<integer>
+) USING parquet
+OPTIONS (
+  `parquet.bloom.filter.enabled#favorite_color` true,
+  `parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
+  parquet.enable.dictionary true,
+  parquet.page.write-checksum.enabled true
+)
+{% endhighlight %}
+
+</div>
+
+</div>
+
 ### Run SQL on files directly
 
 Instead of using read API to load a file into DataFrame and query it, you can also query that
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index 53eb8fd..5dcf321 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -195,6 +195,14 @@ public class JavaSQLDataSourceExample {
       .option("orc.column.encoding.direct", "name")
       .save("users_with_options.orc");
     // $example off:manual_save_options_orc$
+    // $example on:manual_save_options_parquet$
+    usersDF.write().format("parquet")
+        .option("parquet.bloom.filter.enabled#favorite_color", "true")
+        .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
+        .option("parquet.enable.dictionary", "true")
+        .option("parquet.page.write-checksum.enabled", "false")
+        .save("users_with_options.parquet");
+    // $example off:manual_save_options_parquet$
     // $example on:direct_sql$
     Dataset<Row> sqlDF =
       spark.sql("SELECT * FROM 
parquet.`examples/src/main/resources/users.parquet`");
diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py
index f3ad65f..4d7aa04 100644
--- a/examples/src/main/python/sql/datasource.py
+++ b/examples/src/main/python/sql/datasource.py
@@ -126,6 +126,16 @@ def basic_datasource_example(spark):
         .save("users_with_options.orc"))
     # $example off:manual_save_options_orc$
 
+    # $example on:manual_save_options_parquet$
+    df = spark.read.parquet("examples/src/main/resources/users.parquet")
+    (df.write.format("parquet")
+        .option("parquet.bloom.filter.enabled#favorite_color", "true")
+        .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
+        .option("parquet.enable.dictionary", "true")
+        .option("parquet.page.write-checksum.enabled", "false")
+        .save("users_with_options.parquet"))
+    # $example off:manual_save_options_parquet$
+
     # $example on:write_sorting_and_bucketing$
     df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
     # $example off:write_sorting_and_bucketing$
diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R
index 86ad533..15118e1 100644
--- a/examples/src/main/r/RSparkSQLExample.R
+++ b/examples/src/main/r/RSparkSQLExample.R
@@ -157,6 +157,11 @@ df <- read.df("examples/src/main/resources/users.orc", "orc")
 write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
 # $example off:manual_save_options_orc$
 
+# $example on:manual_save_options_parquet$
+df <- read.df("examples/src/main/resources/users.parquet", "parquet")
+write.parquet(df, "users_with_options.parquet", `parquet.bloom.filter.enabled#favorite_color` = "true", `parquet.bloom.filter.expected.ndv#favorite_color` = "1000000", parquet.enable.dictionary = "true", `parquet.page.write-checksum.enabled` = "false")
+# $example off:manual_save_options_parquet$
+
 # $example on:direct_sql$
 df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
 # $example off:direct_sql$
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index 207961b..6bd2bd6 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -129,6 +129,14 @@ object SQLDataSourceExample {
       .option("orc.column.encoding.direct", "name")
       .save("users_with_options.orc")
     // $example off:manual_save_options_orc$
+    // $example on:manual_save_options_parquet$
+    usersDF.write.format("parquet")
+      .option("parquet.bloom.filter.enabled#favorite_color", "true")
+      .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
+      .option("parquet.enable.dictionary", "true")
+      .option("parquet.page.write-checksum.enabled", "false")
+      .save("users_with_options.parquet")
+    // $example off:manual_save_options_parquet$
 
     // $example on:direct_sql$
     val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 329a3e4..94bda56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1634,6 +1634,35 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       }
     }
   }
+
+  test("SPARK-34562: Bloom filter push down") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.range(100).selectExpr("id * 2 AS id")
+        .write
+        .option(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#id", true)
+        // Disable dictionary encoding because the number of distinct values is less than 40000.
+        .option(ParquetOutputFormat.ENABLE_DICTIONARY, false)
+        .parquet(path)
+
+      Seq(true, false).foreach { bloomFilterEnabled =>
+        withSQLConf(ParquetInputFormat.BLOOM_FILTERING_ENABLED -> bloomFilterEnabled.toString) {
+          val accu = new NumRowGroupsAcc
+          sparkContext.register(accu)
+
+          val df = spark.read.parquet(path).filter("id = 19")
+          df.foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
+          if (bloomFilterEnabled) {
+            assert(accu.value === 0)
+          } else {
+            assert(accu.value > 0)
+          }
+
+          AccumulatorContext.remove(accu.id)
+        }
+      }
+    }
+  }
 }
 
 @ExtendedSQLTest

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
