westonpace commented on code in PR #14799:
URL: https://github.com/apache/arrow/pull/14799#discussion_r1083292813


##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -1179,6 +1198,9 @@ class AsofJoinNode : public ExecNode {
                bool may_rehash);
 
   Status Init() override {
+    if (plan()->query_context()->exec_context()->executor() == nullptr) {
+      return Status::Invalid("AsOfJoinNode requires a non-null executor");
+    }

Review Comment:
   A non-null executor is now required (this is enforced by the exec plan in StartProducing): 
https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/exec_plan.cc#L98
   
   So you can drop this check.
   ```suggestion
   ```



##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -1200,8 +1222,12 @@ class AsofJoinNode : public ExecNode {
   }
 
   virtual ~AsofJoinNode() {
-    process_.Push(false);  // poison pill
-    process_thread_.join();
+    process_.Push(false);                                          // poison 
pill
+    if (process_thread_.get_id() != std::this_thread::get_id()) {  // avoid 
deadlock
+      process_thread_.join();
+    } else {
+      process_thread_.detach();
+    }

Review Comment:
   ```suggestion
       process_.Push(false);  // poison pill
       process_thread_.join();
   ```
   Can we revert this change?  I thought it was no longer needed.  I'd 
rather understand and resolve the underlying deadlock than just detach the thread.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -683,6 +785,52 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
                          std::move(aggregate_schema));
     }
 
