This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new de0be4b4c feat: add experimental remote HDFS support for native 
DataFusion reader (#1359)
de0be4b4c is described below

commit de0be4b4c41a4e2d234c8f50d89d172c60064e6b
Author: Oleks V <[email protected]>
AuthorDate: Mon Feb 17 17:25:23 2025 -0800

    feat: add experimental remote HDFS support for native DataFusion reader 
(#1359)
    
    * feat: add experimental remote HDFS support for native DataFusion reader
---
 Makefile                                   |  5 +-
 docs/source/user-guide/datasources.md      | 78 ++++++++++++++++++++++++++++++
 native/core/src/execution/planner.rs       | 14 ++----
 native/core/src/parquet/parquet_support.rs | 39 +++++++++++++++
 4 files changed, 125 insertions(+), 11 deletions(-)

diff --git a/Makefile b/Makefile
index 80f334a34..9382d1aee 100644
--- a/Makefile
+++ b/Makefile
@@ -21,6 +21,9 @@ define spark_jvm_17_extra_args
 $(shell ./mvnw help:evaluate -Dexpression=extraJavaTestArgs | grep -v '\[')
 endef
 
+# Build optional Comet native features (e.g. hdfs)
+FEATURES_ARG := $(shell ! [ -z $(COMET_FEATURES) ] && echo 
'--features=$(COMET_FEATURES)')
+
 all: core jvm
 
 core:
@@ -95,7 +98,7 @@ release-linux: clean
        cd native && RUSTFLAGS="-Ctarget-cpu=native 
-Ctarget-feature=-prefer-256-bit" cargo build --release
        ./mvnw install -Prelease -DskipTests $(PROFILES)
 release:
-       cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
+       cd native && RUSTFLAGS="$(RUSTFLAGS) -Ctarget-cpu=native" cargo build 
--release $(FEATURES_ARG)
        ./mvnw install -Prelease -DskipTests $(PROFILES)
 release-nogit:
        cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
diff --git a/docs/source/user-guide/datasources.md 
b/docs/source/user-guide/datasources.md
index 9607ba603..27c5492d8 100644
--- a/docs/source/user-guide/datasources.md
+++ b/docs/source/user-guide/datasources.md
@@ -35,3 +35,81 @@ converted into Arrow format, allowing native execution to 
happen after that.
 
 Comet does not provide native JSON scan, but when 
`spark.comet.convert.json.enabled` is enabled, data is immediately
 converted into Arrow format, allowing native execution to happen after that.
+
+# Supported Storages
+
+## Local
+In progress
+
+## HDFS
+
+Apache DataFusion Comet native reader seamlessly scans files from remote HDFS 
for [supported formats](#supported-spark-data-sources)
+
+### Using experimental native DataFusion reader
+Unlike the native Comet reader, the DataFusion reader fully supports nested 
type processing. This reader is currently experimental only
+
+To build Comet with the native DataFusion reader and remote HDFS support, it is 
required to have a JDK installed
+
+Example:
+To build Comet for `spark-3.4`, provide a JDK path in `JAVA_HOME`.
+Provide the JRE linker path in `RUSTFLAGS`; the path can vary depending on the 
system. Typically the JRE linker is part of the installed JDK.
+
+```shell
+export JAVA_HOME="/opt/homebrew/opt/openjdk@11"
+make release PROFILES="-Pspark-3.4" COMET_FEATURES=hdfs RUSTFLAGS="-L 
$JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server"
+```
+
+Start Comet with the experimental reader and HDFS support as 
[described](installation.md#run-spark-shell-with-comet-enabled)
+and add the following additional parameters
+
+```shell
+--conf spark.comet.scan.impl=native_datafusion \
+--conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
+--conf spark.hadoop.dfs.client.use.datanode.hostname=true \
+--conf dfs.client.use.datanode.hostname=true
+```
+
+Query a struct type from remote HDFS:
+```shell
+spark.read.parquet("hdfs://namenode:9000/user/data").show(false)
+
+root
+ |-- id: integer (nullable = true)
+ |-- first_name: string (nullable = true)
+ |-- personal_info: struct (nullable = true)
+ |    |-- firstName: string (nullable = true)
+ |    |-- lastName: string (nullable = true)
+ |    |-- ageInYears: integer (nullable = true)
+
+25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.6.0 
initialized
+== Physical Plan ==
+* CometColumnarToRow (2)
++- CometNativeScan:  (1)
+
+
+(1) CometNativeScan: 
+Output [3]: [id#0, first_name#1, personal_info#4]
+Arguments: [id#0, first_name#1, personal_info#4]
+
+(2) CometColumnarToRow [codegen id : 1]
+Input [3]: [id#0, first_name#1, personal_info#4]
+
+
+25/01/30 16:50:44 INFO fs-hdfs-0.1.12/src/hdfs.rs: Connecting to Namenode 
(hdfs://namenode:9000)
++---+----------+-----------------+
+|id |first_name|personal_info    |
++---+----------+-----------------+
+|2  |Jane      |{Jane, Smith, 34}|
+|1  |John      |{John, Doe, 28}  |
++---+----------+-----------------+
+
+
+
+```
+
+Verify that the native scan type is `CometNativeScan`.
+
+More on [HDFS Reader](../../../native/hdfs/README.md)
+
+## S3
+In progress 
diff --git a/native/core/src/execution/planner.rs 
b/native/core/src/execution/planner.rs
index 878b2b7cf..f42a9ed19 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -74,7 +74,7 @@ use 
datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio
 
 use crate::execution::shuffle::CompressionCodec;
 use crate::execution::spark_plan::SparkPlan;
-use crate::parquet::parquet_support::SparkParquetOptions;
+use crate::parquet::parquet_support::{register_object_store, 
SparkParquetOptions};
 use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
@@ -106,7 +106,6 @@ use datafusion_common::{
     tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, 
TreeNodeRewriter},
     JoinType as DFJoinType, ScalarValue,
 };
-use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::type_coercion::other::get_coerce_type_for_case_expression;
 use datafusion_expr::{
     AggregateUDF, ReturnTypeArgs, ScalarUDF, WindowFrame, WindowFrameBound, 
WindowFrameUnits,
@@ -1165,12 +1164,9 @@ impl PhysicalPlanner {
                     ))
                 });
 
-                let object_store = object_store::local::LocalFileSystem::new();
-                // register the object store with the runtime environment
-                let url = Url::try_from("file://").unwrap();
-                self.session_ctx
-                    .runtime_env()
-                    .register_object_store(&url, Arc::new(object_store));
+                // By default, local FS object store registered
+                // if `hdfs` feature enabled then HDFS file object store 
registered
+                let object_store_url = 
register_object_store(Arc::clone(&self.session_ctx))?;
 
                 // Generate file groups
                 let mut file_groups: Vec<Vec<PartitionedFile>> =
@@ -1229,8 +1225,6 @@ impl PhysicalPlanner {
 
                 // TODO: I think we can remove partition_count in the future, 
but leave for testing.
                 assert_eq!(file_groups.len(), partition_count);
-
-                let object_store_url = ObjectStoreUrl::local_filesystem();
                 let partition_fields: Vec<Field> = partition_schema
                     .fields()
                     .iter()
diff --git a/native/core/src/parquet/parquet_support.rs 
b/native/core/src/parquet/parquet_support.rs
index 0fa671a30..393265204 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::execution::operators::ExecutionError;
 use arrow::{
     array::{cast::AsArray, types::Int32Type, Array, ArrayRef},
     compute::{cast_with_options, take, CastOptions},
@@ -22,9 +23,11 @@ use arrow::{
 };
 use arrow_array::{DictionaryArray, StructArray};
 use arrow_schema::DataType;
+use datafusion::prelude::SessionContext;
 use datafusion_comet_spark_expr::utils::array_with_timezone;
 use datafusion_comet_spark_expr::EvalMode;
 use datafusion_common::{Result as DataFusionResult, ScalarValue};
+use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::ColumnarValue;
 use std::collections::HashMap;
 use std::{fmt::Debug, hash::Hash, sync::Arc};
@@ -195,3 +198,39 @@ fn cast_struct_to_struct(
         _ => unreachable!(),
     }
 }
+
+// Default object store which is local filesystem
+#[cfg(not(feature = "hdfs"))]
+pub(crate) fn register_object_store(
+    session_context: Arc<SessionContext>,
+) -> Result<ObjectStoreUrl, ExecutionError> {
+    let object_store = object_store::local::LocalFileSystem::new();
+    let url = ObjectStoreUrl::parse("file://")?;
+    session_context
+        .runtime_env()
+        .register_object_store(url.as_ref(), Arc::new(object_store));
+    Ok(url)
+}
+
+// HDFS object store
+#[cfg(feature = "hdfs")]
+pub(crate) fn register_object_store(
+    session_context: Arc<SessionContext>,
+) -> Result<ObjectStoreUrl, ExecutionError> {
+    // TODO: read the namenode configuration from file schema or from 
spark.defaultFS
+    let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?;
+    if let Some(object_store) =
+        
datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
+    {
+        session_context
+            .runtime_env()
+            .register_object_store(url.as_ref(), Arc::new(object_store));
+
+        return Ok(url);
+    }
+
+    Err(ExecutionError::GeneralError(format!(
+        "HDFS object store cannot be created for {}",
+        url
+    )))
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to