This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new de0be4b4c feat: add experimental remote HDFS support for native
DataFusion reader (#1359)
de0be4b4c is described below
commit de0be4b4c41a4e2d234c8f50d89d172c60064e6b
Author: Oleks V <[email protected]>
AuthorDate: Mon Feb 17 17:25:23 2025 -0800
feat: add experimental remote HDFS support for native DataFusion reader
(#1359)
* feat: add experimental remote HDFS support for native DataFusion reader
---
Makefile | 5 +-
docs/source/user-guide/datasources.md | 78 ++++++++++++++++++++++++++++++
native/core/src/execution/planner.rs | 14 ++----
native/core/src/parquet/parquet_support.rs | 39 +++++++++++++++
4 files changed, 125 insertions(+), 11 deletions(-)
diff --git a/Makefile b/Makefile
index 80f334a34..9382d1aee 100644
--- a/Makefile
+++ b/Makefile
@@ -21,6 +21,9 @@ define spark_jvm_17_extra_args
$(shell ./mvnw help:evaluate -Dexpression=extraJavaTestArgs | grep -v '\[')
endef
+# Build optional Comet native features (e.g. hdfs)
+FEATURES_ARG := $(shell ! [ -z $(COMET_FEATURES) ] && echo
'--features=$(COMET_FEATURES)')
+
all: core jvm
core:
@@ -95,7 +98,7 @@ release-linux: clean
cd native && RUSTFLAGS="-Ctarget-cpu=native
-Ctarget-feature=-prefer-256-bit" cargo build --release
./mvnw install -Prelease -DskipTests $(PROFILES)
release:
- cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
+ cd native && RUSTFLAGS="$(RUSTFLAGS) -Ctarget-cpu=native" cargo build
--release $(FEATURES_ARG)
./mvnw install -Prelease -DskipTests $(PROFILES)
release-nogit:
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
diff --git a/docs/source/user-guide/datasources.md
b/docs/source/user-guide/datasources.md
index 9607ba603..27c5492d8 100644
--- a/docs/source/user-guide/datasources.md
+++ b/docs/source/user-guide/datasources.md
@@ -35,3 +35,81 @@ converted into Arrow format, allowing native execution to
happen after that.
Comet does not provide native JSON scan, but when
`spark.comet.convert.json.enabled` is enabled, data is immediately
converted into Arrow format, allowing native execution to happen after that.
+
+# Supported Storages
+
+## Local
+In progress
+
+## HDFS
+
+Apache DataFusion Comet native reader seamlessly scans files from remote HDFS
for [supported formats](#supported-spark-data-sources)
+
+### Using experimental native DataFusion reader
+Unlike the native Comet reader, the DataFusion reader fully supports nested
type processing. This reader is currently experimental.
+
+To build Comet with the native DataFusion reader and remote HDFS support, it is
required to have a JDK installed
+
+Example:
+To build Comet for `spark-3.4`, provide a JDK path in `JAVA_HOME`.
+Provide the JRE linker path in `RUSTFLAGS`; the path can vary depending on the
system. Typically the JRE linker is part of the installed JDK.
+
+```shell
+export JAVA_HOME="/opt/homebrew/opt/openjdk@11"
+make release PROFILES="-Pspark-3.4" COMET_FEATURES=hdfs RUSTFLAGS="-L
$JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server"
+```
+
+Start Comet with experimental reader and HDFS support as
[described](installation.md/#run-spark-shell-with-comet-enabled)
+and add additional parameters
+
+```shell
+--conf spark.comet.scan.impl=native_datafusion \
+--conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
+--conf spark.hadoop.dfs.client.use.datanode.hostname=true \
+--conf dfs.client.use.datanode.hostname=true
+```
+
+Query a struct type from Remote HDFS
+```shell
+spark.read.parquet("hdfs://namenode:9000/user/data").show(false)
+
+root
+ |-- id: integer (nullable = true)
+ |-- first_name: string (nullable = true)
+ |-- personal_info: struct (nullable = true)
+ | |-- firstName: string (nullable = true)
+ | |-- lastName: string (nullable = true)
+ | |-- ageInYears: integer (nullable = true)
+
+25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.6.0
initialized
+== Physical Plan ==
+* CometColumnarToRow (2)
++- CometNativeScan: (1)
+
+
+(1) CometNativeScan:
+Output [3]: [id#0, first_name#1, personal_info#4]
+Arguments: [id#0, first_name#1, personal_info#4]
+
+(2) CometColumnarToRow [codegen id : 1]
+Input [3]: [id#0, first_name#1, personal_info#4]
+
+
+25/01/30 16:50:44 INFO fs-hdfs-0.1.12/src/hdfs.rs: Connecting to Namenode
(hdfs://namenode:9000)
++---+----------+-----------------+
+|id |first_name|personal_info |
++---+----------+-----------------+
+|2 |Jane |{Jane, Smith, 34}|
+|1 |John |{John, Doe, 28} |
++---+----------+-----------------+
+
+
+
+```
+
+Verify that the native scan type is `CometNativeScan`.
+
+More on [HDFS Reader](../../../native/hdfs/README.md)
+
+## S3
+In progress
diff --git a/native/core/src/execution/planner.rs
b/native/core/src/execution/planner.rs
index 878b2b7cf..f42a9ed19 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -74,7 +74,7 @@ use
datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio
use crate::execution::shuffle::CompressionCodec;
use crate::execution::spark_plan::SparkPlan;
-use crate::parquet::parquet_support::SparkParquetOptions;
+use crate::parquet::parquet_support::{register_object_store,
SparkParquetOptions};
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
@@ -106,7 +106,6 @@ use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
TreeNodeRewriter},
JoinType as DFJoinType, ScalarValue,
};
-use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::type_coercion::other::get_coerce_type_for_case_expression;
use datafusion_expr::{
AggregateUDF, ReturnTypeArgs, ScalarUDF, WindowFrame, WindowFrameBound,
WindowFrameUnits,
@@ -1165,12 +1164,9 @@ impl PhysicalPlanner {
))
});
- let object_store = object_store::local::LocalFileSystem::new();
- // register the object store with the runtime environment
- let url = Url::try_from("file://").unwrap();
- self.session_ctx
- .runtime_env()
- .register_object_store(&url, Arc::new(object_store));
+ // By default, local FS object store registered
+ // if `hdfs` feature enabled then HDFS file object store
registered
+ let object_store_url =
register_object_store(Arc::clone(&self.session_ctx))?;
// Generate file groups
let mut file_groups: Vec<Vec<PartitionedFile>> =
@@ -1229,8 +1225,6 @@ impl PhysicalPlanner {
// TODO: I think we can remove partition_count in the future,
but leave for testing.
assert_eq!(file_groups.len(), partition_count);
-
- let object_store_url = ObjectStoreUrl::local_filesystem();
let partition_fields: Vec<Field> = partition_schema
.fields()
.iter()
diff --git a/native/core/src/parquet/parquet_support.rs
b/native/core/src/parquet/parquet_support.rs
index 0fa671a30..393265204 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::execution::operators::ExecutionError;
use arrow::{
array::{cast::AsArray, types::Int32Type, Array, ArrayRef},
compute::{cast_with_options, take, CastOptions},
@@ -22,9 +23,11 @@ use arrow::{
};
use arrow_array::{DictionaryArray, StructArray};
use arrow_schema::DataType;
+use datafusion::prelude::SessionContext;
use datafusion_comet_spark_expr::utils::array_with_timezone;
use datafusion_comet_spark_expr::EvalMode;
use datafusion_common::{Result as DataFusionResult, ScalarValue};
+use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::ColumnarValue;
use std::collections::HashMap;
use std::{fmt::Debug, hash::Hash, sync::Arc};
@@ -195,3 +198,39 @@ fn cast_struct_to_struct(
_ => unreachable!(),
}
}
+
+// Default object store which is local filesystem
+#[cfg(not(feature = "hdfs"))]
+pub(crate) fn register_object_store(
+ session_context: Arc<SessionContext>,
+) -> Result<ObjectStoreUrl, ExecutionError> {
+ let object_store = object_store::local::LocalFileSystem::new();
+ let url = ObjectStoreUrl::parse("file://")?;
+ session_context
+ .runtime_env()
+ .register_object_store(url.as_ref(), Arc::new(object_store));
+ Ok(url)
+}
+
+// HDFS object store
+#[cfg(feature = "hdfs")]
+pub(crate) fn register_object_store(
+ session_context: Arc<SessionContext>,
+) -> Result<ObjectStoreUrl, ExecutionError> {
+ // TODO: read the namenode configuration from file schema or from
spark.defaultFS
+ let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?;
+ if let Some(object_store) =
+
datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
+ {
+ session_context
+ .runtime_env()
+ .register_object_store(url.as_ref(), Arc::new(object_store));
+
+ return Ok(url);
+ }
+
+ Err(ExecutionError::GeneralError(format!(
+ "HDFS object store cannot be created for {}",
+ url
+ )))
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]