joosthooz commented on a change in pull request #12609:
URL: https://github.com/apache/arrow/pull/12609#discussion_r829896797
##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -167,9 +168,14 @@ static inline Result<csv::ReadOptions> GetReadOptions(
static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ auto tracer = arrow::internal::tracing::GetTracer();
+ auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
Review comment:
I'd like to, but it results in errors such as
```
arrow/cpp/src/arrow/dataset/file_csv.cc:205:7: error: use of deleted function ‘arrow::util::tracing::Span::Span(const arrow::util::tracing::Span&)’
205 | [=](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
206 | #ifdef ARROW_WITH_OPENTELEMETRY
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
207 | arrow::internal::tracing::MarkSpan(err, span.get());
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
208 | span->End();
| ~~~~~~~~~~~~
209 | #endif
| ~~~~~~
210 | return err.WithMessage("Could not open CSV input source '", path, "': ", err);
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
211 | });
| ~
```
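This happens because the macros require me to declare a `Span` object first, which the `[=]` lambda then tries to copy by value (and `Span` is not copyable):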
```
util::tracing::Span span;
START_SPAN(span, "arrow::dataset::CsvFileFormat::OpenReaderAsync");
```
The same error also occurs in other places.
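To illustrate the error and one possible way around it, here is a minimal sketch. `DemoSpan` and `MakeCallback` are hypothetical stand-ins, not Arrow's `util::tracing::Span` or any real Arrow API; the sketch only assumes that the span type has a deleted copy constructor, as the compiler output indicates.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Hypothetical move-only span type (an assumption, not Arrow's actual class).
struct DemoSpan {
  DemoSpan() = default;
  DemoSpan(const DemoSpan&) = delete;  // same restriction the error reports
  DemoSpan& operator=(const DemoSpan&) = delete;
  DemoSpan(DemoSpan&&) = default;
  DemoSpan& operator=(DemoSpan&&) = default;

  void End() { ended = true; }
  bool ended = false;
};

// Builds a copyable callback that ends the span when it is invoked.
// A `[=]` lambda that used a DemoSpan local directly would have to copy it,
// which fails with "use of deleted function". Capturing a shared_ptr copies
// only the pointer, so the lambda itself stays copyable.
std::function<std::string(const std::string&)> MakeCallback(
    std::shared_ptr<DemoSpan> span) {
  assert(span != nullptr);
  return [span](const std::string& reader) {
    span->End();
    return reader;
  };
}
```

Whether holding the span behind a pointer is acceptable here depends on how the tracing macros are meant to be used, so treat this as one option rather than a recommendation.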
##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -167,9 +168,14 @@ static inline Result<csv::ReadOptions> GetReadOptions(
static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ auto tracer = arrow::internal::tracing::GetTracer();
+ auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
Review comment:
I'd like to, but it results in errors such as
```
arrow/cpp/src/arrow/dataset/file_csv.cc:197:7: error: use of deleted function ‘arrow::util::tracing::Span::Span(const arrow::util::tracing::Span&)’
197 | [=](const std::shared_ptr<csv::StreamingReader>& reader)
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
198 | -> Result<std::shared_ptr<csv::StreamingReader>> {
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
199 | MARK_SPAN(span, Status::OK());
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
200 | END_SPAN(span);
| ~~~~~~~~~~~~~~~
201 | return reader;
| ~~~~~~~~~~~~~~
202 | },
| ~
```
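This happens because the macros require me to declare a `Span` object first, which the `[=]` lambda then tries to copy by value (and `Span` is not copyable):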
```
util::tracing::Span span;
START_SPAN(span, "arrow::dataset::CsvFileFormat::OpenReaderAsync");
```
The same error also occurs in other places.
##########
File path: cpp/src/arrow/util/tracing_internal.h
##########
@@ -146,6 +210,19 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent(
return st; \
})
+#define GET_CURRENT_SPAN(span) \
+ auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()
+
+#define SET_SPAN_SCOPE(scope, span) \
+ auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span)
+
+#define TIE_SPAN_TO_GENERATOR(generator) \
+ generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator(generator)
+
+#define PROPAGATE_SPAN_TO_GENERATOR(generator) \
+ generator = \
+ ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(generator)
+
Review comment:
These macros exist so that they can have blank versions, expanding to nothing, when compiling without OpenTelemetry:
```
#ifdef ARROW_WITH_OPENTELEMETRY
#define GET_CURRENT_SPAN(span) \
auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()
...
#else
#define START_SPAN(target_span, ...)
```
So if I remove them, I would need to guard every call to these functions
with `#ifdef ARROW_WITH_OPENTELEMETRY`.
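As a self-contained illustration of that pattern, here is a minimal sketch. `DEMO_WITH_TRACING`, `DEMO_START_SPAN`, `DemoLog` and `ReadRows` are hypothetical names made up for this example; they only mimic the shape of the real macros and are not part of Arrow.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical log used in place of a real tracer.
inline std::vector<std::string>& DemoLog() {
  static std::vector<std::string> log;
  return log;
}

// DEMO_WITH_TRACING stands in for ARROW_WITH_OPENTELEMETRY (an assumption).
#ifdef DEMO_WITH_TRACING
#define DEMO_START_SPAN(name) DemoLog().push_back(name)
#else
// Blank version: the call site compiles to an empty statement.
#define DEMO_START_SPAN(name)
#endif

inline int ReadRows(int n) {
  DEMO_START_SPAN("ReadRows");  // no #ifdef needed at the call site
  return n * 2;
}
```

With the flag undefined, `ReadRows` records nothing and the call site stays free of preprocessor guards, which is the trade-off described above.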
##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -270,6 +273,20 @@ class FileReaderImpl : public FileReader {
records_to_read +=
reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values();
}
+#ifdef ARROW_WITH_OPENTELEMETRY
+ std::string column_name = reader_->metadata()->schema()->Column(i)->name();
+ std::string phys_type =
TypeToString(reader_->metadata()->schema()->Column(i)->physical_type());
+ auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan();
+ ::arrow::util::tracing::Span childspan;
+ ::arrow::util::tracing::Span parentspan;
+ parentspan.Set(::arrow::util::tracing::Span::Impl{span});
Review comment:
Looking at this again, it seems superfluous. I'll remove it and just
start a span the normal way. That should already take care of setting the
current span as the parent automatically.
##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -1060,7 +1077,10 @@ class RowGroupGenerator {
}
auto ready = reader->parquet_reader()->WhenBuffered({row_group},
column_indices);
if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
- return ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> {
+
+ GET_CURRENT_SPAN(span);
Review comment:
I agree that would be better, but then the span object declaration is
not guarded with `#ifdef ARROW_WITH_OPENTELEMETRY` (which is why I did it
this way). I'll try it and see if it works.
##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -388,10 +388,16 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
pre_filtered = true;
if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
}
+#ifdef ARROW_WITH_OPENTELEMETRY
+ auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan();
+#endif
// Open the reader and pay the real IO cost.
auto make_generator =
[=](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
-> Result<RecordBatchGenerator> {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span);
Review comment:
The span is automatically ended when the scope is destroyed, so that is
what we want. I tried to create a generic wrapper that can wrap this lambda,
as we do with the AsyncGenerators, but I couldn't figure it out, so I dropped
it for now. Do you think such an approach is possible?
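For discussion, here is a rough sketch of what such a wrapper could look like. Everything in it (`DemoSpan`, `ActiveSpan`, `DemoScope`, `WithSpanActive`) is a hypothetical stand-in rather than the OpenTelemetry or Arrow API, and it assumes C++14 or later for the generic lambda. It says nothing about whether the result fits the callback types that `Then` expects.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical span type (an assumption, not the real one).
struct DemoSpan {
  std::string name;
};

// Thread-local slot playing the role of the tracer's "current span".
inline std::shared_ptr<DemoSpan>& ActiveSpan() {
  thread_local std::shared_ptr<DemoSpan> active;
  return active;
}

// RAII scope: makes a span active, restores the previous one on destruction.
class DemoScope {
 public:
  explicit DemoScope(std::shared_ptr<DemoSpan> span)
      : previous_(std::exchange(ActiveSpan(), std::move(span))) {}
  ~DemoScope() { ActiveSpan() = std::move(previous_); }
  DemoScope(const DemoScope&) = delete;
  DemoScope& operator=(const DemoScope&) = delete;

 private:
  std::shared_ptr<DemoSpan> previous_;
};

// Wraps any callable so that `span` is active for the duration of each call.
template <typename Fn>
auto WithSpanActive(std::shared_ptr<DemoSpan> span, Fn fn) {
  return [span = std::move(span), fn = std::move(fn)](
             auto&&... args) mutable -> decltype(auto) {
    DemoScope scope(span);  // copies the pointer, so repeated calls work
    return fn(std::forward<decltype(args)>(args)...);
  };
}
```

The wrapper keeps the scope handling in one place, so the wrapped lambda body would not need its own `#ifdef` block; a no-op version of `WithSpanActive` that simply returns `fn` could serve builds without tracing.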