lidavidm commented on code in PR #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r859707514


##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pathlib
+import pyarrow as pa
+from pyarrow.lib import tobytes
+from pyarrow.lib import ArrowInvalid
+import pyarrow.parquet as pq
+import pytest
+
+try:
+    import pyarrow.substrait as substrait
+except ImportError:
+    substrait = None
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not substrait'
+pytestmark = pytest.mark.substrait
+
+
+_substrait_query = """
+    {
+        "relations": [
+        {"rel": {
+            "read": {
+            "base_schema": {
+                "struct": {
+                "types": [
+                            {"binary": {}}
+                        ]
+                },
+                "names": [
+                        "foo"
+                        ]
+            },
+            "local_files": {
+                "items": [
+                {
+                    "uri_file": "file://FILENAME_PLACEHOLDER",
+                    "format": "FILE_FORMAT_PARQUET"
+                }
+                ]
+            }
+            }
+        }}
+        ]
+    }
+    """
+
+
+def resource_root():
+    """Get the path to the test resources directory."""
+    if not os.environ.get("PARQUET_TEST_DATA"):
+        raise RuntimeError("Test resources not found; set "
+                           "PARQUET_TEST_DATA to "
+                           "<repo root>/cpp/submodules/parquet-testing/data")
+    return pathlib.Path(os.environ["PARQUET_TEST_DATA"])
+
+
+def test_run_query():
+    filename = str(resource_root() / "binary.parquet")
+
+    query = tobytes(_substrait_query.replace("FILENAME_PLACEHOLDER", filename))
+    reader = substrait.run_query(query)
+    res_tb = reader.read_all()
+
+    expected_tb = pq.read_table(filename)
+
+    assert expected_tb.num_rows == res_tb.num_rows
+
+
+def test_run_query_in_bytes():

Review Comment:
   ```suggestion
   def test_run_serialized_query():
   ```



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pathlib
+import pyarrow as pa
+from pyarrow.lib import tobytes
+from pyarrow.lib import ArrowInvalid
+import pyarrow.parquet as pq

