westonpace commented on code in PR #33909:
URL: https://github.com/apache/arrow/pull/33909#discussion_r1093868774


##########
cpp/proto/substrait/extension_rels.proto:
##########
@@ -44,3 +44,16 @@ message AsOfJoinRel {
     repeated .substrait.Expression by = 2;
   }
 }
+
+// Named tap relation
+//
+// A tap is a relation having a single input relation that it passes through, while also
+// causing some side-effect, e.g., writing to external storage.
+message NamedTapRel {
+  // The kind of tap
+  string kind = 1;
+  // A name used to configure the tap, e.g., a URI defining the destination of writing
+  string name = 2;
+  // Column names for the tap's output. Their number must be 0 or equal to the input's.

Review Comment:
   ```suggestion
     // Column names for the tap's output. If specified, there must be one name per field.
     // This can be empty, in which case field names will be generated.
   ```



##########
cpp/src/arrow/engine/substrait/options.cc:
##########
@@ -28,36 +28,58 @@
 namespace arrow {
 namespace engine {
 
+namespace {
+
+std::vector<compute::Declaration::Input> MakeDeclarationInputss(
+    const std::vector<DeclarationInfo>& inputs) {
+  std::vector<compute::Declaration::Input> input_decls(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    input_decls[i] = inputs[i].declaration;
+  }
+  return input_decls;
+}
+
+}  // namespace
+
 class BaseExtensionProvider : public ExtensionProvider {
  public:
-  Result<RelationInfo> MakeRel(const std::vector<DeclarationInfo>& inputs,
+  Result<RelationInfo> MakeRel(const ConversionOptions& conv_opts,
+                               const std::vector<DeclarationInfo>& inputs,
                                const ExtensionDetails& ext_details,
                                const ExtensionSet& ext_set) override {
     auto details = dynamic_cast<const DefaultExtensionDetails&>(ext_details);
-    return MakeRel(inputs, details.rel, ext_set);
+    return MakeRel(conv_opts, inputs, details.rel, ext_set);
   }
 
-  virtual Result<RelationInfo> MakeRel(const std::vector<DeclarationInfo>& 
inputs,
+  virtual Result<RelationInfo> MakeRel(const ConversionOptions& conv_opts,
+                                       const std::vector<DeclarationInfo>& 
inputs,
                                        const google::protobuf::Any& rel,
                                        const ExtensionSet& ext_set) = 0;
 };
 
 class DefaultExtensionProvider : public BaseExtensionProvider {
  public:
-  Result<RelationInfo> MakeRel(const std::vector<DeclarationInfo>& inputs,
+  Result<RelationInfo> MakeRel(const ConversionOptions& conv_opts,
+                               const std::vector<DeclarationInfo>& inputs,
                                const google::protobuf::Any& rel,
                                const ExtensionSet& ext_set) override {
     if (rel.Is<substrait_ext::AsOfJoinRel>()) {
       substrait_ext::AsOfJoinRel as_of_join_rel;
       rel.UnpackTo(&as_of_join_rel);
-      return MakeAsOfJoinRel(inputs, as_of_join_rel, ext_set);
+      return MakeAsOfJoinRel(conv_opts, inputs, as_of_join_rel, ext_set);
+    }
+    if (rel.Is<substrait_ext::NamedTapRel>()) {
+      substrait_ext::NamedTapRel named_tap_rel;
+      rel.UnpackTo(&named_tap_rel);
+      return MakeNamedTapRel(conv_opts, inputs, named_tap_rel, ext_set);
     }
     return Status::NotImplemented("Unrecognized extension in Susbstrait plan: 
",
                                   rel.DebugString());
   }
 
  private:
-  Result<RelationInfo> MakeAsOfJoinRel(const std::vector<DeclarationInfo>& 
inputs,
+  Result<RelationInfo> MakeAsOfJoinRel(const ConversionOptions& conv_opts,

Review Comment:
   Any particular reason to add `conv_opts` here?



##########
cpp/src/arrow/engine/substrait/options.cc:
##########
@@ -28,36 +28,58 @@
 namespace arrow {
 namespace engine {
 
+namespace {
+
+std::vector<compute::Declaration::Input> MakeDeclarationInputss(

Review Comment:
   ```suggestion
   std::vector<compute::Declaration::Input> MakeDeclarationInputs(
   ```



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -5355,5 +5380,97 @@ TEST(Substrait, AsOfJoinDefaultEmit) {
   CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
 }
 
+TEST(Substrait, PlanWithNamedTapExtension) {
+  // This demos an extension relation
+  std::string substrait_json = R"({
+    "extensionUris": [],
+    "extensions": [],
+    "relations": [{
+      "root": {
+        "input": {
+          "extension_multi": {
+            "inputs": [
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T"]
+                  }
+                }
+              }
+            ],
+            "detail": {
+              "@type": "/arrow.substrait_ext.NamedTapRel",
+              "kind" : "pass_for_named_tap",
+              "name" : "does_not_matter",
+              "columns" : ["pass_time", "pass_key", "pass_value"]
+            }
+          }
+        },
+        "names": ["t", "k", "v"]
+      }
+    }],
+    "expectedTypeUrls": []
+  })";
+
+  ASSERT_OK(AddPassFactory("pass_for_named_tap"));
+
+  std::shared_ptr<Schema> input_schema =
+      schema({field("time", int32()), field("key", int32()), field("value", 
float64())});
+  NamedTableProvider table_provider = ProvideMadeTable(
+      [&input_schema](
+          const std::vector<std::string>& names) -> 
Result<std::shared_ptr<Table>> {
+        if (names.size() != 1) {
+          return Status::Invalid("Multiple test table names");
+        }
+        if (names[0] == "T") {
+          return TableFromJSON(input_schema, {"[[2, 1, 1.1], [4, 1, 2.1], [6, 
2, 3.1]]"});
+        }
+        return Status::Invalid("Unknown test table name ", names[0]);
+      });

Review Comment:
   Use `AlwaysProvideSameTable` for brevity.



##########
cpp/src/arrow/engine/substrait/options.h:
##########
@@ -93,6 +106,11 @@ struct ARROW_ENGINE_EXPORT ConversionOptions {
   /// The default behavior will return an invalid status if the plan has any
   /// named table relations.
   NamedTableProvider named_table_provider = kDefaultNamedTableProvider;
+  /// \brief A custom strategy to be used for mapping a tap kind to a function name
+  ///
+  /// The default behavior will return an invalid status if the plan has any
+  /// named tap relations.

Review Comment:
   Couldn't a default implementation pass the `kind` through directly? For example, it could map `pass_for_named_tap` to `pass_for_named_tap`.



##########
cpp/src/arrow/engine/substrait/options.cc:
##########
@@ -111,15 +133,43 @@ class DefaultExtensionProvider : public 
BaseExtensionProvider {
     compute::AsofJoinNodeOptions asofjoin_node_opts{std::move(input_keys), 
tolerance};
 
     // declaration
-    std::vector<compute::Declaration::Input> input_decls(inputs.size());
-    for (size_t i = 0; i < inputs.size(); i++) {
-      input_decls[i] = inputs[i].declaration;
-    }
+    auto input_decls = MakeDeclarationInputss(inputs);
     return RelationInfo{
         {compute::Declaration("asofjoin", input_decls, 
std::move(asofjoin_node_opts)),
          std::move(schema)},
         std::move(field_output_indices)};
   }
+
+  Result<RelationInfo> MakeNamedTapRel(const ConversionOptions& conv_opts,
+                                       const std::vector<DeclarationInfo>& 
inputs,
+                                       const substrait_ext::NamedTapRel& 
named_tap_rel,
+                                       const ExtensionSet& ext_set) {
+    if (inputs.size() != 1) {
+      return Status::Invalid(
+          "substrait_ext::NamedTapNode requires a single table but got: ", 
inputs.size());

Review Comment:
   ```suggestion
             "substrait_ext::NamedTapRel requires a single input but got: ", inputs.size());
   ```



##########
cpp/src/arrow/engine/substrait/options.cc:
##########
@@ -111,15 +133,43 @@ class DefaultExtensionProvider : public 
BaseExtensionProvider {
     compute::AsofJoinNodeOptions asofjoin_node_opts{std::move(input_keys), 
tolerance};
 
     // declaration
-    std::vector<compute::Declaration::Input> input_decls(inputs.size());
-    for (size_t i = 0; i < inputs.size(); i++) {
-      input_decls[i] = inputs[i].declaration;
-    }
+    auto input_decls = MakeDeclarationInputss(inputs);

Review Comment:
   ```suggestion
       auto input_decls = MakeDeclarationInputs(inputs);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to