davisusanibar commented on code in PR #34227:
URL: https://github.com/apache/arrow/pull/34227#discussion_r1154672897


##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -578,3 +581,100 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, 
scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanLocalFiles
+ * Signature: (Ljava/lang/String;J)V
+ *
+ * Converts the plan string with SerializeJsonPlan, executes the serialized plan,
+ * and exports the resulting record batch reader into the ArrowArrayStream whose
+ * address is passed in c_arrow_array_stream_address_out.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles(
+    JNIEnv* env, jobject, jstring substrait_plan,
+    jlong c_arrow_array_stream_address_out) {
+  JNI_METHOD_START
+  // The jlong argument carries the address of the caller-provided output stream.
+  auto* out_stream =
+      reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
+  // Plan text -> serialized plan buffer.
+  std::shared_ptr<arrow::Buffer> serialized_plan = JniGetOrThrow(
+      arrow::engine::SerializeJsonPlan(JStringToCString(env, substrait_plan)));
+  // Run the plan and hand the results back through the output stream.
+  std::shared_ptr<arrow::RecordBatchReader> result_reader =
+      JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*serialized_plan));
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(result_reader, out_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlanNamedTables
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ *
+ * Executes a plan supplied as a string, resolving named tables through
+ * ArrowArrayStream addresses supplied by the caller.
+ *
+ * plan                           plan text, converted with SerializeJsonPlan
+ * table_to_memory_address_input  converted by ToMap into a table-name -> address map;
+ *                                each address is treated as an ArrowArrayStream*
+ * memory_address_output          address of the ArrowArrayStream that receives the
+ *                                exported result reader
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J(
+    JNIEnv* env, jobject, jstring plan, jobjectArray table_to_memory_address_input,
+    jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address =
+      ToMap(env, table_to_memory_address_input);
+  // create table provider; the explicit Result return type lets the lambda report
+  // an unknown table name as a Status instead of dereferencing a bogus address
+  arrow::engine::NamedTableProvider table_provider =
+      [&map_table_to_memory_address](
+          const std::vector<std::string>& names,
+          const arrow::Schema&) -> arrow::Result<arrow::compute::Declaration> {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {
+      // Look the name up with find(): operator[] would silently insert a zero
+      // address for an unknown name, which would then be used as a null stream.
+      const auto entry = map_table_to_memory_address.find(name);
+      if (entry == map_table_to_memory_address.end()) {
+        return arrow::Status::Invalid("No input stream registered for named table: ",
+                                      name);
+      }
+      // get table from memory address provided
+      auto* arrow_stream_in = reinterpret_cast<ArrowArrayStream*>(entry->second);
+      std::shared_ptr<arrow::RecordBatchReader> readerIn =
+          JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+      std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+          arrow::dataset::ScannerBuilder::FromRecordBatchReader(readerIn);
+      JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+      auto scanner = JniGetOrThrow(scanner_builder->Finish());
+      // As before, the table produced for the last name is the one that is used.
+      output_table = JniGetOrThrow(scanner->ToTable());
+    }
+    std::shared_ptr<arrow::compute::ExecNodeOptions> options =
+        std::make_shared<arrow::compute::TableSourceNodeOptions>(
+            std::move(output_table));
+    return arrow::compute::Declaration("table_source", {}, options, "java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // execute plan
+  std::shared_ptr<arrow::Buffer> buffer =
+      JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  std::shared_ptr<arrow::RecordBatchReader> readerOut =
+      JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, NULLPTR, NULLPTR,
+                                                         conversion_options));
+  auto* arrow_stream_out = reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, arrow_stream_out));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    callMeRN
+ * Signature: (Ljava/lang/Object;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_Object_2_3Ljava_lang_String_2J
 (
+    JNIEnv* env, jobject, jobject plan, jobjectArray 
table_to_memory_address_input, jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::map<std::string, long> map_table_to_memory_address = ToMap(env, 
table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider = 
[&map_table_to_memory_address](const std::vector<std::string>& names, const 
arrow::Schema&) {
+    std::shared_ptr<arrow::Table> output_table;
+    for (const auto& name : names) {

Review Comment:
   With `output_table = map_table_to_reader[name]`, we tried to be more concise.
   
   Would it be okay to keep the code as it is now? Since it is a lambda, keeping 
the steps inline shows more detail about what is happening.
   
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to