Review Comment:
   I think this needs a try-except too since Parquet support is optional



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via 
PushGenerator
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public 
compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {
+    return Status::OK();
+  }
+
+  static arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer 
MakeProducer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen);
+
+  Future<> Finish() override {
+    producer_.Push(IterationEnd<util::optional<compute::ExecBatch>>());
+    if (producer_.Close()) {
+      return Future<>::MakeFinished();
+    }
+    return Future<>::MakeFinished(
+        Status::ExecutionError("Error occurred in closing the batch 
producer"));
+  }
+
+ private:
+  PushGenerator<util::optional<compute::ExecBatch>>::Producer producer_;
+};
+
+/// \brief An executor to run a Substrait Query
+/// This interface is provided as a utility when creating language
+/// bindings for consuming a Substrait plan.
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::shared_ptr<Buffer> substrait_buffer,
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator,
+      std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext 
exec_context)
+      : substrait_buffer_(std::move(substrait_buffer)),
+        generator_(generator),
+        plan_(std::move(plan)),
+        exec_context_(exec_context) {}
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute() {
+    for (const compute::Declaration& decl : declarations_) {
+      RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
+    }
+    RETURN_NOT_OK(plan_->Validate());
+    RETURN_NOT_OK(plan_->StartProducing());
+    // schema of the output can be obtained by the output_schema
+    // of the input to the sink node.
+    auto schema = plan_->sinks()[0]->inputs()[0]->output_schema();
+    std::shared_ptr<RecordBatchReader> sink_reader = 
compute::MakeGeneratorReader(
+        schema, std::move(*generator_), exec_context_.memory_pool());
+    return sink_reader;
+  }
+
+  Status Close() { return plan_->finished().status(); }
+
+  Status Init() {
+    if (substrait_buffer_ == NULLPTR) {
+      return Status::Invalid("Buffer containing Substrait plan is null.");
+    }
+    std::function<std::shared_ptr<compute::SinkNodeConsumer>()> 
consumer_factory = [&] {
+      return std::make_shared<SubstraitSinkConsumer>(generator_);
+    };
+    ARROW_ASSIGN_OR_RAISE(declarations_,
+                          engine::DeserializePlan(*substrait_buffer_, 
consumer_factory));
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Buffer> substrait_buffer_;
+  AsyncGenerator<util::optional<compute::ExecBatch>>* generator_;
+  std::vector<compute::Declaration> declarations_;
+  std::shared_ptr<compute::ExecPlan> plan_;
+  compute::ExecContext exec_context_;
+};
+
+arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer
+SubstraitSinkConsumer::MakeProducer(
+    AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen) {
+  arrow::PushGenerator<util::optional<compute::ExecBatch>> push_gen;
+  auto out = push_gen.producer();
+  *out_gen = std::move(push_gen);
+  return out;
+}
+
+Result<std::shared_ptr<RecordBatchReader>> ExecuteJsonPlan(
+    const std::string& substrait_json) {
+  ARROW_ASSIGN_OR_RAISE(auto substrait_buffer, ParseJsonPlan(substrait_json));
+  return ExecuteSerializedPlan(substrait_buffer);

Review Comment:
   std::move?



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via 
PushGenerator
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public 
compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {
+    return Status::OK();
+  }
+
+  static arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer 
MakeProducer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen);
+
+  Future<> Finish() override {
+    producer_.Push(IterationEnd<util::optional<compute::ExecBatch>>());
+    if (producer_.Close()) {
+      return Future<>::MakeFinished();
+    }
+    return Future<>::MakeFinished(
+        Status::ExecutionError("Error occurred in closing the batch 
producer"));

Review Comment:
   ExecutionError is for Gandiva, maybe IOError?



##########
python/pyarrow/_substrait.pyx:
##########
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow import Buffer
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan):
+    """
+    Executes a substrait plan and returns a RecordBatchReader.
+
+    Parameters
+    ----------
+    plan : bytes or Buffer
+        Substrait plan can be fed as a serialized plan (Buffer) 
+        or a JSON plan. 
+    """
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        RecordBatchReader reader
+        c_string c_str_plan
+        shared_ptr[CBuffer] c_buf_plan
+
+    if isinstance(plan, bytes):
+        c_str_plan = plan
+        c_res_reader = ExecuteJsonPlan(c_str_plan)
+    elif isinstance(plan, Buffer):

Review Comment:
   note that IMO it's still weird to do different things based on the input 
type since semantically bytes and Buffer should be the same



##########
python/pyarrow/_substrait.pyx:
##########
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow import Buffer
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan):
+    """
+    Executes a substrait plan and returns a RecordBatchReader.
+
+    Parameters
+    ----------
+    plan : bytes or Buffer
+        Substrait plan can be fed as a serialized plan (Buffer) 
+        or a JSON plan. 
+    """
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        RecordBatchReader reader
+        c_string c_str_plan
+        shared_ptr[CBuffer] c_buf_plan
+
+    if isinstance(plan, bytes):
+        c_str_plan = plan
+        c_res_reader = ExecuteJsonPlan(c_str_plan)
+    elif isinstance(plan, Buffer):
+        c_buf_plan = pyarrow_unwrap_buffer(plan)
+        c_res_reader = ExecuteSerializedPlan(c_buf_plan)
+    else:
+        raise ValueError("Expected bytes or pyarrow.Buffer")
+
+    c_reader = GetResultValue(c_res_reader)
+
+    reader = RecordBatchReader.__new__(RecordBatchReader)
+    reader.reader = c_reader
+    return reader
+
+
+def _parse_json_plan(plan):
+    """
+    Parse a JSON plan into equivalent serialized Protobuf.
+
+    Parameters
+    ----------
+    plan: byte
+        Parse a Substrait plan in JSON to a serialized plan.
+
+    Returns
+    -------
+    Buffer
+        pyarrow.Buffer object is returned.
+    """
+
+    cdef:
+        CResult[shared_ptr[CBuffer]] c_res_buffer
+        c_string c_str_plan
+        shared_ptr[CBuffer] c_buf_plan
+
+    if isinstance(plan, bytes):
+        c_str_plan = plan
+        c_res_buffer = ParseJsonPlan(c_str_plan)
+        c_buf_plan = GetResultValue(c_res_buffer)
+    else:
+        raise ValueError("Expected plan in bytes.")

Review Comment:
   though in this case I believe Cython will do the type check for you if you 
write the natural Pythonic code



##########
python/pyarrow/_substrait.pyx:
##########
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow import Buffer
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan):
+    """
+    Executes a substrait plan and returns a RecordBatchReader.
+
+    Parameters
+    ----------
+    plan : bytes or Buffer
+        Substrait plan can be fed as a serialized plan (Buffer) 
+        or a JSON plan. 
+    """
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        RecordBatchReader reader
+        c_string c_str_plan
+        shared_ptr[CBuffer] c_buf_plan
+
+    if isinstance(plan, bytes):
+        c_str_plan = plan
+        c_res_reader = ExecuteJsonPlan(c_str_plan)
+    elif isinstance(plan, Buffer):
+        c_buf_plan = pyarrow_unwrap_buffer(plan)
+        c_res_reader = ExecuteSerializedPlan(c_buf_plan)
+    else:
+        raise ValueError("Expected bytes or pyarrow.Buffer")
+
+    c_reader = GetResultValue(c_res_reader)
+
+    reader = RecordBatchReader.__new__(RecordBatchReader)
+    reader.reader = c_reader
+    return reader
+
+
+def _parse_json_plan(plan):
+    """
+    Parse a JSON plan into equivalent serialized Protobuf.
+
+    Parameters
+    ----------
+    plan: byte

Review Comment:
   ```suggestion
       plan: bytes
   ```



##########
python/pyarrow/_substrait.pyx:
##########
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow import Buffer
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan):
+    """
+    Executes a substrait plan and returns a RecordBatchReader.
+
+    Parameters
+    ----------
+    plan : bytes or Buffer
+        Substrait plan can be fed as a serialized plan (Buffer) 
+        or a JSON plan. 
+    """
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        RecordBatchReader reader
+        c_string c_str_plan
+        shared_ptr[CBuffer] c_buf_plan
+
+    if isinstance(plan, bytes):
+        c_str_plan = plan
+        c_res_reader = ExecuteJsonPlan(c_str_plan)
+    elif isinstance(plan, Buffer):
+        c_buf_plan = pyarrow_unwrap_buffer(plan)
+        c_res_reader = ExecuteSerializedPlan(c_buf_plan)
+    else:
+        raise ValueError("Expected bytes or pyarrow.Buffer")
+
+    c_reader = GetResultValue(c_res_reader)
+
+    reader = RecordBatchReader.__new__(RecordBatchReader)
+    reader.reader = c_reader
+    return reader
+
+
+def _parse_json_plan(plan):
+    """
+    Parse a JSON plan into equivalent serialized Protobuf.
+
+    Parameters
+    ----------
+    plan: byte
+        Parse a Substrait plan in JSON to a serialized plan.
+
+    Returns
+    -------
+    Buffer
+        pyarrow.Buffer object is returned.

