davisusanibar commented on code in PR #34227:
URL: https://github.com/apache/arrow/pull/34227#discussion_r1162815904
##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -261,6 +264,50 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
default_memory_pool_id = -1L;
}
+/// Unpack the named tables passed through JNI.
+///
+/// Named tables are encoded as a string array, where every two elements
+/// encode (1) the table name and (2) the address of an ArrowArrayStream
+/// containing the table data. This function will eagerly read all
+/// tables into Tables; an odd-length array or an unparsable address is reported via JniThrow.
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>>
LoadNamedTables(JNIEnv* env, jobjectArray& str_array) {
+ std::unordered_map<std::string, std::shared_ptr<arrow::Table>>
map_table_to_record_batch_reader; // despite the name, the mapped values are Tables, not readers
+ int length = env->GetArrayLength(str_array);
+ if (length % 2 != 0) { // elements must come in (name, address) pairs
+ JniThrow("Can not map odd number of array elements to key/value pairs"); // NOTE(review): presumably JniThrow raises a C++ exception so execution stops here - confirm
+ }
+ std::shared_ptr<arrow::Table> output_table;
+ for (int pos = 0; pos < length; pos++) { // each iteration consumes two elements (see the extra pos++ below)
+ auto j_string_key =
reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos)); // NOTE(review): no DeleteLocalRef is visible in this loop - confirm local refs are acceptable for large arrays
+ pos++; // step from the name element to its address element
+ auto j_string_value =
reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+ long memory_address = 0; // NOTE(review): long is 32-bit on LLP64 platforms (e.g. Windows); a 64-bit address may not fit - confirm target platforms
+ try {
+ memory_address = std::stol(JStringToCString(env, j_string_value));
+ } catch (...) { // covers std::stol's invalid_argument/out_of_range and anything JStringToCString may throw
+ JniThrow("Failed to parse memory address from string value");
+ }
+ auto* arrow_stream_in =
reinterpret_cast<ArrowArrayStream*>(memory_address); // the parsed integer is reinterpreted as a raw stream pointer
+ std::shared_ptr<arrow::RecordBatchReader> readerIn =
JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+ output_table = JniGetOrThrow(readerIn->ToTable()); // materializes the reader's contents as a Table
+ map_table_to_record_batch_reader[JStringToCString(env, j_string_key)] =
output_table; // a repeated name overwrites the earlier entry
+ }
+ return map_table_to_record_batch_reader;
+}
+
+/// Look up every name in `names` and return the Table mapped to the last one; JniThrow is called for any name that has no Table.
+std::shared_ptr<arrow::Table> GetTableByName(const std::vector<std::string>&
names,
+ std::unordered_map<std::string, std::shared_ptr<arrow::Table>>
map_table_to_reader) { // NOTE(review): taken by value, so the map is copied on every call
+ std::shared_ptr<arrow::Table> output_table;
+ for (const auto& name : names) {
+ output_table = map_table_to_reader[name]; // operator[] default-inserts a null entry (into this local copy) when name is absent
+ if (output_table == nullptr) {
+ JniThrow("Table name " + name + " is needed to execute the Substrait
plan");
+ }
+ }
+ return output_table; // Table for the last entry of names; stays null if names is empty
+}
Review Comment:
Oh no, this is the core of the proposal: mapping a table name to a table (in
the same way that the C++ tests do for their join code). If you are saying this
isn't correct, then the whole proposal will have to be deleted.
Please, I need your support to help me understand which part isn't correct. I'm
trying to recover the last messages and can't see all the comments (I don't know
why; maybe I deleted some of them).
- Unused input parameter?: an exception will be thrown
- Plan references the same table twice?: recover the reference by name again
- Are there **some changes pending** that you recommend I implement?
- Related to the test code, what I did to validate that the Jar is working was
to install Dataset locally and run a query against it, like this for example:
```
import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
/**
 * Manual smoke test for running Substrait plans through AceroSubstraitConsumer.
 *
 * Each scenario builds a Substrait plan from SQL with Isthmus (SqlToSubstrait),
 * opens one or more Parquet datasets, registers their readers under table names,
 * runs the plan and prints every result batch as TSV.
 */
public class ClientSubstrait {
// Entry point: only the two-table (join) scenario is executed.
public static void main(String[] args) {
twoTables();
}
// Builds the plan for "SELECT * from nation" against a NATION table definition.
static Plan getPlanOnetable() throws SqlParseException {
String sql = "SELECT * from nation";
String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL,
N_NAME CHAR(25), " +
"N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
Plan plan = sqlToSubstrait.execute(sql, ImmutableList.of(nation));
return plan;
}
// Builds the plan for a NATION/CUSTOMER join filtered on n_nationkey = 21.
static Plan getPlanTwoTables() throws SqlParseException {
String sql = "SELECT n.n_nationkey, n.n_name, c.c_name, c.c_phone,
c.c_address FROM nation n JOIN customer c " +
"ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 21";
String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL,
N_NAME CHAR(25), " +
"N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL,
C_NAME VARCHAR(25), " +
"C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT NULL, C_PHONE
CHAR(15), C_ACCTBAL DECIMAL, " +
"C_MKTSEGMENT CHAR(10), C_COMMENT VARCHAR(117) )";
SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
Plan plan = sqlToSubstrait.execute(sql, ImmutableList.of(nation,
customer));
return plan;
}
// Single-table scenario: scans one Parquet file, registers it as "NATION" and runs the one-table plan.
// NOTE(review): the URI points at an nyc-taxi file while the plan is built for NATION - confirm this is intended.
static void oneTable() {
String uri =
"s3://voltrondata-labs-datasets/nyc-taxi-tiny/year=2022/month=2/part-0.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
// try-with-resources closes the reader, scanner, dataset, factory and allocator on exit.
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new
FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader reader = scanner.scanBatches()
) {
// map table to reader
Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
mapTableToArrowReader.put("NATION", reader);
// get binary plan
Plan plan = getPlanOnetable();
// The serialized plan is copied into a direct buffer; toByteArray() is called twice (once for the size, once for the bytes).
ByteBuffer substraitPlan =
ByteBuffer.allocateDirect(plan.toByteArray().length);
substraitPlan.put(plan.toByteArray());
// run query
try (ArrowReader arrowReader = new
AceroSubstraitConsumer(allocator).runQuery(
substraitPlan,
mapTableToArrowReader
)) {
// Print each result batch until the reader is exhausted.
while (arrowReader.loadNextBatch()) {
System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
}
}
} catch (Exception e) {
// Any failure is only printed; nothing is rethrown.
e.printStackTrace();
}
}
// Two-table scenario: scans the nation and customer Parquet files, registers them as
// "NATION" and "CUSTOMER" and runs the join plan.
// NOTE(review): both URIs are absolute paths on one developer's machine - adjust before running elsewhere.
static void twoTables(){
String uri1 =
"file:///Users/dsusanibar/voltron/fork/consumer-testing/tests/data/tpch_parquet/nation.parquet";
String uri2 =
"file:///Users/dsusanibar/voltron/fork/consumer-testing/tests/data/tpch_parquet/customer.parquet";
ScanOptions optionsNations = new ScanOptions(/*batchSize*/ 32768);
ScanOptions optionsCustomer = new ScanOptions(/*batchSize*/ 32768);
// One shared allocator; a separate factory/dataset/scanner/reader chain per input file.
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new
FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri1);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(optionsNations);
ArrowReader readerNation = scanner.scanBatches();
DatasetFactory datasetFactoryCustomer = new
FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET,
uri2);
Dataset datasetCustomer = datasetFactoryCustomer.finish();
Scanner scannerCustomer =
datasetCustomer.newScan(optionsCustomer);
ArrowReader readerCustomer = scannerCustomer.scanBatches()
) {
// map table to reader
Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
mapTableToArrowReader.put("NATION", readerNation);
mapTableToArrowReader.put("CUSTOMER", readerCustomer);
// get binary plan
Plan plan = getPlanTwoTables();
// The serialized plan is copied into a direct buffer; toByteArray() is called twice (once for the size, once for the bytes).
ByteBuffer substraitPlan =
ByteBuffer.allocateDirect(plan.toByteArray().length);
substraitPlan.put(plan.toByteArray());
// run query
try (ArrowReader arrowReader = new
AceroSubstraitConsumer(allocator).runQuery(
substraitPlan,
mapTableToArrowReader
)) {
// Print each result batch until the reader is exhausted.
while (arrowReader.loadNextBatch()) {
System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
}
}
} catch (Exception e) {
// Any failure is only printed; nothing is rethrown.
e.printStackTrace();
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]