This is an automated email from the ASF dual-hosted git repository.

kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 49fa28735 feat: introduce hadoop mini cluster to test native scan on 
hdfs (#1556)
49fa28735 is described below

commit 49fa28735e3bff490c08445ab6a49db5418fc9fd
Author: Zhen Wang <[email protected]>
AuthorDate: Wed Mar 26 06:08:27 2025 +0800

    feat: introduce hadoop mini cluster to test native scan on hdfs (#1556)
    
    ## Which issue does this PR close?
    
    Closes #1515.
    
    ## Rationale for this change
    
    test native scan on hdfs
    
    ## What changes are included in this PR?
    
    introduce hadoop mini cluster to test native scan on hdfs
    
    ## How are these changes tested?
    
    Successfully ran `CometReadHdfsBenchmark` locally (tip: build the native library with HDFS support enabled: `cd native && cargo build --features hdfs`)
---
 native/core/Cargo.toml                             |   2 +-
 pom.xml                                            |   8 ++
 spark/pom.xml                                      |  24 +++++
 .../scala/org/apache/comet/WithHdfsCluster.scala   | 109 +++++++++++++++++++++
 .../spark/sql/benchmark/CometReadBenchmark.scala   |  70 ++++++++++++-
 5 files changed, 210 insertions(+), 3 deletions(-)

diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index bc9197223..e964b6869 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -68,7 +68,7 @@ datafusion-comet-proto = { workspace = true }
 object_store = { workspace = true }
 url = { workspace = true }
 parking_lot = "0.12.3"
-datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true}
+datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, 
default-features = false, features = ["hdfs"] }
 
 [dev-dependencies]
 pprof = { version = "0.14.0", features = ["flamegraph"] }
diff --git a/pom.xml b/pom.xml
index fa0dfc7c3..9c0339f16 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@ under the License.
     <protobuf.version>3.25.5</protobuf.version>
     <parquet.version>1.13.1</parquet.version>
     <parquet.maven.scope>provided</parquet.maven.scope>
+    <hadoop.version>3.3.4</hadoop.version>
     <arrow.version>16.0.0</arrow.version>
     <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
     <spotless.version>2.43.0</spotless.version>
@@ -447,6 +448,13 @@ under the License.
         <version>5.1.0</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-client-minicluster</artifactId>
+        <version>${hadoop.version}</version>
+        <scope>test</scope>
+      </dependency>
+
     </dependencies>
 
   </dependencyManagement>
diff --git a/spark/pom.xml b/spark/pom.xml
index d28b5d29e..80150d024 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -141,6 +141,30 @@ under the License.
       <artifactId>arrow-c-data</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-minicluster</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <!-- hadoop clients are provided by spark -->