Review Comment:
   ```suggestion
           A buffer containing the serialized Protobuf plan.
   ```



##########
python/pyarrow/_substrait.pyx:
##########
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow import Buffer
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan):
+    """
+    Executes a substrait plan and returns a RecordBatchReader.
+
+    Parameters
+    ----------
+    plan : bytes or Buffer
+        Substrait plan can be fed as a serialized plan (Buffer) 
+        or a JSON plan. 
+    """
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        RecordBatchReader reader
+        c_string c_str_plan
+        shared_ptr[CBuffer] c_buf_plan
+
+    if isinstance(plan, bytes):
+        c_str_plan = plan
+        c_res_reader = ExecuteJsonPlan(c_str_plan)
+    elif isinstance(plan, Buffer):
+        c_buf_plan = pyarrow_unwrap_buffer(plan)
+        c_res_reader = ExecuteSerializedPlan(c_buf_plan)
+    else:
+        raise ValueError("Expected bytes or pyarrow.Buffer")
+
+    c_reader = GetResultValue(c_res_reader)
+
+    reader = RecordBatchReader.__new__(RecordBatchReader)
+    reader.reader = c_reader
+    return reader
+
+
+def _parse_json_plan(plan):
+    """
+    Parse a JSON plan into equivalent serialized Protobuf.
+
+    Parameters
+    ----------
+    plan: byte
+        Parse a Substrait plan in JSON to a serialized plan.
+
+    Returns
+    -------
+    Buffer
+        pyarrow.Buffer object is returned.
+    """
+
+    cdef:
+        CResult[shared_ptr[CBuffer]] c_res_buffer
+        c_string c_str_plan
+        shared_ptr[CBuffer] c_buf_plan
+
+    if isinstance(plan, bytes):
+        c_str_plan = plan
+        c_res_buffer = ParseJsonPlan(c_str_plan)
+        c_buf_plan = GetResultValue(c_res_buffer)
+    else:
+        raise ValueError("Expected plan in bytes.")

Review Comment:
   ```suggestion
           raise TypeError("plan must be bytes")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to