pitrou commented on code in PR #41614:
URL: https://github.com/apache/arrow/pull/41614#discussion_r1598463169


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -918,34 +922,39 @@ class CompositeTableBuilder {
 // guaranteeing this probability is below 1 in a billion. The fix is 128-bit 
hashing.
 // See ARROW-17653
 class AsofJoinNode : public ExecNode {
-  // Advances the RHS as far as possible to be up to date for the current LHS 
timestamp
-  Result<bool> UpdateRhs() {
+  // Advances the RHS as far as possible to be up to date for the current LHS 
timestamp.
+  // Returns a pair of booleans. The first is if any RHS has advanced. The 
second is if
+  // all RHS are up to date for LHS.
+  Result<std::pair<bool, bool>> UpdateRhsAndCheckUpToDateWithLhs() {

Review Comment:
   Can you return a struct with explanatory member names instead? 



##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -548,8 +548,10 @@ class InputState {
   // true when the queue is empty and, when memo may have future entries (the 
case of a
   // positive tolerance), when the memo is empty.
   // used when checking whether RHS is up to date with LHS.
-  bool CurrentEmpty() const {
-    return memo_.no_future_ ? Empty() : memo_.times_.empty() && Empty();
+  // NOTE: The emptiness must be decided by an atomic all to Empty() in 
caller, due to the

Review Comment:
   ```suggestion
     // NOTE: The emptiness must be decided by an atomic call to Empty() in 
caller, due to the
   ```



##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -548,8 +548,10 @@ class InputState {
   // true when the queue is empty and, when memo may have future entries (the 
case of a
   // positive tolerance), when the memo is empty.
   // used when checking whether RHS is up to date with LHS.
-  bool CurrentEmpty() const {
-    return memo_.no_future_ ? Empty() : memo_.times_.empty() && Empty();
+  // NOTE: The emptiness must be decided by an atomic all to Empty() in 
caller, due to the
+  // potential race with Push(), see GH-41614.
+  bool CurrentEmpty(bool empty) const {
+    return memo_.no_future_ ? empty : memo_.times_.empty() && empty;

Review Comment:
   Can you add parentheses to make sure operator precedence is not mistaken?



##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -548,8 +548,10 @@ class InputState {
   // true when the queue is empty and, when memo may have future entries (the 
case of a
   // positive tolerance), when the memo is empty.
   // used when checking whether RHS is up to date with LHS.
-  bool CurrentEmpty() const {
-    return memo_.no_future_ ? Empty() : memo_.times_.empty() && Empty();
+  // NOTE: The emptiness must be decided by an atomic all to Empty() in 
caller, due to the

Review Comment:
   Also, what do you call "atomic" here? I don't see any synchronization around 
calls to `Empty`.



##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1678,5 +1678,55 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) {
                           /*slow_r0=*/false);
 }
 
+TEST(AsofJoinTest, GH40675) {
+  auto left_batch = ExecBatchFromJSON(

Review Comment:
   It's nice to give the GH issue reference, but it would be even better
with a comment explaining what this test specifically does (so that readers
don't have to decipher all the test code).



##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1678,5 +1678,55 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) {
                           /*slow_r0=*/false);
 }
 
+TEST(AsofJoinTest, GH40675) {
+  auto left_batch = ExecBatchFromJSON(
+      {int64(), utf8()}, R"([[1, "a"], [1, "b"], [5, "a"], [6, "b"], [7, 
"f"]])");
+  auto right_batch = ExecBatchFromJSON(
+      {int64(), utf8(), float64()}, R"([[2, "a", 1.0], [9, "b", 3.0], [15, 
"g", 5.0]])");
+
+  Declaration left{
+      "exec_batch_source",
+      ExecBatchSourceNodeOptions(schema({field("colA", int64()), field("col2", 
utf8())}),
+                                 {std::move(left_batch)})};
+  Declaration right{
+      "exec_batch_source",
+      ExecBatchSourceNodeOptions(schema({field("colB", int64()), field("col3", 
utf8()),
+                                         field("colC", float64())}),
+                                 {std::move(right_batch)})};
+  AsofJoinNodeOptions asof_join_opts({{{"colA"}, {{"col2"}}}, {{"colB"}, 
{{"col3"}}}}, 1);
+  Declaration asof_join{
+      "asofjoin", {std::move(left), std::move(right)}, 
std::move(asof_join_opts)};
+
+  ASSERT_OK_AND_ASSIGN(auto result, 
DeclarationToExecBatches(std::move(asof_join)));
+
+  auto exp_batch = ExecBatchFromJSON(
+      {int64(), utf8(), float64()},
+      R"([[1, "a", 1.0], [1, "b", null], [5, "a", null], [6, "b", null], [7, 
"f", null]])");
+  AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, 
result.batches);
+}
+
+TEST(AsofJoinTest, GH41149) {
+  auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");

Review Comment:
   Same here: please add a comment explaining what this test specifically does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to