westonpace commented on code in PR #34392:
URL: https://github.com/apache/arrow/pull/34392#discussion_r1185225329
##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -200,8 +207,74 @@ class ConcurrentQueue {
std::condition_variable cond_;
};
+class AsofJoinNode;
+
+#ifndef NDEBUG
+// Get the debug-stream associated with the as-of-join node
+std::ostream* GetDebugStream(AsofJoinNode& node);
+
+// Get the debug-mutex associated with the as-of-join node
+std::mutex* GetDebugMutex(AsofJoinNode& node);
+
+// A debug-facility that wraps output-stream insertions with synchronization.
Code like
+//
+// DebugSync(as_of_join_node) << ... << ... << ... ;
+//
+// will insert to the node's debug-stream and guard all insertions as one
operation using
+// the node's debug-mutex.
+class DebugSync {
Review Comment:
I typically work around this by using concatenation and newlines instead of streams, but that's more of a hack.
Can we put this in a util file somewhere? Perhaps we could do this in a follow-up logging PR?
##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -451,6 +438,53 @@ struct BasicTest {
return types;
}
+// code generation for the by_key types supported by AsofJoinNodeOptions
constructors
+// which cannot be directly done using templates because of failure to deduce
the template
+// argument for an invocation with a string- or initializer_list-typed
keys-argument
+#define EXPAND_BY_KEY_TYPE(macro) \
+ macro(const FieldRef); \
+ macro(std::vector<FieldRef>); \
+ macro(std::initializer_list<FieldRef>);
+
+#define CHECK_RUN_OUTPUT(by_key_type)
\
+ void CheckRunOutput(
\
+ const BatchesWithSchema& l_batches, const BatchesWithSchema& r0_batches,
\
+ const BatchesWithSchema& r1_batches, const BatchesWithSchema&
exp_batches, \
+ const FieldRef time, by_key_type key, const int64_t tolerance) {
\
+ CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches,
\
+ GetRepeatedOptions(3, time, {key}, tolerance));
\
+ }
+
+ EXPAND_BY_KEY_TYPE(CHECK_RUN_OUTPUT)
+
+#undef CHECK_RUN_OUTPUT
+#undef EXPAND_BY_KEY_TYPE
+
+ void CheckRunOutput(const BatchesWithSchema& l_batches,
+ const BatchesWithSchema& r0_batches,
+ const BatchesWithSchema& r1_batches,
+ const BatchesWithSchema& exp_batches,
+ const AsofJoinNodeOptions join_options) {
+#ifndef NDEBUG
Review Comment:
Do we need `NDEBUG` checks in tests?
##########
cpp/src/arrow/acero/options_internal.h:
##########
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifndef NDEBUG
+
+#include <mutex>
+#include <ostream>
+
+namespace arrow {
+namespace acero {
+
+struct DebugOptions {
+ DebugOptions(std::ostream* os, std::mutex* mutex) : os(os), mutex(mutex) {}
+
+ std::ostream* os;
+ std::mutex* mutex;
+};
Review Comment:
Instead of passing all of these around in options, can we just make them part of the query context and query options?
##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -370,13 +475,20 @@ class KeyHasher {
column_arrays_[k] =
ColumnArrayFromArrayDataAndMetadata(array_data, metadata_[k], i,
length);
}
+ // write directly to the cache
Hashing64::HashMultiColumn(column_arrays_, &ctx_, hashes_.data() + i);
}
- batch_ = batch;
+#ifndef NDEBUG
+ DebugSync(*node_) << "key hasher " << index_ << " got hashes "
+ << compute::internal::GenericToString(hashes_) <<
std::endl;
+#endif
Review Comment:
I wonder if it would be possible to do something like...
```
DEBUG_SYNC(*node_, "key hasher", index_, "got hashes ",
compute::internal::GenericToString(hashes_), std::endl);
```
The macro could compile to nothing in release mode, and then we could get rid of the `#ifndef NDEBUG` blocks.
##########
cpp/src/arrow/chunked_array.h:
##########
@@ -263,8 +263,10 @@ Status ApplyBinaryChunked(const ChunkedArray& left, const
ChunkedArray& right,
Action&& action) {
MultipleChunkIterator iterator(left, right);
std::shared_ptr<Array> left_piece, right_piece;
+ int64_t pos = iterator.position();
while (iterator.Next(&left_piece, &right_piece)) {
- ARROW_RETURN_NOT_OK(action(*left_piece, *right_piece,
iterator.position()));
+ ARROW_RETURN_NOT_OK(action(*left_piece, *right_piece, pos));
+ pos = iterator.position();
Review Comment:
Why do we need this change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]