+    case substrait::Rel::RelTypeCase::kExtensionLeaf:
+    case substrait::Rel::RelTypeCase::kExtensionSingle:
+    case substrait::Rel::RelTypeCase::kExtensionMulti: {
+      std::vector<DeclarationInfo> ext_rel_inputs;
+      ARROW_ASSIGN_OR_RAISE(
+          auto ext_rel_info,
+          GetExtensionRelationInfo(rel, ext_set, conversion_options, 
&ext_rel_inputs));
+      const auto& ext_decl_info = ext_rel_info.decl_info;
+      auto ext_common_opt = GetExtensionRelCommon(rel);
+      bool has_emit = ext_common_opt && ext_common_opt->emit_kind_case() ==
+                                            
substrait::RelCommon::EmitKindCase::kEmit;
+      if (!ext_rel_info.field_output_indices) {
+        if (!has_emit) {
+          return ProcessEmitProject(ext_common_opt, ext_decl_info,
+                                    ext_decl_info.output_schema);
+        }

Review Comment:
   ```suggestion
           if (!has_emit) {
             return ext_decl_info;
           }
   ```
   
   Given that we know there is no emit, returning `ext_decl_info` unchanged is 
all `ProcessEmitProject` is going to do anyway.  So we can get rid of the 
change you made introducing `ProcessEmitProject` (i.e. creating the helper 
function) because we don't need it anymore.



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -169,6 +169,19 @@ inline compute::Expression UseBoringRefs(const 
compute::Expression& expr) {
   return compute::Expression{std::move(modified_call)};
 }
 
+Status CheckTable(std::shared_ptr<Buffer>& buf,
+                  const ConversionOptions& conversion_options = {}) {
+  std::shared_ptr<ExtensionIdRegistry> sp_ext_id_reg = 
MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  ExtensionSet ext_set(ext_id_reg);
+  ARROW_ASSIGN_OR_RAISE(auto sink_decls, DeserializePlans(
+                                             *buf, [] { return kNullConsumer; 
},
+                                             ext_id_reg, &ext_set, 
conversion_options));

Review Comment:
   I'm pretty sure you can pass `nullptr` for `ext_id_reg` and `ext_set` and 
get the same behavior.



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -4412,5 +4425,378 @@ TEST(Substrait, PlanWithAsOfJoinExtension) {
   CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
 }
 
+TEST(Substrait, PlanWithExtension) {
+  // This demos an extension relation
+  std::string substrait_json = R"({
+    "extensionUris": [],
+    "extensions": [],
+    "relations": [{
+      "root": {
+        "input": {
+          "extension_multi": {
+            "common": {
+              "emit": {
+                "outputMapping": [0, 1, 2, 5]
+              }
+            },
+            "inputs": [
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value1"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T1"]
+                  }
+                }
+              },
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value2"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T2"]
+                  }
+                }
+              }
+            ],
+            "detail": {
+              "@type": "/arrow.substrait_ext.AsOfJoinRel",
+              "keys" : [
+                {
+                  "on": {
+                    "selection": {
+                      "directReference": {
+                        "structField": {
+                          "field": 0,
+                        }
+                      },
+                      "rootReference": {}
+                    }
+                  },
+                  "by": [
+                    {
+                      "selection": {
+                        "directReference": {
+                          "structField": {
+                            "field": 1,
+                          }
+                        },
+                        "rootReference": {}
+                      }
+                    }
+                  ]
+               },
+                {
+                  "on": {
+                    "selection": {
+                      "directReference": {
+                        "structField": {
+                          "field": 0,
+                        }
+                      },
+                      "rootReference": {}
+                    }
+                  },
+                  "by": [
+                    {
+                      "selection": {
+                        "directReference": {
+                          "structField": {
+                            "field": 1,
+                          }
+                        },
+                        "rootReference": {}
+                      }
+                    }
+                  ]
+               }
+             ],
+              "tolerance": 1000
+            }
+          }
+        },
+        "names": ["time", "key", "value1", "value2"]
+      }
+    }],
+    "expectedTypeUrls": []
+  })";
+
+  std::vector<std::shared_ptr<Schema>> input_schema = {
+      schema({field("time", int32()), field("key", int32()), field("value1", 
float64())}),
+      schema(
+          {field("time", int32()), field("key", int32()), field("value2", 
float64())})};
+  NamedTableProvider table_provider = ProvideMadeTable(
+      [&input_schema](
+          const std::vector<std::string>& names) -> 
Result<std::shared_ptr<Table>> {
+        if (names.size() != 1) {
+          return Status::Invalid("Multiple test table names");
+        }
+        if (names[0] == "T1") {
+          return TableFromJSON(input_schema[0],
+                               {"[[2, 1, 1.1], [4, 1, 2.1], [6, 2, 3.1]]"});
+        }
+        if (names[0] == "T2") {
+          return TableFromJSON(input_schema[1],
+                               {"[[1, 1, 1.2], [3, 2, 2.2], [5, 2, 3.2]]"});
+        }
+        return Status::Invalid("Unknown test table name ", names[0]);
+      });
+  ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", 
substrait_json));
+
+  ASSERT_OK_AND_ASSIGN(
+      auto out_schema,
+      compute::asofjoin::MakeOutputSchema(
+          input_schema, {{FieldRef(0), {FieldRef(1)}}, {FieldRef(0), 
{FieldRef(1)}}}));
+  auto expected_table = TableFromJSON(
+      out_schema, {"[[2, 1, 1.1, 1.2], [4, 1, 2.1, 1.2], [6, 2, 3.1, 3.2]]"});
+  CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
+}
+
+TEST(Substrait, AsOfJoinDefaultEmit) {
+  std::string substrait_json = R"({
+    "extensionUris": [],
+    "extensions": [],
+    "relations": [{
+      "root": {
+        "input": {
+          "extension_multi": {
+            "inputs": [
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value1"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T1"]
+                  }
+                }
+              },
+              {
+                "read": {
+                  "common": {
+                    "direct": {
+                    }
+                  },
+                  "baseSchema": {
+                    "names": ["time", "key", "value2"],
+                    "struct": {
+                      "types": [
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "i32": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        },
+                        {
+                          "fp64": {
+                            "typeVariationReference": 0,
+                            "nullability": "NULLABILITY_NULLABLE"
+                          }
+                        }
+                      ],
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  },
+                  "namedTable": {
+                    "names": ["T2"]
+                  }
+                }
+              }
+            ],
+            "detail": {
+              "@type": "/arrow.substrait_ext.AsOfJoinRel",
+              "keys" : [
+                {
+                  "on": {
+                    "selection": {
+                      "directReference": {
+                        "structField": {
+                          "field": 0,
+                        }
+                      },
+                      "rootReference": {}
+                    }
+                  },
+                  "by": [
+                    {
+                      "selection": {
+                        "directReference": {
+                          "structField": {
+                            "field": 1,
+                          }
+                        },
+                        "rootReference": {}
+                      }
+                    }
+                  ]
+               },
+                {
+                  "on": {
+                    "selection": {
+                      "directReference": {
+                        "structField": {
+                          "field": 0,
+                        }
+                      },
+                      "rootReference": {}
+                    }
+                  },
+                  "by": [
+                    {
+                      "selection": {
+                        "directReference": {
+                          "structField": {
+                            "field": 1,
+                          }
+                        },
+                        "rootReference": {}
+                      }
+                    }
+                  ]
+               }
+             ],
+              "tolerance": 1000
+            }
+          }
+        },
+        "names": ["time", "key", "value1", "time2", "key2", "value2"]
+      }
+    }],
+    "expectedTypeUrls": []
+  })";
+
+  std::vector<std::shared_ptr<Schema>> input_schema = {
+      schema({field("time", int32()), field("key", int32()), field("value1", 
float64())}),
+      schema(
+          {field("time", int32()), field("key", int32()), field("value2", 
float64())})};
+  NamedTableProvider table_provider = ProvideMadeTable(
+      [&input_schema](
+          const std::vector<std::string>& names) -> 
Result<std::shared_ptr<Table>> {
+        if (names.size() != 1) {
+          return Status::Invalid("Multiple test table names");
+        }
+        if (names[0] == "T1") {
+          return TableFromJSON(input_schema[0],
+                               {"[[2, 1, 1.1], [4, 1, 2.1], [6, 2, 3.1]]"});
+        }
+        if (names[0] == "T2") {
+          return TableFromJSON(input_schema[1],
+                               {"[[1, 1, 1.2], [3, 2, 2.2], [5, 2, 3.2]]"});
+        }
+        return Status::Invalid("Unknown test table name ", names[0]);
+      });
+  ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", 
substrait_json));
+
+  auto out_schema = schema({field("time", int32()), field("key", int32()),
+                            field("value1", float64()), field("time2", 
int32()),
+                            field("key2", int32()), field("value2", 
float64())});
+
+  auto expected_table = TableFromJSON(
+      out_schema,
+      {"[[2, 1, 1.1, 2, 1, 1.2], [4, 1, 2.1, 4, 1, 1.2], [6, 2, 3.1, 6, 2, 
3.2]]"});
+  ASSERT_OK_AND_ASSIGN(auto io_executor, arrow::internal::ThreadPool::Make(1));
+  compute::ExecContext exec_context(default_memory_pool(), io_executor.get());

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -169,6 +169,19 @@ inline compute::Expression UseBoringRefs(const 
compute::Expression& expr) {
   return compute::Expression{std::move(modified_call)};
 }
 
