This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 0667c60b chore: Make parquet reader options Comet options instead of Hadoop options (#968)
0667c60b is described below

commit 0667c60b8817dcf4fa05ba218cc4a97a5d36c559
Author: Parth Chandra <[email protected]>
AuthorDate: Mon Oct 7 09:00:11 2024 -0700

    chore: Make parquet reader options Comet options instead of Hadoop options (#968)
    
    * fix: Make parquet reader options Comet options instead of hadoop options.
    
    * Add additional test with configuration set through Spark SQL config
    
    * Add configuration check in unit test
    
    * style fix
---
 .../java/org/apache/comet/parquet/ReadOptions.java | 50 +++++--------
 .../main/scala/org/apache/comet/CometConf.scala    | 42 +++++++++++
 .../org/apache/comet/parquet/TestFileReader.java   |  8 +-
 docs/source/user-guide/configs.md                  |  5 ++
 .../apache/comet/parquet/ParquetReadSuite.scala    | 87 ++++++++++++++++++++++
 5 files changed, 156 insertions(+), 36 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/ReadOptions.java b/common/src/main/java/org/apache/comet/parquet/ReadOptions.java
index 023f71dc..b2889f38 100644
--- a/common/src/main/java/org/apache/comet/parquet/ReadOptions.java
+++ b/common/src/main/java/org/apache/comet/parquet/ReadOptions.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.launcher.SparkLauncher;
 
+import org.apache.comet.CometConf;
+
 /**
  * Comet specific Parquet related read options.
  *
@@ -33,30 +35,6 @@ import org.apache.spark.launcher.SparkLauncher;
  */
 public class ReadOptions {
   private static final Logger LOG = LoggerFactory.getLogger(ReadOptions.class);
-  public static final String COMET_PARQUET_PARALLEL_IO_ENABLED =
-      "comet.parquet.read.parallel.io.enabled";
-  public static final boolean COMET_PARQUET_PARALLEL_IO_ENABLED_DEFAULT = true;
-
-  public static final String COMET_PARQUET_PARALLEL_IO_THREADS =
-      "comet.parquet.read.parallel.io.thread-pool.size";
-  public static final int COMET_PARQUET_PARALLEL_IO_THREADS_DEFAULT = 32;
-
-  public static final String COMET_IO_MERGE_RANGES = "comet.parquet.read.io.mergeRanges";
-  private static final boolean COMET_IO_MERGE_RANGES_DEFAULT = true;
-
-  public static final String COMET_IO_MERGE_RANGES_DELTA =
-      "comet.parquet.read.io.mergeRanges.delta";
-  private static final int COMET_IO_MERGE_RANGES_DELTA_DEFAULT = 1 << 23; // 8 MB
-
-  // In the parallel reader, if the read ranges submitted are skewed in sizes, this
-  // option will cause the reader to break up larger read ranges into smaller ranges
-  // to reduce the skew. This will result in a slightly larger number of connections
-  // opened to the file system but may give improved performance.
-  // The option is off by default.
-  public static final String COMET_IO_ADJUST_READRANGE_SKEW =
-      "comet.parquet.read.io.adjust.readRange.skew";
-
-  private static final boolean COMET_IO_ADJUST_READRANGE_SKEW_DEFAULT = false;
 
 // Max number of concurrent tasks we expect. Used to autoconfigure S3 client connections
   public static final int S3A_MAX_EXPECTED_PARALLELISM = 32;
@@ -112,10 +90,6 @@ public class ReadOptions {
     return new Builder(conf);
   }
 
-  public static Builder builder() {
-    return builder(new Configuration());
-  }
-
   public static class Builder {
     private final Configuration conf;
 
@@ -173,14 +147,24 @@ public class ReadOptions {
       this.conf = conf;
       this.parallelIOEnabled =
           conf.getBoolean(
-              COMET_PARQUET_PARALLEL_IO_ENABLED, COMET_PARQUET_PARALLEL_IO_ENABLED_DEFAULT);
+              CometConf.COMET_PARQUET_PARALLEL_IO_ENABLED().key(),
+              (Boolean) CometConf.COMET_PARQUET_PARALLEL_IO_ENABLED().defaultValue().get());
       this.parallelIOThreadPoolSize =
-          conf.getInt(COMET_PARQUET_PARALLEL_IO_THREADS, COMET_PARQUET_PARALLEL_IO_THREADS_DEFAULT);
-      this.ioMergeRanges = conf.getBoolean(COMET_IO_MERGE_RANGES, COMET_IO_MERGE_RANGES_DEFAULT);
+          conf.getInt(
+              CometConf.COMET_PARQUET_PARALLEL_IO_THREADS().key(),
+              (Integer) CometConf.COMET_PARQUET_PARALLEL_IO_THREADS().defaultValue().get());
+      this.ioMergeRanges =
+          conf.getBoolean(
+              CometConf.COMET_IO_MERGE_RANGES().key(),
+              (boolean) CometConf.COMET_IO_MERGE_RANGES().defaultValue().get());
       this.ioMergeRangesDelta =
-          conf.getInt(COMET_IO_MERGE_RANGES_DELTA, COMET_IO_MERGE_RANGES_DELTA_DEFAULT);
+          conf.getInt(
+              CometConf.COMET_IO_MERGE_RANGES_DELTA().key(),
+              (Integer) CometConf.COMET_IO_MERGE_RANGES_DELTA().defaultValue().get());
       this.adjustReadRangeSkew =
-          conf.getBoolean(COMET_IO_ADJUST_READRANGE_SKEW, COMET_IO_ADJUST_READRANGE_SKEW_DEFAULT);
+          conf.getBoolean(
+              CometConf.COMET_IO_ADJUST_READRANGE_SKEW().key(),
+              (Boolean) CometConf.COMET_IO_ADJUST_READRANGE_SKEW().defaultValue().get());
       // override some S3 defaults
       setS3Config();
     }
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 03b7a2a4..8223c2cc 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -74,6 +74,48 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(true)
 
+  val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.read.parallel.io.enabled")
+      .doc(
+        "Whether to enable Comet's parallel reader for Parquet files. The 
parallel reader reads " +
+          "ranges of consecutive data in a  file in parallel. It is faster for 
large files and " +
+          "row groups but uses more resources. The parallel reader is enabled 
by default.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val COMET_PARQUET_PARALLEL_IO_THREADS: ConfigEntry[Int] =
+    conf("spark.comet.parquet.read.parallel.io.thread-pool.size")
+      .doc("The maximum number of parallel threads the parallel reader will 
use in a single " +
+        "executor. For executors configured with a smaller number of cores, 
use a smaller number.")
+      .intConf
+      .createWithDefault(16)
+
+  val COMET_IO_MERGE_RANGES: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.read.io.mergeRanges")
+      .doc(
+        "When enabled the parallel reader will try to merge ranges of data 
that are separated " +
+          "by less than 'comet.parquet.read.io.mergeRanges.delta' bytes. 
Longer continuous reads " +
+          "are faster on cloud storage. The default behavior is to merge 
consecutive ranges.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val COMET_IO_MERGE_RANGES_DELTA: ConfigEntry[Int] =
+    conf("spark.comet.parquet.read.io.mergeRanges.delta")
+      .doc(
+        "The delta in bytes between consecutive read ranges below which the 
parallel reader " +
+          "will try to merge the ranges. The default is 8MB.")
+      .intConf
+      .createWithDefault(1 << 23) // 8 MB
+
+  val COMET_IO_ADJUST_READRANGE_SKEW: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.read.io.adjust.readRange.skew")
+      .doc("In the parallel reader, if the read ranges submitted are skewed in 
sizes, this " +
+        "option will cause the reader to break up larger read ranges into 
smaller ranges to " +
+        "reduce the skew. This will result in a slightly larger number of 
connections opened to " +
+        "the file system but may give improved performance. The option is off 
by default.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_CONVERT_FROM_PARQUET_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.convert.parquet.enabled")
       .doc(
diff --git a/common/src/test/java/org/apache/comet/parquet/TestFileReader.java b/common/src/test/java/org/apache/comet/parquet/TestFileReader.java
index 6e73f751..240aa07a 100644
--- a/common/src/test/java/org/apache/comet/parquet/TestFileReader.java
+++ b/common/src/test/java/org/apache/comet/parquet/TestFileReader.java
@@ -67,6 +67,8 @@ import org.apache.parquet.schema.MessageTypeParser;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Types;
 
+import org.apache.comet.CometConf;
+
 import static org.apache.parquet.column.Encoding.*;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
 import static org.junit.Assert.*;
@@ -615,12 +617,12 @@ public class TestFileReader {
   @Test
   public void testWriteReadMergeScanRange() throws Throwable {
     Configuration conf = new Configuration();
-    conf.set(ReadOptions.COMET_IO_MERGE_RANGES, Boolean.toString(true));
+    conf.set(CometConf.COMET_IO_MERGE_RANGES().key(), Boolean.toString(true));
     // Set the merge range delta so small that ranges do not get merged
-    conf.set(ReadOptions.COMET_IO_MERGE_RANGES_DELTA, Integer.toString(1024));
+    conf.set(CometConf.COMET_IO_MERGE_RANGES_DELTA().key(), Integer.toString(1024));
     testReadWrite(conf, 2, 1024);
     // Set the merge range delta so large that all ranges get merged
-    conf.set(ReadOptions.COMET_IO_MERGE_RANGES_DELTA, Integer.toString(1024 * 1024));
+    conf.set(CometConf.COMET_IO_MERGE_RANGES_DELTA().key(), Integer.toString(1024 * 1024));
     testReadWrite(conf, 2, 1024);
   }
 
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index ff2db342..f7ef1d55 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -66,6 +66,11 @@ Comet provides the following configuration settings.
 | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
 | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
 | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false |
+| spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. The option is off by default. | false |
+| spark.comet.parquet.read.io.mergeRanges | When enabled the parallel reader will try to merge ranges of data that are separated by less than 'comet.parquet.read.io.mergeRanges.delta' bytes. Longer continuous reads are faster on cloud storage. The default behavior is to merge consecutive ranges. | true |
+| spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 |
+| spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a  file in parallel. It is faster for large files and row groups but uses more resources. The parallel reader is enabled by default. | true |
+| spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
 | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
 | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true |
 | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
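
Because these reader options are now registered in CometConf as spark.comet.* entries rather than read as raw Hadoop options, they can be set through the regular Spark SQL configuration. A minimal usage sketch, assuming a SparkSession with the Comet plugin enabled (the input path below is only a placeholder):

    // Tune the Comet parallel Parquet reader via the Spark SQL conf; per the test comment
    // below, CometScanExec copies these SQL conf entries into the Hadoop conf used by the reader.
    spark.conf.set("spark.comet.parquet.read.parallel.io.enabled", "true")
    spark.conf.set("spark.comet.parquet.read.parallel.io.thread-pool.size", "8")
    spark.conf.set("spark.comet.parquet.read.io.mergeRanges", "true")
    spark.conf.set("spark.comet.parquet.read.io.mergeRanges.delta", (1 << 20).toString) // 1 MB
    val df = spark.read.parquet("/path/to/data.parquet") // placeholder path
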
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 376139b5..5ddf62c8 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1280,6 +1280,93 @@ abstract class ParquetReadSuite extends CometTestBase {
       }
     }
   }
+
+  test("test merge scan range") {
+    def makeRawParquetFile(path: Path, n: Int): Seq[Option[Int]] = {
+      val dictionaryPageSize = 1024
+      val pageRowCount = 500
+      val schemaStr =
+        """
+          |message root {
+          |  optional int32   _1(INT_16);
+          |  optional int32   _2;
+          |  optional int64   _3;
+          |}
+        """.stripMargin
+
+      val schema = MessageTypeParser.parseMessageType(schemaStr)
+      val writer = createParquetWriter(
+        schema,
+        path,
+        dictionaryEnabled = true,
+        dictionaryPageSize = dictionaryPageSize,
+        pageRowCountLimit = pageRowCount)
+
+      val rand = scala.util.Random
+      val expected = (0 until n).map { i =>
+        // use a single value for the first page, to make sure dictionary encoding kicks in
+        val value = if (i < pageRowCount) i % 8 else i
+        if (rand.nextBoolean()) None
+        else Some(value)
+      }
+
+      expected.foreach { opt =>
+        val record = new SimpleGroup(schema)
+        opt match {
+          case Some(i) =>
+            record.add(0, i.toShort)
+            record.add(1, i)
+            record.add(2, i.toLong)
+          case _ =>
+        }
+        writer.write(record)
+      }
+
+      writer.close()
+      expected
+    }
+
+    Seq(16, 128).foreach { batchSize =>
+      Seq(1024, 1024 * 1024).foreach { mergeRangeDelta =>
+        {
+          withSQLConf(
+            CometConf.COMET_BATCH_SIZE.key -> batchSize.toString,
+            CometConf.COMET_IO_MERGE_RANGES.key -> "true",
+            CometConf.COMET_IO_MERGE_RANGES_DELTA.key -> mergeRangeDelta.toString) {
+            withTempDir { dir =>
+              val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+              val expected = makeRawParquetFile(path, 10000)
+              val schema = StructType(
+                Seq(StructField("_1", ShortType, true), StructField("_3", LongType, true)))
+              readParquetFile(path.toString, Some(schema)) { df =>
+                {
+                  // CometScanExec calls sessionState.newHadoopConfWithOptions which copies
+                  // the sqlConf and some additional options to the hadoopConf and then
+                  // uses the result to create the inputRDD (https://github.com/apache/datafusion-comet/blob/3783faaa01078a35bee93b299368f8c72869198d/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala#L181).
+                  // We don't have access to the created hadoop Conf, but can confirm that the
+                  // result does contain the correct configuration
+                  assert(
+                    df.sparkSession.sessionState
+                      .newHadoopConfWithOptions(Map.empty)
+                      .get(CometConf.COMET_IO_MERGE_RANGES_DELTA.key)
+                      .equals(mergeRangeDelta.toString))
+                  checkAnswer(
+                    df,
+                    expected.map {
+                      case None =>
+                        Row(null, null)
+                      case Some(i) =>
+                        Row(i.toShort, i.toLong)
+                    })
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
   def testScanner(cometEnabled: String, scanner: String, v1: Option[String] = None): Unit = {
     withSQLConf(
       CometConf.COMET_ENABLED.key -> cometEnabled,

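For reference, the lookup pattern now used in ReadOptions.Builder takes both the key and the default value from the CometConf entry instead of hard-coded Hadoop option names. A rough Scala sketch of that same lookup (the standalone Configuration instance here is illustrative only):

    import org.apache.hadoop.conf.Configuration
    import org.apache.comet.CometConf

    // Key and default both come from the ConfigEntry, mirroring the patched Builder constructor.
    val conf = new Configuration()
    val parallelIoEnabled = conf.getBoolean(
      CometConf.COMET_PARQUET_PARALLEL_IO_ENABLED.key,
      CometConf.COMET_PARQUET_PARALLEL_IO_ENABLED.defaultValue.get)
    val ioThreads = conf.getInt(
      CometConf.COMET_PARQUET_PARALLEL_IO_THREADS.key,
      CometConf.COMET_PARQUET_PARALLEL_IO_THREADS.defaultValue.get)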

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
