lidavidm commented on a change in pull request #12609:
URL: https://github.com/apache/arrow/pull/12609#discussion_r838476520
##########
File path: cpp/src/arrow/util/thread_pool.cc
##########
@@ -58,6 +60,19 @@ SerialExecutor::~SerialExecutor() = default;
Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
StopToken stop_token, StopCallback&&
stop_callback) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ // Wrap the task to propagate a parent tracing span to it
+ struct {
+ void operator()() {
+ auto scope =
::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan);
+ std::move(func)();
+ }
+ FnOnce<void()> func;
+ opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> activeSpan;
Review comment:
This should be named `active_span` (same below)
##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -242,7 +243,23 @@ class ConcreteFutureImpl : public FutureImpl {
void AddCallback(Callback callback, CallbackOptions opts) {
CheckOptions(opts);
std::unique_lock<std::mutex> lock(mutex_);
+#ifdef ARROW_WITH_OPENTELEMETRY
+ struct Wrapstruct {
+ void operator()(const FutureImpl& impl) {
+ auto scope =
::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan);
+ std::move(func)(impl);
+ }
+ Callback func;
+ opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> activeSpan;
Review comment:
This should be named `active_span`
##########
File path: cpp/src/arrow/util/thread_pool.cc
##########
@@ -58,6 +60,19 @@ SerialExecutor::~SerialExecutor() = default;
Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
StopToken stop_token, StopCallback&&
stop_callback) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ // Wrap the task to propagate a parent tracing span to it
+ struct {
+ void operator()() {
+ auto scope =
::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan);
+ std::move(func)();
+ }
+ FnOnce<void()> func;
+ opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> activeSpan;
+ } wrapper{std::forward<FnOnce<void()>>(task),
Review comment:
is `std::forward` the right call here, or just `std::move`?
##########
File path: cpp/src/arrow/util/thread_pool.cc
##########
@@ -58,6 +60,19 @@ SerialExecutor::~SerialExecutor() = default;
Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
StopToken stop_token, StopCallback&&
stop_callback) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ // Wrap the task to propagate a parent tracing span to it
+ struct {
+ void operator()() {
+ auto scope =
::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan);
+ std::move(func)();
+ }
+ FnOnce<void()> func;
+ opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> activeSpan;
+ } wrapper{std::forward<FnOnce<void()>>(task),
Review comment:
Nit, but the struct declarations are subtly different here and in
future.cc, maybe we can keep them consistent? (I think the style here is more
common in the codebase.)
##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -1173,15 +1187,13 @@ Future<std::shared_ptr<Table>>
FileReaderImpl::DecodeRowGroups(
// OptionalParallelForAsync requires an executor
if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();
- auto read_column = [row_groups, self, this](size_t i,
-
std::shared_ptr<ColumnReaderImpl> reader)
+ auto read_column = [=](size_t i, std::shared_ptr<ColumnReaderImpl> reader)
mutable
Review comment:
Is the `mutable` still necessary here?
##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -1060,7 +1072,8 @@ class RowGroupGenerator {
}
auto ready = reader->parquet_reader()->WhenBuffered({row_group},
column_indices);
if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
- return ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> {
+
+ return ready.Then([=]() mutable -> ::arrow::Future<RecordBatchGenerator> {
Review comment:
Is the `mutable` still necessary here?
##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -144,12 +144,12 @@ class ARROW_EXPORT Executor {
// will return the callable's result value once.
// The callable's arguments are copied before execution.
template <typename Function, typename... Args,
+ typename FuncResult = ::arrow::detail::result_of_t<Function &&
(Args && ...)>,
typename FutureType = typename
::arrow::detail::ContinueFuture::ForSignature<
Function && (Args && ...)>>
Result<FutureType> Submit(TaskHints hints, StopToken stop_token, Function&&
func,
Args&&... args) {
using ValueType = typename FutureType::ValueType;
-
auto future = FutureType::Make();
Review comment:
Are the changes here still necessary?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]