lidavidm commented on a change in pull request #11964:
URL: https://github.com/apache/arrow/pull/11964#discussion_r771715353
##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -276,7 +291,12 @@ Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
auto source = file->source();
auto reader_fut =
OpenReaderAsync(source, *this, scan_options,
::arrow::internal::GetCpuThreadPool());
- return GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
+ auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
+#ifdef ARROW_WITH_OPENTELEMETRY
+ generator = arrow::internal::tracing::WrapAsyncGenerator(
+ std::move(generator), "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next");
Review comment:
This method is an override, so we shouldn't have
`FileFormat::ScanBatchesAsync::Next`. That will happen only for ORC.
##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -148,9 +149,14 @@ static inline Result<csv::ReadOptions> GetReadOptions(
static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ auto tracer = arrow::internal::tracing::GetTracer();
+ auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
+#endif
Review comment:
We could. I didn't want to wrap too much of the API, though, and I figured
this approach would be best if people are very concerned about overhead.
##########
File path: cpp/src/arrow/util/tracing_internal.h
##########
@@ -97,6 +98,57 @@ AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped,
return fut;
};
}
+
+/// \brief Start a new span for each invocation of a generator.
+///
+/// The parent span of the new span will be the currently active span
+/// (if any) as of when WrapAsyncGenerator was itself called.
+template <typename T>
+AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped,
+ const std::string& span_name) {
+ opentelemetry::trace::StartSpanOptions options;
+ options.parent = GetTracer()->GetCurrentSpan()->GetContext();
+ return WrapAsyncGenerator(std::move(wrapped), std::move(options), span_name);
+}
+
+/// \brief End the given span when the given async generator ends.
+///
+/// The span will be made the active span each time the generator is called.
+template <typename T>
+AsyncGenerator<T> TieSpanToAsyncGenerator(
+ AsyncGenerator<T> wrapped,
+ opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span) {
+ return [=]() mutable -> Future<T> {
+ auto scope = GetTracer()->WithActiveSpan(span);
+ auto fut = wrapped();
+ fut.AddCallback([span](const Result<T>& result) {
+ if (!result.ok() || IsIterationEnd(*result)) {
+ MarkSpan(result.status(), span.get());
+ span->End();
+ }
+ });
+ return fut;
Review comment:
Also, this callback is added before any consumer has a chance to add
callbacks, right? Or is the order of callbacks undetermined?
##########
File path: cpp/src/arrow/util/tracing_internal.h
##########
@@ -97,6 +98,57 @@ AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped,
return fut;
};
}
+
+/// \brief Start a new span for each invocation of a generator.
+///
+/// The parent span of the new span will be the currently active span
+/// (if any) as of when WrapAsyncGenerator was itself called.
+template <typename T>
+AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped,
+ const std::string& span_name) {
+ opentelemetry::trace::StartSpanOptions options;
+ options.parent = GetTracer()->GetCurrentSpan()->GetContext();
+ return WrapAsyncGenerator(std::move(wrapped), std::move(options), span_name);
+}
+
+/// \brief End the given span when the given async generator ends.
+///
+/// The span will be made the active span each time the generator is called.
+template <typename T>
+AsyncGenerator<T> TieSpanToAsyncGenerator(
+ AsyncGenerator<T> wrapped,
+ opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span) {
+ return [=]() mutable -> Future<T> {
+ auto scope = GetTracer()->WithActiveSpan(span);
+ auto fut = wrapped();
+ fut.AddCallback([span](const Result<T>& result) {
+ if (!result.ok() || IsIterationEnd(*result)) {
+ MarkSpan(result.status(), span.get());
+ span->End();
+ }
+ });
+ return fut;
Review comment:
Spans are only ended if `End` is called or if all references are freed;
in particular, `span` is a `shared_ptr`. The "active span" is only relevant for
automatic span linkage, so starting a new active span does not necessarily end
the old span. We could certainly use `Then` here, but I don't think there's any
risk of ending the wrong span by accident.
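
To illustrate the lifetime argument, here is a minimal sketch using toy
stand-ins. `ToySpan`, `ToyTracer`, and `ToyScope` are hypothetical classes
written only for this example; they are not the OpenTelemetry types, and they
model only the behavior described above (a span ends on an explicit `End` or
when its last reference is dropped, and activating another span does not end
the previous one).

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for a tracing span (NOT the OpenTelemetry class).
// It ends exactly once: on an explicit End() or when it is destroyed.
class ToySpan {
 public:
  explicit ToySpan(int* ended_counter) : ended_counter_(ended_counter) {}
  ~ToySpan() { End(); }
  void End() {
    if (!ended_) {
      ended_ = true;
      ++*ended_counter_;
    }
  }
  bool ended() const { return ended_; }

 private:
  int* ended_counter_;
  bool ended_ = false;
};

// Hypothetical tracer that only remembers which span is "active".
struct ToyTracer {
  std::shared_ptr<ToySpan> active;
};

// Hypothetical RAII scope: makes a span active and restores the previous
// active span on destruction. It never calls End() on anything.
class ToyScope {
 public:
  ToyScope(ToyTracer* tracer, std::shared_ptr<ToySpan> span)
      : tracer_(tracer), previous_(std::move(tracer->active)) {
    tracer_->active = std::move(span);
  }
  ~ToyScope() { tracer_->active = std::move(previous_); }
  ToyScope(const ToyScope&) = delete;
  ToyScope& operator=(const ToyScope&) = delete;

 private:
  ToyTracer* tracer_;
  std::shared_ptr<ToySpan> previous_;
};

// Activating a second span leaves the first span open.
bool ActivatingAnotherSpanKeepsFirstOpen() {
  int ended = 0;
  ToyTracer tracer;
  auto first = std::make_shared<ToySpan>(&ended);
  auto second = std::make_shared<ToySpan>(&ended);
  ToyScope outer(&tracer, first);
  {
    ToyScope inner(&tracer, second);  // `second` is active in this block
  }
  return !first->ended() && !second->ended() && ended == 0;
}

// A span held by several owners ends only when the last reference goes away.
int EndedAfterLastReferenceDropped() {
  int ended = 0;
  {
    auto span = std::make_shared<ToySpan>(&ended);
    auto copy = span;  // e.g. a copy captured by a callback
    span.reset();      // one owner gone, the span is still alive
    if (ended != 0) return -1;
  }  // `copy` is destroyed here, which ends the span
  return ended;
}
```

In this toy model the callback's captured copy keeps the span alive until the
callback itself ends it or is destroyed, which mirrors the reasoning in the
comment above.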