parthchandra commented on code in PR #1992:
URL: https://github.com/apache/datafusion-comet/pull/1992#discussion_r2193617615


##########
native/core/src/parquet/objectstore/jni_hdfs.rs:
##########
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use bytes::Bytes;
+use chrono::Utc;
+use futures::{stream, stream::BoxStream};
+use jni::{
+    objects::{JClass, JObject, JValue},
+    JNIEnv, JavaVM,
+};
+use object_store::{
+    path::Path, Attributes, Error as ObjectStoreError, GetOptions, GetRange, 
GetResult,
+    GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, 
PutMultipartOpts,
+    PutOptions, PutPayload, PutResult,
+};
+use once_cell::sync::OnceCell;
+
+static JVM: OnceCell<JavaVM> = OnceCell::new();
+
+pub fn init_jvm(env: &JNIEnv) {
+    let _ = JVM.set(env.get_java_vm().expect("Failed to get JavaVM"));
+}
+
+fn get_jni_env<'a>() -> jni::AttachGuard<'a> {
+    JVM.get()
+        .expect("JVM not initialized")
+        .attach_current_thread()
+        .expect("Failed to attach thread")
+}
+

Review Comment:
   We are already executing this code in a JVM, and the `JNIEnv` is available to
us in `Java_org_apache_comet_Native_executePlan` (in `jni_api.rs`). We could
probably pass it all the way down to here.



##########
spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala:
##########
@@ -103,19 +103,26 @@ class ParquetReadFromS3Suite extends CometTestBase with 
AdaptiveSparkPlanHelper
   }
 
   test("read parquet file from MinIO") {
+    runParquetScanAndAssert()
+  }
+
+  test("Comet uses JNI object store when use_jni is true") {
+    spark.conf.set("spark.comet.use_jni_object_store", "true")
+    runParquetScanAndAssert()

Review Comment:
   Is there a way we can verify that the JNI implementation was actually called?



##########
native/core/src/parquet/parquet_support.rs:
##########
@@ -382,8 +390,29 @@ pub(crate) fn prepare_object_store_with_configs(
         &url[url::Position::BeforeHost..url::Position::AfterPort],
     );
 
+    let mut hadoop_schemes = HashSet::new();
+    for (key, _) in object_store_configs.iter() {
+        if let Some(scheme) = key
+            .strip_prefix("spark.hadoop.fs.")
+            .and_then(|k| k.strip_suffix(".impl"))
+        {
+            hadoop_schemes.insert(scheme.to_string());
+        }
+    }
+
     let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if 
scheme == "hdfs" {
         parse_hdfs_url(&url)
+    } else if use_jni && hadoop_schemes.contains(scheme) {

Review Comment:
   I think this check should come first, even before the s3a scheme has been
renamed to s3. Note that a user could override the implementation used for the
`hdfs` scheme.
   Either way, if the `use_jni` flag is set, we should use JNI if the scheme is
`hdfs` or if the config specifies an implementation for the scheme.
   Also, we should rename s3a to s3 only if `use_jni` is false; this way we won't
be renaming s3a back and forth.



##########
native/core/src/parquet/objectstore/jni_hdfs.rs:
##########
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use bytes::Bytes;
+use chrono::Utc;
+use futures::{stream, stream::BoxStream};
+use jni::{
+    objects::{JClass, JObject, JValue},
+    JNIEnv, JavaVM,
+};
+use object_store::{
+    path::Path, Attributes, Error as ObjectStoreError, GetOptions, GetRange, 
GetResult,
+    GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, 
PutMultipartOpts,
+    PutOptions, PutPayload, PutResult,
+};
+use once_cell::sync::OnceCell;
+
+static JVM: OnceCell<JavaVM> = OnceCell::new();
+
+pub fn init_jvm(env: &JNIEnv) {
+    let _ = JVM.set(env.get_java_vm().expect("Failed to get JavaVM"));
+}
+
+fn get_jni_env<'a>() -> jni::AttachGuard<'a> {
+    JVM.get()
+        .expect("JVM not initialized")
+        .attach_current_thread()
+        .expect("Failed to attach thread")
+}
+
+mod jni_helpers {
+    use super::*;
+
+    pub fn create_jni_hashmap<'local>(
+        env: &mut JNIEnv<'local>,
+        configs: &HashMap<String, String>,
+    ) -> Result<JObject<'local>, ObjectStoreError> {
+        let map_class = 
env.find_class("java/util/HashMap").map_err(jni_error)?;
+        let jmap = env.new_object(map_class, "()V", &[]).map_err(jni_error)?;
+
+        for (k, v) in configs {
+            let jkey = env.new_string(k).map_err(jni_error)?;
+            let jval = env.new_string(v).map_err(jni_error)?;
+
+            env.call_method(
+                &jmap,
+                "put",
+                "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;",
+                &[JValue::Object(&jkey), JValue::Object(&jval)],
+            )
+            .map_err(jni_error)?;
+        }
+
+        Ok(jmap)
+    }
+
+    pub fn jni_error(e: jni::errors::Error) -> ObjectStoreError {

Review Comment:
   `jvm_bridge/mod.rs` has a `jni_map_error` macro. It also has the very useful
`jni_call` and `jni_static_call` macros. Perhaps we could use those to keep the
code consistent?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to