davisusanibar commented on code in PR #34227:
URL: https://github.com/apache/arrow/pull/34227#discussion_r1164090184
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -261,6 +264,50 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
default_memory_pool_id = -1L;
}
+/// Unpack the named tables passed through JNI.
+///
+/// Named tables are encoded as a string array, where every two elements
+/// encode (1) the table name and (2) the address of an ArrowArrayStream
+/// containing the table data. This function will eagerly read all
+/// tables into Tables.
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> LoadNamedTables(JNIEnv* env, jobjectArray& str_array) {
+ std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_record_batch_reader;
+ int length = env->GetArrayLength(str_array);
+ if (length % 2 != 0) {
+ JniThrow("Can not map odd number of array elements to key/value pairs");
+ }
+ std::shared_ptr<arrow::Table> output_table;
+ for (int pos = 0; pos < length; pos++) {
+ auto j_string_key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+ pos++;
+ auto j_string_value = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+ long memory_address = 0;
+ try {
+ memory_address = std::stol(JStringToCString(env, j_string_value));
+ } catch (...) {
+ JniThrow("Failed to parse memory address from string value");
+ }
+ auto* arrow_stream_in = reinterpret_cast<ArrowArrayStream*>(memory_address);
+ std::shared_ptr<arrow::RecordBatchReader> readerIn = JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+ output_table = JniGetOrThrow(readerIn->ToTable());
+ map_table_to_record_batch_reader[JStringToCString(env, j_string_key)] = output_table;
+ }
+ return map_table_to_record_batch_reader;
+}
+
+/// Find the arrow Table associated with a given table name
+std::shared_ptr<arrow::Table> GetTableByName(const std::vector<std::string>& names,
+ std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_reader) {
+ std::shared_ptr<arrow::Table> output_table;
+ for (const auto& name : names) {
+ output_table = map_table_to_reader[name];
+ if (output_table == nullptr) {
+ JniThrow("Table name " + name + " is needed to execute the Substrait plan");
+ }
+ }
+ return output_table;
+}
Review Comment:
Thank you for the detail. Let me explain in more detail how this works internally,
starting with the case where we need to join two tables:
```cpp
arrow::Status execute_substrait_low_level() {
  // RunDatasetParquetScan, RunDatasetParquetScan2 and GetSubstraitJSONV21 are
  // local helpers (not shown here).
  auto left_table = RunDatasetParquetScan(
      "file:///Users/dsusanibar/voltron/fork/consumer-testing/substrait_consumer/data/tpch_parquet/nation.parquet");
  auto right_table = RunDatasetParquetScan2(
      "file:///Users/dsusanibar/voltron/fork/consumer-testing/substrait_consumer/data/tpch_parquet/customer.parquet");
  std::map<std::string, std::shared_ptr<Table>> mapTableToMemoryAddress;
  mapTableToMemoryAddress["NATION"] = left_table;
  mapTableToMemoryAddress["CUSTOMER"] = right_table;
  // This table_provider is executed two times (once per table in the plan).
  arrow::engine::NamedTableProvider table_provider =
      [&mapTableToMemoryAddress](const std::vector<std::string>& names) {
        std::shared_ptr<arrow::Table> output_table;
        // Always prints "1"; this is handled by Acero C++ behavior.
        std::cout << "Tables names size : " << names.size() << std::endl;
        // This is a loop, but it always iterates exactly once.
        for (const auto& name : names) {
          output_table = mapTableToMemoryAddress[name];
        }
        // Equivalent without the loop:
        //   output_table = mapTableToMemoryAddress[names.front()];
        std::shared_ptr<compute::ExecNodeOptions> options =
            std::make_shared<compute::TableSourceNodeOptions>(std::move(output_table));
        return compute::Declaration("table_source", {}, options, "mock_source");
      };
  arrow::engine::ConversionOptions conversion_options;
  conversion_options.named_table_provider = std::move(table_provider);
  ARROW_ASSIGN_OR_RAISE(std::string substrait_json, GetSubstraitJSONV21());
  ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::engine::SerializeJsonPlan(substrait_json));
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<RecordBatchReader> reader,
      arrow::engine::ExecuteSerializedPlan(*buffer, NULLPTR, NULLPTR, conversion_options));
  ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatchReader(reader.get()));
  std::cout << "Results : " << table->ToString() << std::endl;
  return arrow::Status::OK();
}
```
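The comment in the example above (the provider runs once per table in the plan, each time with a single name) can be modeled without Arrow. This is a simplified, hypothetical sketch: `FakeTable`, `FakeNamedTableProvider` and `ResolvePlanTables` are illustrative stand-ins for `arrow::Table`, `arrow::engine::NamedTableProvider` and the Substrait consumer, not Arrow APIs.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for arrow::Table; it only carries a label.
struct FakeTable {
  std::string label;
};

// Hypothetical stand-in for arrow::engine::NamedTableProvider: it receives the
// names of a single named-table relation and returns the matching table.
using FakeNamedTableProvider =
    std::function<std::shared_ptr<FakeTable>(const std::vector<std::string>&)>;

// Simplified model of the consumer: for every named-table relation in the
// plan it calls the provider once, passing only that relation's names.
std::vector<std::shared_ptr<FakeTable>> ResolvePlanTables(
    const std::vector<std::vector<std::string>>& plan_named_tables,
    const FakeNamedTableProvider& provider) {
  std::vector<std::shared_ptr<FakeTable>> resolved;
  for (const auto& names : plan_named_tables) {
    resolved.push_back(provider(names));
  }
  return resolved;
}
```

In this model, resolving a join plan with the relations `{"NATION"}` and `{"CUSTOMER"}` calls a counting provider twice, and each call sees `names.size() == 1`.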
Consider:
1. For one query that contains two tables:
   - Incorrect assumption: the `table_provider` lambda (`arrow::engine::NamedTableProvider`) is executed only once, and both tables (nation/customer) are visited by two iterations of `for (const auto& name : names) {`.
   - Correct behavior: the `table_provider` lambda is executed once for each table in the plan, and `for (const auto& name : names) {` always iterates over exactly one table. This is the main reason no lookups are discarded.
2. The `names` vector always has exactly one element; this is handled, validated, and passed in by Acero.
3. Do a single lookup: we use a `for` loop, but it actually runs only once, so we could change it to `output_table = mapTableToMemoryAddress[names.front()];`.
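Point 3 could also be made explicit in the JNI helper. A minimal sketch, assuming `names` always has one element as described above: `GetSingleTableByName` and `FakeTable` are hypothetical names, not part of the PR, and `std::invalid_argument`/`std::runtime_error` stand in for `JniThrow`.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for arrow::Table.
struct FakeTable {
  std::string label;
};

// Single-lookup variant of GetTableByName: it expects exactly one name and
// fails loudly otherwise, instead of silently keeping only the last lookup.
std::shared_ptr<FakeTable> GetSingleTableByName(
    const std::vector<std::string>& names,
    const std::map<std::string, std::shared_ptr<FakeTable>>& tables) {
  if (names.size() != 1) {
    // Defensive check (assumption): a multi-part or empty name is rejected.
    throw std::invalid_argument("Expected exactly one table name, got " +
                                std::to_string(names.size()));
  }
  // find() does not insert an empty entry for an unknown name, unlike operator[].
  auto it = tables.find(names.front());
  if (it == tables.end() || it->second == nullptr) {
    throw std::runtime_error("Table name " + names.front() +
                             " is needed to execute the Substrait plan");
  }
  return it->second;
}
```

Taking the map by `const&` also avoids copying the whole map on every call; the diff's `GetTableByName` takes its map parameter by value.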
@lidavidm, please let me know whether this explanation makes the current
implementation clear or whether it needs additional changes.
I know there is room for improvement, but as a Java JNI pass-through we are only
consuming the currently defined API behavior.
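For completeness, the key/value layout described in the `LoadNamedTables` doc comment (every two array elements are a table name followed by the address of an `ArrowArrayStream`) can be modeled with plain strings. This is a hypothetical sketch: `DecodeNamedTablePairs` is not part of the PR, it uses a `std::vector<std::string>` in place of the `jobjectArray`, and it stops at the parsed address rather than importing the stream.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Decodes the flat [name0, addr0, name1, addr1, ...] layout into a
// name -> address map. Errors mirror the messages used in LoadNamedTables.
std::map<std::string, long> DecodeNamedTablePairs(const std::vector<std::string>& flat) {
  if (flat.size() % 2 != 0) {
    throw std::invalid_argument(
        "Can not map odd number of array elements to key/value pairs");
  }
  std::map<std::string, long> result;
  // Step by two so each name stays next to its address.
  for (std::size_t pos = 0; pos + 1 < flat.size(); pos += 2) {
    long address = 0;
    try {
      address = std::stol(flat[pos + 1]);
    } catch (const std::exception&) {
      throw std::invalid_argument("Failed to parse memory address from string value");
    }
    result[flat[pos]] = address;
  }
  return result;
}
```

Stepping `pos` by two in the loop header makes the pairing explicit, instead of incrementing the index a second time inside the loop body.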