westonpace commented on code in PR #14485:
URL: https://github.com/apache/arrow/pull/14485#discussion_r1032038252
##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -393,22 +393,35 @@ class ARROW_EXPORT HashJoinNodeOptions : public
ExecNodeOptions {
/// This node will output one row for each row in the left table.
class ARROW_EXPORT AsofJoinNodeOptions : public ExecNodeOptions {
public:
- AsofJoinNodeOptions(FieldRef on_key, std::vector<FieldRef> by_key, int64_t
tolerance)
- : on_key(std::move(on_key)), by_key(by_key), tolerance(tolerance) {}
-
- /// \brief "on" key for the join.
+ /// \brief Keys for one input table of the AsofJoin operation
///
- /// All inputs tables must be sorted by the "on" key. Must be a single field
of a common
- /// type. Inexact match is used on the "on" key. i.e., a row is considered
match iff
- /// left_on - tolerance <= right_on <= left_on.
- /// Currently, the "on" key must be of an integer, date, or timestamp type.
- FieldRef on_key;
- /// \brief "by" key for the join.
+ /// The keys must be consistent across the input tables:
+ /// Each "on" key must refer to a field of the same type and units across
the tables.
+ /// Each "by" key must refer to a list of fields of the same types across
the tables.
+ struct Keys {
+ /// \brief "on" key for the join.
+ ///
+ /// The input table must be sorted by the "on" key. Must be a single field
of a common
+ /// type. Inexact match is used on the "on" key. i.e., a row is considered
match iff
+ /// left_on - tolerance <= right_on <= left_on.
+ /// Currently, the "on" key must be of an integer, date, or timestamp type.
+ FieldRef on_key;
+ /// \brief "by" key for the join.
+ ///
+ /// The input table must have each field of the "by" key. Exact equality
is used for
Review Comment:
```suggestion
/// Each input table must have each field of the "by" key. Exact
equality is used for
```
##########
cpp/cmake_modules/ThirdpartyToolchain.cmake:
##########
@@ -657,6 +657,13 @@ else()
"${THIRDPARTY_MIRROR_URL}/snappy-${ARROW_SNAPPY_BUILD_VERSION}.tar.gz")
endif()
+# Remove these two lines once
https://github.com/substrait-io/substrait/pull/342 merges
+set(ENV{ARROW_SUBSTRAIT_URL}
+
"https://github.com/substrait-io/substrait/archive/e59008b6b202f8af06c2266991161b1e45cb056a.tar.gz"
+)
+set(ARROW_SUBSTRAIT_BUILD_SHA256_CHECKSUM
+ "f64629cb377fcc62c9d3e8fe69fa6a4cf326f34d756e03db84843c5cce8d04cd")
+
Review Comment:
This will need a rebase now that #14415 has merged. I'm not sure whether this
block will merge cleanly, but either way it can be removed.
##########
cpp/src/arrow/engine/substrait/expression_internal.cc:
##########
@@ -926,16 +941,12 @@
Result<std::unique_ptr<substrait::Expression::ScalarFunction>> EncodeSubstraitCa
ToProto(*call.output_type(), call.output_nullable(), ext_set,
conversion_options));
scalar_fn->set_allocated_output_type(output_type.release());
- for (uint32_t i = 0; i < call.size(); i++) {
+ for (int i = 0; i < call.size(); i++) {
substrait::FunctionArgument* arg = scalar_fn->add_arguments();
if (call.HasEnumArg(i)) {
auto enum_val = std::make_unique<substrait::FunctionArgument::Enum>();
- ARROW_ASSIGN_OR_RAISE(std::optional<std::string_view> enum_arg,
call.GetEnumArg(i));
- if (enum_arg) {
- enum_val->set_specified(std::string(*enum_arg));
- } else {
- enum_val->set_allocated_unspecified(new google::protobuf::Empty());
- }
+ ARROW_ASSIGN_OR_RAISE(std::string_view enum_arg, call.GetEnumArg(i));
+ enum_val->set_specified(std::string(enum_arg));
Review Comment:
The changes in this file are probably out of date. If you hit merge issues,
I'm pretty sure you can just discard the changes in this file.
##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -393,22 +393,35 @@ class ARROW_EXPORT HashJoinNodeOptions : public
ExecNodeOptions {
/// This node will output one row for each row in the left table.
class ARROW_EXPORT AsofJoinNodeOptions : public ExecNodeOptions {
public:
- AsofJoinNodeOptions(FieldRef on_key, std::vector<FieldRef> by_key, int64_t
tolerance)
- : on_key(std::move(on_key)), by_key(by_key), tolerance(tolerance) {}
-
- /// \brief "on" key for the join.
+ /// \brief Keys for one input table of the AsofJoin operation
///
- /// All inputs tables must be sorted by the "on" key. Must be a single field
of a common
- /// type. Inexact match is used on the "on" key. i.e., a row is considered
match iff
- /// left_on - tolerance <= right_on <= left_on.
- /// Currently, the "on" key must be of an integer, date, or timestamp type.
- FieldRef on_key;
- /// \brief "by" key for the join.
+ /// The keys must be consistent across the input tables:
+ /// Each "on" key must refer to a field of the same type and units across
the tables.
+ /// Each "by" key must refer to a list of fields of the same types across
the tables.
+ struct Keys {
+ /// \brief "on" key for the join.
+ ///
+ /// The input table must be sorted by the "on" key. Must be a single field
of a common
+ /// type. Inexact match is used on the "on" key. i.e., a row is considered
match iff
+ /// left_on - tolerance <= right_on <= left_on.
Review Comment:
Is the right side really `left_on` or is it `left_on + tolerance`?
##########
cpp/proto/substrait/extension_rels.proto:
##########
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto3";
+
+package arrow.substrait;
Review Comment:
@rtpsw I believe that, in our experiment branch, we (and ibis-substrait) ended up
going with `package arrow.substrait_ext;`. Do you want to update this here? (If
so, we can remove the extra `namespace` statements.) If not, I think we will need to
update ibis-substrait.
##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -393,22 +393,35 @@ class ARROW_EXPORT HashJoinNodeOptions : public
ExecNodeOptions {
/// This node will output one row for each row in the left table.
class ARROW_EXPORT AsofJoinNodeOptions : public ExecNodeOptions {
public:
- AsofJoinNodeOptions(FieldRef on_key, std::vector<FieldRef> by_key, int64_t
tolerance)
- : on_key(std::move(on_key)), by_key(by_key), tolerance(tolerance) {}
-
- /// \brief "on" key for the join.
+ /// \brief Keys for one input table of the AsofJoin operation
///
- /// All inputs tables must be sorted by the "on" key. Must be a single field
of a common
- /// type. Inexact match is used on the "on" key. i.e., a row is considered
match iff
- /// left_on - tolerance <= right_on <= left_on.
- /// Currently, the "on" key must be of an integer, date, or timestamp type.
- FieldRef on_key;
- /// \brief "by" key for the join.
+ /// The keys must be consistent across the input tables:
+ /// Each "on" key must refer to a field of the same type and units across
the tables.
+ /// Each "by" key must refer to a list of fields of the same types across
the tables.
+ struct Keys {
+ /// \brief "on" key for the join.
+ ///
+ /// The input table must be sorted by the "on" key. Must be a single field
of a common
+ /// type. Inexact match is used on the "on" key. i.e., a row is considered
match iff
Review Comment:
```suggestion
/// type. Inexact match is used on the "on" key. i.e., a row is
considered a match iff
```
##########
cpp/src/arrow/engine/substrait/options.cc:
##########
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <iostream>
+
+#include "arrow/engine/substrait/options.h"
+
+#include <google/protobuf/util/json_util.h>
+#include "arrow/compute/exec/asof_join_node.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/engine/substrait/expression_internal.h"
+#include "arrow/engine/substrait/options_internal.h"
+#include "arrow/engine/substrait/relation_internal.h"
+#include "substrait/extension_rels.pb.h"
+
+namespace arrow {
+namespace engine {
+
+namespace substrait = ::substrait;
+
+class DefaultExtensionProvider : public ExtensionProvider {
+ public:
+ Result<DeclarationInfo> MakeRel(const std::vector<DeclarationInfo>& inputs,
+ const google::protobuf::Any& rel,
+ const ExtensionSet& ext_set) override {
+ if (rel.Is<arrow::substrait::AsOfJoinRel>()) {
+ arrow::substrait::AsOfJoinRel as_of_join_rel;
+ rel.UnpackTo(&as_of_join_rel);
+ return MakeAsOfJoinRel(inputs, as_of_join_rel, ext_set);
+ }
+ return Status::NotImplemented("Unrecognized extension in Susbstrait plan:
",
+ rel.DebugString());
+ }
+
+ private:
+ Result<DeclarationInfo> MakeAsOfJoinRel(
+ const std::vector<DeclarationInfo>& inputs,
+ const arrow::substrait::AsOfJoinRel& as_of_join_rel, const ExtensionSet&
ext_set) {
+ if (inputs.size() < 2) {
+ return Status::Invalid("substrait::AsOfJoinNode too few input tables: ",
+ inputs.size());
+ }
+ if (static_cast<size_t>(as_of_join_rel.keys_size()) != inputs.size()) {
+ return Status::Invalid("substrait::AsOfJoinNode mismatched number of
inputs");
+ }
+
+ size_t n_input = inputs.size(), i = 0;
+ std::vector<compute::AsofJoinNodeOptions::Keys> input_keys(n_input);
+ for (const auto& keys : as_of_join_rel.keys()) {
+ // on-key
+ if (!keys.has_on()) {
+ return Status::Invalid("substrait::AsOfJoinNode missing on-key for
input ", i);
+ }
+ ARROW_ASSIGN_OR_RAISE(auto on_key_expr, FromProto(keys.on(), ext_set,
{}));
+ if (on_key_expr.field_ref() == NULLPTR) {
+ return Status::NotImplemented(
+ "substrait::AsOfJoinNode non-field-ref on-key for input ", i);
+ }
+ const FieldRef& on_key = *on_key_expr.field_ref();
+
+ // by-key
+ std::vector<FieldRef> by_key;
+ for (const auto& by_item : keys.by()) {
+ ARROW_ASSIGN_OR_RAISE(auto by_key_expr, FromProto(by_item, ext_set,
{}));
+ if (by_key_expr.field_ref() == NULLPTR) {
+ return Status::NotImplemented(
+ "substrait::AsOfJoinNode non-field-ref by-key for input ", i);
+ }
+ by_key.push_back(*by_key_expr.field_ref());
+ }
+
+ input_keys[i] = {std::move(on_key), std::move(by_key)};
+ ++i;
+ }
+
+ // schema
+ int64_t tolerance = as_of_join_rel.tolerance();
+ std::vector<std::shared_ptr<Schema>> input_schema(inputs.size());
+ for (size_t i = 0; i < inputs.size(); i++) {
+ input_schema[i] = inputs[i].output_schema;
+ }
+ ARROW_ASSIGN_OR_RAISE(auto schema,
+ compute::asofjoin::MakeOutputSchema(input_schema,
input_keys));
+ compute::AsofJoinNodeOptions asofjoin_node_opts{std::move(input_keys),
tolerance};
+
+ // declaration
+ std::vector<compute::Declaration::Input> input_decls(inputs.size());
+ for (size_t i = 0; i < inputs.size(); i++) {
+ input_decls[i] = inputs[i].declaration;
+ }
+ return DeclarationInfo{
+ compute::Declaration("asofjoin", input_decls,
std::move(asofjoin_node_opts)),
+ std::move(schema)};
+ }
Review Comment:
There is a proposal at
https://github.com/ibis-project/ibis-substrait/issues/423 to change the output
ordering so that it matches what ibis-substrait expects. I think we can handle it as part
of this PR or in a separate follow-up PR. Let me know which you'd prefer.
##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -393,22 +393,35 @@ class ARROW_EXPORT HashJoinNodeOptions : public
ExecNodeOptions {
/// This node will output one row for each row in the left table.
class ARROW_EXPORT AsofJoinNodeOptions : public ExecNodeOptions {
public:
- AsofJoinNodeOptions(FieldRef on_key, std::vector<FieldRef> by_key, int64_t
tolerance)
- : on_key(std::move(on_key)), by_key(by_key), tolerance(tolerance) {}
-
- /// \brief "on" key for the join.
+ /// \brief Keys for one input table of the AsofJoin operation
///
- /// All inputs tables must be sorted by the "on" key. Must be a single field
of a common
- /// type. Inexact match is used on the "on" key. i.e., a row is considered
match iff
- /// left_on - tolerance <= right_on <= left_on.
- /// Currently, the "on" key must be of an integer, date, or timestamp type.
- FieldRef on_key;
- /// \brief "by" key for the join.
+ /// The keys must be consistent across the input tables:
+ /// Each "on" key must refer to a field of the same type and units across
the tables.
+ /// Each "by" key must refer to a list of fields of the same types across
the tables.
+ struct Keys {
+ /// \brief "on" key for the join.
+ ///
+ /// The input table must be sorted by the "on" key. Must be a single field
of a common
+ /// type. Inexact match is used on the "on" key. i.e., a row is considered
match iff
+ /// left_on - tolerance <= right_on <= left_on.
+ /// Currently, the "on" key must be of an integer, date, or timestamp type.
+ FieldRef on_key;
+ /// \brief "by" key for the join.
+ ///
+ /// The input table must have each field of the "by" key. Exact equality
is used for
+ /// each field of the "by" key.
+ /// Currently, each field of the "by" key must be of an integer, date,
timestamp, or
+ /// base-binary type.
Review Comment:
Do we support bool here now? Or is that still only for payloads?
##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -393,22 +393,35 @@ class ARROW_EXPORT HashJoinNodeOptions : public
ExecNodeOptions {
/// This node will output one row for each row in the left table.
class ARROW_EXPORT AsofJoinNodeOptions : public ExecNodeOptions {
public:
- AsofJoinNodeOptions(FieldRef on_key, std::vector<FieldRef> by_key, int64_t
tolerance)
- : on_key(std::move(on_key)), by_key(by_key), tolerance(tolerance) {}
-
- /// \brief "on" key for the join.
+ /// \brief Keys for one input table of the AsofJoin operation
///
- /// All inputs tables must be sorted by the "on" key. Must be a single field
of a common
- /// type. Inexact match is used on the "on" key. i.e., a row is considered
match iff
- /// left_on - tolerance <= right_on <= left_on.
- /// Currently, the "on" key must be of an integer, date, or timestamp type.
- FieldRef on_key;
- /// \brief "by" key for the join.
+ /// The keys must be consistent across the input tables:
+ /// Each "on" key must refer to a field of the same type and units across
the tables.
+ /// Each "by" key must refer to a list of fields of the same types across
the tables.
+ struct Keys {
+ /// \brief "on" key for the join.
+ ///
+ /// The input table must be sorted by the "on" key. Must be a single field
of a common
+ /// type. Inexact match is used on the "on" key. i.e., a row is considered
match iff
+ /// left_on - tolerance <= right_on <= left_on.
+ /// Currently, the "on" key must be of an integer, date, or timestamp type.
+ FieldRef on_key;
+ /// \brief "by" key for the join.
+ ///
+ /// The input table must have each field of the "by" key. Exact equality
is used for
+ /// each field of the "by" key.
+ /// Currently, each field of the "by" key must be of an integer, date,
timestamp, or
+ /// base-binary type.
+ std::vector<FieldRef> by_key;
+ };
+
+ AsofJoinNodeOptions(std::vector<Keys> input_keys, int64_t tolerance)
+ : input_keys(std::move(input_keys)), tolerance(tolerance) {}
+
+ /// \brief AsofJoin keys per input table.
///
- /// All input tables must have the "by" key. Exact equality
- /// is used for the "by" key.
- /// Currently, the "by" key must be of an integer, date, timestamp, or
base-binary type
- std::vector<FieldRef> by_key;
+ /// See `Keys` for details.
Review Comment:
```suggestion
/// \see `Keys` for details.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]