+        <exclusion>
+          <artifactId>hadoop-client-api</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-client-runtime</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>snappy-java</artifactId>
+          <groupId>org.xerial.snappy</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>junit</artifactId>
+          <groupId>junit</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala 
b/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala
new file mode 100644
index 000000000..49124d63e
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import java.io.{File, FileWriter}
+import java.net.InetAddress
+import java.nio.file.Files
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hdfs.MiniDFSCluster
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys
+import org.apache.spark.internal.Logging
+
+/**
+ * Test mixin that starts and stops an in-process HDFS [[MiniDFSCluster]].
+ *
+ * Call [[startHdfsCluster]] before using any of the accessors and [[stopHdfsCluster]] once the
+ * cluster is no longer needed.
+ *
+ * Mostly copied from:
+ * https://github.com/apache/kyuubi/blob/master/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
+ */
+trait WithHdfsCluster extends Logging {
+
+  // All fields stay null until startHdfsCluster() has been called.
+  private var hadoopConfDir: File = _
+  private var hdfsCluster: MiniDFSCluster = _
+  private var hdfsConf: Configuration = _
+  private var tmpRootDir: Path = _
+  private var fileSystem: FileSystem = _
+
+  /**
+   * Builds the mini cluster, writes its configuration to a fresh local temporary directory as
+   * `core-site.xml`, and creates the `/tmp` root directory on the cluster's file system.
+   */
+  def startHdfsCluster(): Unit = {
+    hdfsConf = new Configuration()
+    // before HADOOP-18206 (3.4.0), HDFS MetricsLogger strongly depends on
+    // commons-logging, we should disable it explicitly, otherwise, it throws
+    // ClassNotFound: org.apache.commons.logging.impl.Log4JLogger
+    hdfsConf.set("dfs.namenode.metrics.logger.period.seconds", "0")
+    hdfsConf.set("dfs.datanode.metrics.logger.period.seconds", "0")
+    // Set bind host to localhost to avoid java.net.BindException
+    hdfsConf.setIfUnset("dfs.namenode.rpc-bind-host", "localhost")
+
+    hdfsCluster = new MiniDFSCluster.Builder(hdfsConf)
+      .checkDataNodeAddrConfig(true)
+      .checkDataNodeHostConfig(true)
+      .build()
+    logInfo(
+      "NameNode address in configuration is " +
+        s"${hdfsConf.get(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)}")
+    hadoopConfDir =
+      Files.createTempDirectory(s"comet_hdfs_conf_${UUID.randomUUID().toString}").toFile
+    saveHadoopConf(hadoopConfDir)
+
+    fileSystem = hdfsCluster.getFileSystem
+    tmpRootDir = new Path("/tmp")
+    fileSystem.mkdirs(tmpRootDir)
+  }
+
+  /**
+   * Shuts the cluster down and removes the local configuration directory. Safe to call when the
+   * cluster was never started.
+   */
+  def stopHdfsCluster(): Unit = {
+    try {
+      // `true` asks MiniDFSCluster to delete its DFS data directory as well.
+      if (hdfsCluster != null) hdfsCluster.shutdown(true)
+    } finally {
+      // Remove the generated configuration even when the shutdown itself fails.
+      if (hadoopConfDir != null) FileUtils.deleteDirectory(hadoopConfDir)
+    }
+  }
+
+  /**
+   * Writes every entry of the cluster configuration to `core-site.xml` inside `hadoopConfDir`,
+   * with occurrences of the local host name replaced by `localhost`.
+   */
+  private def saveHadoopConf(hadoopConfDir: File): Unit = {
+    // `false`: start from an empty configuration instead of loading the default resources.
+    val configToWrite = new Configuration(false)
+    val hostName = InetAddress.getLocalHost.getHostName
+    hdfsConf.iterator().asScala.foreach { kv =>
+      // Literal replacement: the host name must not be interpreted as a regular expression
+      // (the dots of a fully qualified host name would otherwise match any character).
+      configToWrite.set(kv.getKey, kv.getValue.replace(hostName, "localhost"))
+    }
+    val writer = new FileWriter(new File(hadoopConfDir, "core-site.xml"))
+    // Always release the file handle, even if serialising the configuration fails.
+    try configToWrite.writeXml(writer)
+    finally writer.close()
+  }
+
+  /** Configuration the mini cluster was built with. */
+  def getHadoopConf: Configuration = hdfsConf
+
+  /** Port of the cluster's NameNode. */
+  def getDFSPort: Int = hdfsCluster.getNameNodePort
+
+  /** Absolute path of the local directory that holds the generated `core-site.xml`. */
+  def getHadoopConfDir: String = hadoopConfDir.getAbsolutePath
+
+  /** Location of the generated `core-site.xml`, expressed as a Hadoop [[Path]]. */
+  def getHadoopConfFile: Path = new Path(hadoopConfDir.toURI.toURL.toString, "core-site.xml")
+
+  /** Root directory (`/tmp`) created on the cluster's file system at start-up. */
+  def getTmpRootDir: Path = tmpRootDir
+
+  /** File system handle obtained from the mini cluster. */
+  def getFileSystem: FileSystem = fileSystem
+
+  /**
+   * Creates a uniquely named directory under the temporary root, passes it to `tmpDir`, and
+   * deletes it recursively afterwards, whether or not `tmpDir` throws.
+   */
+  def withTmpHdfsDir(tmpDir: Path => Unit): Unit = {
+    val tempPath = new Path(tmpRootDir, UUID.randomUUID().toString)
+    fileSystem.mkdirs(tempPath)
+    try tmpDir(tempPath)
+    finally fileSystem.delete(tempPath, true)
+  }
+
+}
diff --git 
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala 
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
index 5f40ab2d7..fcfbf6074 100644
--- 
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
+++ 
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
@@ -24,13 +24,15 @@ import java.io.File
 import scala.collection.JavaConverters._
 import scala.util.Random
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.TestUtils
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnVector
 
-import org.apache.comet.CometConf
+import org.apache.comet.{CometConf, WithHdfsCluster}
 import org.apache.comet.CometConf.{SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, 
SCAN_NATIVE_ICEBERG_COMPAT}
 import org.apache.comet.parquet.BatchReader
 