+Status CheckTable(std::shared_ptr<Buffer>& buf,
+                  const ConversionOptions& conversion_options = {}) {
+  std::shared_ptr<ExtensionIdRegistry> sp_ext_id_reg = 
MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  ExtensionSet ext_set(ext_id_reg);
+  ARROW_ASSIGN_OR_RAISE(auto sink_decls, DeserializePlans(

Review Comment:
   There is now an `arrow::engine::DeserializePlan` which returns a single 
`Declaration` that you can use here.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -683,6 +785,52 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
                          std::move(aggregate_schema));
     }
 
+    case substrait::Rel::RelTypeCase::kExtensionLeaf:
+    case substrait::Rel::RelTypeCase::kExtensionSingle:
+    case substrait::Rel::RelTypeCase::kExtensionMulti: {
+      std::vector<DeclarationInfo> ext_rel_inputs;
+      ARROW_ASSIGN_OR_RAISE(
+          auto ext_rel_info,
+          GetExtensionRelationInfo(rel, ext_set, conversion_options, 
&ext_rel_inputs));
+      const auto& ext_decl_info = ext_rel_info.decl_info;
+      auto ext_common_opt = GetExtensionRelCommon(rel);
+      bool has_emit = ext_common_opt && ext_common_opt->emit_kind_case() ==
+                                            
substrait::RelCommon::EmitKindCase::kEmit;
+      if (!ext_rel_info.field_output_indices) {
+        if (!has_emit) {
+          return ProcessEmitProject(ext_common_opt, ext_decl_info,
+                                    ext_decl_info.output_schema);
+        }
+        return Status::NotImplemented("Emit not supported by ",
+                                      ext_decl_info.declaration.factory_name);
+      }
+      // Set up the emit order - an ordered list of indices that specifies an 
output
+      // mapping as expected by Substrait. This is a sublist of [0..N), where 
N is the
+      // total number of input fields across all inputs of the relation, that 
selects
+      // from these input fields.
+      std::vector<int> emit_order;
+      if (has_emit) {
+        // the emit order in defined in the Substrait plan - pick it up

Review Comment:
   ```suggestion
           // the emit order is defined in the Substrait plan - pick it up
   ```



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -169,6 +169,19 @@ inline compute::Expression UseBoringRefs(const 
compute::Expression& expr) {
   return compute::Expression{std::move(modified_call)};
 }
 
+Status CheckTable(std::shared_ptr<Buffer>& buf,

Review Comment:
   Maybe rename this to `CheckPlanRuns` or `CheckPlanDoesNotFail`.  You could 
also use `DeclarationToStatus` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to