lidavidm commented on code in PR #12672: URL: https://github.com/apache/arrow/pull/12672#discussion_r852096531
########## python/pyarrow/_engine.pyx: ########## @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: language_level = 3 + +from pyarrow import Buffer +from pyarrow.lib cimport * +from pyarrow.includes.libarrow cimport * + + +def run_query(plan): + """ + Executes a substrait plan and returns a RecordBatchReader. + + Parameters + ---------- + plan : bytes or Buffer + Substrait Plan can be fed as a encoded string in utf-8 + as a JSON string or as an Arrow Buffer. + """ + + cdef: + CResult[shared_ptr[CRecordBatchReader]] c_res_reader + shared_ptr[CRecordBatchReader] c_reader + RecordBatchReader reader + c_string c_str_plan + shared_ptr[CBuffer] c_buf_plan + + if isinstance(plan, bytes): + c_str_plan = plan + c_res_reader = GetRecordBatchReader(c_str_plan) + elif isinstance(plan, Buffer): + c_buf_plan = pyarrow_unwrap_buffer(plan) + c_res_reader = GetRecordBatchReader(c_buf_plan) + else: + raise ValueError("Expected bytes or pyarrow.Buffer") + + c_reader = GetResultValue(c_res_reader) + + reader = RecordBatchReader.__new__(RecordBatchReader) + reader.reader = c_reader + return reader + + +def get_buffer_from_json(plan): + """ + Returns Buffer object by converting substrait plan in + JSON. 
+ + Parameter + --------- + plan: byte + Substrait plan as a bytes. + """ Review Comment: What does this actually do? It converts a serialized plan? Why does it return a buffer of JSON? ########## cpp/src/arrow/engine/substrait/util.h: ########## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <vector> +#include "arrow/compute/type_fwd.h" +#include "arrow/engine/api.h" +#include "arrow/util/iterator.h" +#include "arrow/util/optional.h" + +namespace arrow { + +namespace engine { + +/// \brief Retrieve a RecordBatchReader from a Substrait plan in JSON. +ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::string& substrait_json); + +/// \brief Retrieve a RecordBatchReader from a Substrait plan in Buffer. Review Comment: Don't conflate the container (Buffer) with the representation (serialized Substrait Protobuf Plan message, presumably?) ########## python/pyarrow/engine.py: ########## @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pyarrow._engine import ( # noqa + run_query, + get_buffer_from_json, Review Comment: If we actually need this, and it's purely meant as a test helper, 1) prefix the name with an underscore and 2) don't import it here (the test can import it from pyarrow._engine if necessary) ########## python/pyarrow/_engine.pyx: ########## @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# cython: language_level = 3 + +from pyarrow import Buffer +from pyarrow.lib cimport * +from pyarrow.includes.libarrow cimport * + + +def run_query(plan): + """ + Executes a substrait plan and returns a RecordBatchReader. + + Parameters + ---------- + plan : bytes or Buffer + Substrait Plan can be fed as a encoded string in utf-8 + as a JSON string or as an Arrow Buffer. Review Comment: Again, don't conflate the container and the representation. What does the buffer contain? Just from looking at signatures, I would assume it should contain a serialized plan? But from the helper's implementation, it actually contains JSON? ########## cpp/src/arrow/engine/substrait/util.cc: ########## @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/engine/substrait/util.h" +#include "arrow/util/async_generator.h" + +namespace arrow { + +namespace engine { + +/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator +class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public compute::SinkNodeConsumer { + public: + explicit SubstraitSinkConsumer( + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator, + arrow::util::BackpressureOptions backpressure = {}) + : producer_(MakeProducer(generator, std::move(backpressure))) {} + + Status Consume(compute::ExecBatch batch) override { + // Consume a batch of data + bool did_push = producer_.Push(batch); + if (!did_push) return Status::ExecutionError("Producer closed already"); + return Status::OK(); + } + + Status Init(const std::shared_ptr<Schema>& schema) override { return Status::OK(); } + + static arrow::PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer + MakeProducer(AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* out_gen, + arrow::util::BackpressureOptions backpressure); + + Future<> Finish() override { + producer_.Push(IterationEnd<arrow::util::optional<compute::ExecBatch>>()); + if (producer_.Close()) { + return Future<>::MakeFinished(); + } + return Future<>::MakeFinished( + Status::ExecutionError("Error occurred in closing the batch producer")); + } + + private: + PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer producer_; +}; + +/// \brief An executor to run a Substrait Query +/// This interface is provided as a utility when creating language +/// bindings for consuming a Substrait plan. 
+class ARROW_ENGINE_EXPORT SubstraitExecutor { + public: + explicit SubstraitExecutor( + std::shared_ptr<Buffer> substrait_buffer, + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator, + std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context) + : substrait_buffer_(substrait_buffer), + generator_(generator), + plan_(std::move(plan)), + exec_context_(exec_context) {} + + Result<std::shared_ptr<RecordBatchReader>> Execute() { + RETURN_NOT_OK(SubstraitExecutor::Init()); + for (const compute::Declaration& decl : declarations_) { + RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status()); + } + ARROW_RETURN_NOT_OK(plan_->Validate()); + ARROW_RETURN_NOT_OK(plan_->StartProducing()); + // schema of the output can be obtained by the output_schema + // of the input to the sink node. + auto schema = plan_->sinks()[0]->inputs()[0]->output_schema(); + std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader( + schema, std::move(*generator_), exec_context_.memory_pool()); + return sink_reader; + } + + Status Close() { return plan_->finished().status(); } + + Status Init() { + if (substrait_buffer_ == NULLPTR) { + return Status::Invalid("Buffer containing Substrait plan is null."); + } + std::function<std::shared_ptr<compute::SinkNodeConsumer>()> consumer_factory = [&] { + return std::make_shared<SubstraitSinkConsumer>(generator_); + }; + ARROW_ASSIGN_OR_RAISE(declarations_, + engine::DeserializePlan(*substrait_buffer_, consumer_factory)); + return Status::OK(); + } + + private: + std::shared_ptr<Buffer> substrait_buffer_; + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator_; + std::vector<compute::Declaration> declarations_; + std::shared_ptr<compute::ExecPlan> plan_; + compute::ExecContext exec_context_; +}; + +arrow::PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer +SubstraitSinkConsumer::MakeProducer( + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* out_gen, + 
arrow::util::BackpressureOptions backpressure) { + arrow::PushGenerator<arrow::util::optional<compute::ExecBatch>> push_gen( + std::move(backpressure)); + auto out = push_gen.producer(); + *out_gen = std::move(push_gen); + return out; +} + +Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::string& substrait_json) { + ARROW_ASSIGN_OR_RAISE(auto substrait_buffer, + GetSubstraitBufferFromJSON(substrait_json)); + return GetRecordBatchReader(substrait_buffer); +} + +Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::shared_ptr<Buffer> substrait_buffer) { + arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen; + ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make()); + compute::ExecContext exec_context(arrow::default_memory_pool(), + ::arrow::internal::GetCpuThreadPool()); + arrow::engine::SubstraitExecutor executor(substrait_buffer, &sink_gen, plan, Review Comment: move the buffer and the plan ########## cpp/src/arrow/engine/substrait/util.cc: ########## @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/engine/substrait/util.h" +#include "arrow/util/async_generator.h" + +namespace arrow { + +namespace engine { + +/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator +class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public compute::SinkNodeConsumer { + public: + explicit SubstraitSinkConsumer( + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator, + arrow::util::BackpressureOptions backpressure = {}) + : producer_(MakeProducer(generator, std::move(backpressure))) {} + + Status Consume(compute::ExecBatch batch) override { + // Consume a batch of data + bool did_push = producer_.Push(batch); + if (!did_push) return Status::ExecutionError("Producer closed already"); + return Status::OK(); + } + + Status Init(const std::shared_ptr<Schema>& schema) override { return Status::OK(); } + + static arrow::PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer + MakeProducer(AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* out_gen, + arrow::util::BackpressureOptions backpressure); + + Future<> Finish() override { + producer_.Push(IterationEnd<arrow::util::optional<compute::ExecBatch>>()); + if (producer_.Close()) { + return Future<>::MakeFinished(); + } + return Future<>::MakeFinished( + Status::ExecutionError("Error occurred in closing the batch producer")); + } + + private: + PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer producer_; +}; + +/// \brief An executor to run a Substrait Query +/// This interface is provided as a utility when creating language +/// bindings for consuming a Substrait plan. 
+class ARROW_ENGINE_EXPORT SubstraitExecutor { + public: + explicit SubstraitExecutor( + std::shared_ptr<Buffer> substrait_buffer, + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator, + std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context) + : substrait_buffer_(substrait_buffer), + generator_(generator), + plan_(std::move(plan)), + exec_context_(exec_context) {} + + Result<std::shared_ptr<RecordBatchReader>> Execute() { + RETURN_NOT_OK(SubstraitExecutor::Init()); + for (const compute::Declaration& decl : declarations_) { + RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status()); + } + ARROW_RETURN_NOT_OK(plan_->Validate()); + ARROW_RETURN_NOT_OK(plan_->StartProducing()); Review Comment: stick with one of ARROW_RETURN_NOT_OK and RETURN_NOT_OK ########## cpp/src/arrow/engine/substrait/util.h: ########## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <vector> +#include "arrow/compute/type_fwd.h" +#include "arrow/engine/api.h" Review Comment: Can you clean up includes? 
Some of these are unnecessary ########## python/pyarrow/tests/test_substrait.py: ########## @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +import pathlib +from pyarrow.lib import tobytes +from pyarrow.lib import ArrowInvalid +import pyarrow.parquet as pq +import pytest + +from pyarrow.engine import ( + run_query, + get_buffer_from_json, +) +# Marks all of the tests in this module +# Ignore these with pytest ... 
-m 'not engine' +pytestmark = pytest.mark.engine + + +_substrait_query = """ + { + "relations": [ + {"rel": { + "read": { + "base_schema": { + "struct": { + "types": [ + {"binary": {}} + ] + }, + "names": [ + "foo" + ] + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER", + "format": "FILE_FORMAT_PARQUET" + } + ] + } + } + }} + ] + } + """ + + +def resource_root(): + """Get the path to the test resources directory.""" + if not os.environ.get("PARQUET_TEST_DATA"): + raise RuntimeError("Test resources not found; set " + "PARQUET_TEST_DATA to " + "<repo root>/cpp/submodules/parquet-testing/data") + return pathlib.Path(os.environ["PARQUET_TEST_DATA"]) + + +def test_run_query(): + filename = str(resource_root() / "binary.parquet") + + query = _substrait_query Review Comment: Why the redundant assignment? ########## cpp/src/arrow/engine/substrait/util.cc: ########## @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/engine/substrait/util.h" +#include "arrow/util/async_generator.h" + +namespace arrow { + +namespace engine { + +/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator +class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public compute::SinkNodeConsumer { + public: + explicit SubstraitSinkConsumer( + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator, + arrow::util::BackpressureOptions backpressure = {}) + : producer_(MakeProducer(generator, std::move(backpressure))) {} + + Status Consume(compute::ExecBatch batch) override { + // Consume a batch of data + bool did_push = producer_.Push(batch); + if (!did_push) return Status::ExecutionError("Producer closed already"); + return Status::OK(); + } + + Status Init(const std::shared_ptr<Schema>& schema) override { return Status::OK(); } + + static arrow::PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer + MakeProducer(AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* out_gen, + arrow::util::BackpressureOptions backpressure); + + Future<> Finish() override { + producer_.Push(IterationEnd<arrow::util::optional<compute::ExecBatch>>()); + if (producer_.Close()) { + return Future<>::MakeFinished(); + } + return Future<>::MakeFinished( + Status::ExecutionError("Error occurred in closing the batch producer")); + } + + private: + PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer producer_; +}; + +/// \brief An executor to run a Substrait Query +/// This interface is provided as a utility when creating language +/// bindings for consuming a Substrait plan. +class ARROW_ENGINE_EXPORT SubstraitExecutor { + public: + explicit SubstraitExecutor( + std::shared_ptr<Buffer> substrait_buffer, + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator, + std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context) + : substrait_buffer_(substrait_buffer), Review Comment: move the buffer? 
########## cpp/src/arrow/engine/substrait/util.cc: ########## @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/engine/substrait/util.h" +#include "arrow/util/async_generator.h" + +namespace arrow { + +namespace engine { + +/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator +class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public compute::SinkNodeConsumer { + public: + explicit SubstraitSinkConsumer( + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator, + arrow::util::BackpressureOptions backpressure = {}) + : producer_(MakeProducer(generator, std::move(backpressure))) {} + + Status Consume(compute::ExecBatch batch) override { + // Consume a batch of data + bool did_push = producer_.Push(batch); + if (!did_push) return Status::ExecutionError("Producer closed already"); + return Status::OK(); + } + + Status Init(const std::shared_ptr<Schema>& schema) override { return Status::OK(); } + + static arrow::PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer + MakeProducer(AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* out_gen, + arrow::util::BackpressureOptions backpressure); + + Future<> Finish() override { + 
producer_.Push(IterationEnd<arrow::util::optional<compute::ExecBatch>>()); + if (producer_.Close()) { + return Future<>::MakeFinished(); + } + return Future<>::MakeFinished( + Status::ExecutionError("Error occurred in closing the batch producer")); + } + + private: + PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer producer_; +}; + +/// \brief An executor to run a Substrait Query +/// This interface is provided as a utility when creating language +/// bindings for consuming a Substrait plan. +class ARROW_ENGINE_EXPORT SubstraitExecutor { + public: + explicit SubstraitExecutor( + std::shared_ptr<Buffer> substrait_buffer, + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator, + std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context) + : substrait_buffer_(substrait_buffer), + generator_(generator), + plan_(std::move(plan)), + exec_context_(exec_context) {} + + Result<std::shared_ptr<RecordBatchReader>> Execute() { + RETURN_NOT_OK(SubstraitExecutor::Init()); Review Comment: Why are we calling Init twice? ########## cpp/src/arrow/engine/substrait/util.h: ########## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <memory> +#include <string> +#include <vector> +#include "arrow/compute/type_fwd.h" +#include "arrow/engine/api.h" +#include "arrow/util/iterator.h" +#include "arrow/util/optional.h" + +namespace arrow { + +namespace engine { + +/// \brief Retrieve a RecordBatchReader from a Substrait plan in JSON. +ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::string& substrait_json); + +/// \brief Retrieve a RecordBatchReader from a Substrait plan in Buffer. +ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( Review Comment: Personally I'd prefer something like `ExecuteJsonPlan` and `ExecuteSerializedPlan` over overloads ########## cpp/src/arrow/engine/substrait/util.h: ########## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <vector> +#include "arrow/compute/type_fwd.h" +#include "arrow/engine/api.h" +#include "arrow/util/iterator.h" +#include "arrow/util/optional.h" + +namespace arrow { + +namespace engine { + +/// \brief Retrieve a RecordBatchReader from a Substrait plan in JSON. 
+ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::string& substrait_json); + +/// \brief Retrieve a RecordBatchReader from a Substrait plan in Buffer. +ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::shared_ptr<Buffer> substrait_buffer); + +/// \brief Get Substrait Buffer from a Substrait JSON plan. +/// This is a helper method for Python tests. +ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> GetSubstraitBufferFromJSON( + std::string& substrait_json); Review Comment: Why do we need this? Don't we have overloads for JSON and binary plans? ########## python/pyarrow/tests/test_substrait.py: ########## @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +import pathlib +from pyarrow.lib import tobytes +from pyarrow.lib import ArrowInvalid +import pyarrow.parquet as pq +import pytest + +from pyarrow.engine import ( + run_query, + get_buffer_from_json, +) +# Marks all of the tests in this module +# Ignore these with pytest ... 
-m 'not engine' +pytestmark = pytest.mark.engine Review Comment: Since the module is optional, you can't assume you can import from pyarrow.engine above; you have to do something like
```python
try:
    import pyarrow.engine as engine
except ImportError:
    engine = None
```
########## cpp/src/arrow/engine/substrait/util.cc: ########## @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/engine/substrait/util.h" +#include "arrow/util/async_generator.h" + +namespace arrow { + +namespace engine { + +/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator +class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public compute::SinkNodeConsumer { + public: + explicit SubstraitSinkConsumer( + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator, + arrow::util::BackpressureOptions backpressure = {}) + : producer_(MakeProducer(generator, std::move(backpressure))) {} + + Status Consume(compute::ExecBatch batch) override { + // Consume a batch of data + bool did_push = producer_.Push(batch); + if (!did_push) return Status::ExecutionError("Producer closed already"); + return Status::OK(); + } + + Status Init(const std::shared_ptr<Schema>& schema) override { return Status::OK(); } + + static arrow::PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer + MakeProducer(AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* out_gen, + arrow::util::BackpressureOptions backpressure); + + Future<> Finish() override { + producer_.Push(IterationEnd<arrow::util::optional<compute::ExecBatch>>()); + if (producer_.Close()) { + return Future<>::MakeFinished(); + } + return Future<>::MakeFinished( + Status::ExecutionError("Error occurred in closing the batch producer")); + } + + private: + PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer producer_; +}; + +/// \brief An executor to run a Substrait Query +/// This interface is provided as a utility when creating language +/// bindings for consuming a Substrait plan. 
+class ARROW_ENGINE_EXPORT SubstraitExecutor { + public: + explicit SubstraitExecutor( + std::shared_ptr<Buffer> substrait_buffer, + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator, + std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context) + : substrait_buffer_(substrait_buffer), + generator_(generator), + plan_(std::move(plan)), + exec_context_(exec_context) {} + + Result<std::shared_ptr<RecordBatchReader>> Execute() { + RETURN_NOT_OK(SubstraitExecutor::Init()); + for (const compute::Declaration& decl : declarations_) { + RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status()); + } + ARROW_RETURN_NOT_OK(plan_->Validate()); + ARROW_RETURN_NOT_OK(plan_->StartProducing()); + // schema of the output can be obtained by the output_schema + // of the input to the sink node. + auto schema = plan_->sinks()[0]->inputs()[0]->output_schema(); + std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader( + schema, std::move(*generator_), exec_context_.memory_pool()); + return sink_reader; + } + + Status Close() { return plan_->finished().status(); } + + Status Init() { + if (substrait_buffer_ == NULLPTR) { + return Status::Invalid("Buffer containing Substrait plan is null."); + } + std::function<std::shared_ptr<compute::SinkNodeConsumer>()> consumer_factory = [&] { + return std::make_shared<SubstraitSinkConsumer>(generator_); + }; + ARROW_ASSIGN_OR_RAISE(declarations_, + engine::DeserializePlan(*substrait_buffer_, consumer_factory)); + return Status::OK(); + } + + private: + std::shared_ptr<Buffer> substrait_buffer_; + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* generator_; + std::vector<compute::Declaration> declarations_; + std::shared_ptr<compute::ExecPlan> plan_; + compute::ExecContext exec_context_; +}; + +arrow::PushGenerator<arrow::util::optional<compute::ExecBatch>>::Producer +SubstraitSinkConsumer::MakeProducer( + AsyncGenerator<arrow::util::optional<compute::ExecBatch>>* out_gen, + 
arrow::util::BackpressureOptions backpressure) { + arrow::PushGenerator<arrow::util::optional<compute::ExecBatch>> push_gen( + std::move(backpressure)); + auto out = push_gen.producer(); + *out_gen = std::move(push_gen); + return out; +} + +Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::string& substrait_json) { + ARROW_ASSIGN_OR_RAISE(auto substrait_buffer, + GetSubstraitBufferFromJSON(substrait_json)); + return GetRecordBatchReader(substrait_buffer); +} + +Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::shared_ptr<Buffer> substrait_buffer) { + arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen; + ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make()); + compute::ExecContext exec_context(arrow::default_memory_pool(), + ::arrow::internal::GetCpuThreadPool()); + arrow::engine::SubstraitExecutor executor(substrait_buffer, &sink_gen, plan, + exec_context); + RETURN_NOT_OK(executor.Init()); + ARROW_ASSIGN_OR_RAISE(auto sink_reader, executor.Execute()); + RETURN_NOT_OK(executor.Close()); + return sink_reader; +} + +Result<std::shared_ptr<Buffer>> GetSubstraitBufferFromJSON(std::string& substrait_json) { Review Comment: this will have to be `const std::string&` ########## cpp/src/arrow/engine/substrait/serde_test.cc: ########## @@ -750,5 +751,66 @@ TEST(Substrait, ExtensionSetFromPlanMissingFunc) { &ext_set)); } +Result<std::string> GetSubstraitJSON() { + ARROW_ASSIGN_OR_RAISE(std::string dir_string, + arrow::internal::GetEnvVar("PARQUET_TEST_DATA")); + auto file_name = + arrow::internal::PlatformFilename::FromString(dir_string)->Join("binary.parquet"); + auto file_path = file_name->ToString(); + std::string substrait_json = R"({ + "relations": [ + {"rel": { + "read": { + "base_schema": { + "struct": { + "types": [ + {"binary": {}} + ] + }, + "names": [ + "foo" + ] + }, + "local_files": { + "items": [ + { + "uri_file": "FILENAME_PLACEHOLDER", + "format": "FILE_FORMAT_PARQUET" + } + ] + } + } + 
}} + ] + })"; +#ifdef _WIN32 + // Path is supposed to start with "X:/..." + file_path = "file:///" + file_path; +#else + // Path is supposed to start with "/..." + file_path = "file://" + file_path; +#endif + std::cout << "File Path : >>>>" << file_path << std::endl; Review Comment: Remove this print? ########## cpp/src/arrow/engine/substrait/serde_test.cc: ########## @@ -724,5 +728,103 @@ TEST(Substrait, ExtensionSetFromPlan) { EXPECT_EQ(decoded_add_func.name, "add"); } +TEST(Substrait, GetRecordBatchIterator) { + const auto parquet_root = std::getenv("PARQUET_TEST_DATA"); + std::string dir_string(parquet_root); + std::stringstream ss; + ss << dir_string << "/binary.parquet"; + auto file_path = ss.str(); + + std::string substrait_json = R"({ + "relations": [ + {"rel": { + "read": { + "base_schema": { + "struct": { + "types": [ + {"binary": {}} + ] + }, + "names": [ + "foo" + ] + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER", + "format": "FILE_FORMAT_PARQUET" + } + ] + } + } + }} + ] + })"; + + std::string filename_placeholder = "FILENAME_PLACEHOLDER"; + substrait_json.replace(substrait_json.find(filename_placeholder), + filename_placeholder.size(), file_path); + auto in_schema = schema({field("foo", binary())}); + AsyncGenerator<util::optional<cp::ExecBatch>> sink_gen; + cp::ExecContext exec_context(default_memory_pool(), + arrow::internal::GetCpuThreadPool()); + ASSERT_OK_AND_ASSIGN(auto plan, cp::ExecPlan::Make()); + engine::SubstraitExecutor executor(substrait_json, &sink_gen, plan, in_schema, + exec_context); + auto status = executor.MakePlan(); + ASSERT_OK(status); + ASSERT_OK_AND_ASSIGN(auto reader, executor.Execute()); + auto finish = executor.Finalize(); + ASSERT_OK(finish); + ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get())); + EXPECT_GT(table->num_rows(), 0); +} + +TEST(Substrait, GetRecordBatchIteratorUtil) { + const auto parquet_root = std::getenv("PARQUET_TEST_DATA"); + std::string 
dir_string(parquet_root); + std::stringstream ss; + ss << dir_string << "/binary.parquet"; + auto file_path = ss.str(); + + std::string substrait_json = R"({ + "relations": [ + {"rel": { + "read": { + "base_schema": { + "struct": { + "types": [ + {"binary": {}} + ] + }, + "names": [ + "foo" + ] + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER", + "format": "FILE_FORMAT_PARQUET" + } + ] + } + } + }} + ] + })"; + + std::string filename_placeholder = "FILENAME_PLACEHOLDER"; + substrait_json.replace(substrait_json.find(filename_placeholder), + filename_placeholder.size(), file_path); + auto in_schema = schema({field("foo", binary())}); + + ASSERT_OK_AND_ASSIGN(auto reader, engine::SubstraitExecutor::GetRecordBatchReader( + substrait_json, in_schema)); + ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get())); + EXPECT_GT(table->num_rows(), 0); Review Comment: Ping? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