@@ -40,7 +42,7 @@ import org.apache.comet.parquet.BatchReader
  * benchmark-org.apache.spark.sql.benchmark.CometReadBenchmark` Results will 
be written to
  * "spark/benchmarks/CometReadBenchmark-**results.txt".
  */
-object CometReadBenchmark extends CometBenchmarkBase {
+class CometReadBaseBenchmark extends CometBenchmarkBase {
 
   def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
     // Benchmarks running through spark sql.
@@ -71,6 +73,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
             spark.sql(s"select $query from parquetV1Table").noop()
           }
@@ -79,6 +82,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ 
=>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> 
SCAN_NATIVE_ICEBERG_COMPAT) {
             spark.sql(s"select $query from parquetV1Table").noop()
           }
@@ -118,6 +122,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
             spark.sql("select sum(id) from parquetV1Table").noop()
           }
@@ -126,6 +131,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ 
=>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> 
SCAN_NATIVE_ICEBERG_COMPAT) {
             spark.sql("select sum(id) from parquetV1Table").noop()
           }
@@ -244,6 +250,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
             spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 
0").noop()
           }
@@ -252,6 +259,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> 
SCAN_NATIVE_ICEBERG_COMPAT) {
             spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 
0").noop()
           }
@@ -300,6 +308,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
             spark.sql("select sum(length(id)) from parquetV1Table").noop()
           }
@@ -308,6 +317,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ 
=>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> 
SCAN_NATIVE_ICEBERG_COMPAT) {
             spark.sql("select sum(length(id)) from parquetV1Table").noop()
           }
@@ -352,6 +362,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
             spark
               .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
@@ -363,6 +374,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> 
SCAN_NATIVE_ICEBERG_COMPAT) {
             spark
               .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
@@ -403,6 +415,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
             spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
           }
@@ -411,6 +424,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> 
SCAN_NATIVE_ICEBERG_COMPAT) {
             spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
           }
@@ -452,6 +466,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
             spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
           }
@@ -460,6 +475,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> 
SCAN_NATIVE_ICEBERG_COMPAT) {
             spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
           }
@@ -501,6 +517,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
             spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
           }
@@ -509,6 +526,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
         benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true",
             CometConf.COMET_NATIVE_SCAN_IMPL.key -> 
SCAN_NATIVE_ICEBERG_COMPAT) {
             spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
           }
@@ -587,3 +605,51 @@ object CometReadBenchmark extends CometBenchmarkBase {
     }
   }
 }
+
+/** Read benchmarks with no overrides; all behaviour comes from [[CometReadBaseBenchmark]]. */
+object CometReadBenchmark extends CometReadBaseBenchmark
+
+/**
+ * Variant of the read benchmarks that writes its Parquet tables under the temporary root
+ * directory of an in-process HDFS mini cluster instead of the local temp path.
+ */
+object CometReadHdfsBenchmark extends CometReadBaseBenchmark with WithHdfsCluster {
+
+  // Starts the HDFS cluster, then builds the session and registers the cluster's generated
+  // configuration file with the session's Hadoop configuration.
+  override def getSparkSession: SparkSession = {
+    startHdfsCluster()
+    val session = super.getSparkSession
+    session.sparkContext.hadoopConfiguration.addResource(getHadoopConfFile)
+    session
+  }
+
+  // Runs the benchmarks and stops the cluster afterwards, whether or not they succeed.
+  override def runCometBenchmark(mainArgs: Array[String]): Unit =
+    try super.runCometBenchmark(mainArgs)
+    finally stopHdfsCluster()
+
+  // The reader benchmark is deliberately a no-op for HDFS.
+  override def readerBenchmark(values: Int, dataType: DataType): Unit = ()
+
+  // Mirrors each local temp path with a same-named HDFS directory that lives for the duration
+  // of `f` and is deleted recursively afterwards.
+  override protected def withTempPath(f: File => Unit): Unit =
+    super.withTempPath { localDir =>
+      val hdfsDir = new Path(getTmpRootDir, localDir.getName)
+      getFileSystem.mkdirs(hdfsDir)
+      try f(localDir)
+      finally getFileSystem.delete(hdfsDir, true)
+    }
+
+  // Saves the table under the HDFS directory named after `dir`, optionally partitioned by the
+  // given column.
+  override protected def prepareTable(
+      dir: File,
+      df: DataFrame,
+      partition: Option[String]): Unit = {
+    val writer = partition match {
+      case Some(column) => df.write.partitionBy(column)
+      case None => df.write
+    }
+    val hdfsDir = getFileSystem.resolvePath(new Path(getTmpRootDir, dir.getName))
+    saveAsParquetV1Table(writer, new Path(hdfsDir, "parquetV1").toString)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to