joosthooz commented on a change in pull request #12609:
URL: https://github.com/apache/arrow/pull/12609#discussion_r829896797



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -167,9 +168,14 @@ static inline Result<csv::ReadOptions> GetReadOptions(
 static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
     const FileSource& source, const CsvFileFormat& format,
     const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+  auto tracer = arrow::internal::tracing::GetTracer();
+  auto span = 
tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");

Review comment:
       I'd like to, but it results in errors such as 
   ```
   arrow/cpp/src/arrow/dataset/file_csv.cc:205:7: error: use of deleted 
function ‘arrow::util::tracing::Span::Span(const arrow::util::tracing::Span&)’
     205 |       [=](const Status& err) -> 
Result<std::shared_ptr<csv::StreamingReader>> {
         |       
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     206 | #ifdef ARROW_WITH_OPENTELEMETRY
         | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     207 |         arrow::internal::tracing::MarkSpan(err, span.get());
         |         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     208 |         span->End();
         |         ~~~~~~~~~~~~
     209 | #endif
         | ~~~~~~ 
     210 |         return err.WithMessage("Could not open CSV input source '", 
path, "': ", err);
         |         
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     211 |       });
         |       ~
   ```
   Because I need to create a span to use these macros:
   ```
   util::tracing::Span span;
     START_SPAN(span, "arrow::dataset::CsvFileFormat::OpenReaderAsync");
   ```
   This also happens in other places

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -167,9 +168,14 @@ static inline Result<csv::ReadOptions> GetReadOptions(
 static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
     const FileSource& source, const CsvFileFormat& format,
     const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+  auto tracer = arrow::internal::tracing::GetTracer();
+  auto span = 
tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");

Review comment:
       I'd like to, but it results in errors such as 
   ```
   arrow/cpp/src/arrow/dataset/file_csv.cc:197:7: error: use of deleted 
function ‘arrow::util::tracing::Span::Span(const arrow::util::tracing::Span&)’
     197 |       [=](const std::shared_ptr<csv::StreamingReader>& reader)
         |       ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     198 |           -> Result<std::shared_ptr<csv::StreamingReader>> {
         |           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     199 |         MARK_SPAN(span, Status::OK());
         |         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     200 |         END_SPAN(span);
         |         ~~~~~~~~~~~~~~~
     201 |         return reader;
         |         ~~~~~~~~~~~~~~
     202 |       },
         |       ~
   
   ```
   Because I need to create a span to use these macros:
   ```
   util::tracing::Span span;
     START_SPAN(span, "arrow::dataset::CsvFileFormat::OpenReaderAsync");
   ```
   This also happens in other places

##########
File path: cpp/src/arrow/util/tracing_internal.h
##########
@@ -146,6 +210,19 @@ opentelemetry::trace::StartSpanOptions 
SpanOptionsWithParent(
         return st;                                                             
   \
       })
 
+#define GET_CURRENT_SPAN(span) \
+  auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()
+
+#define SET_SPAN_SCOPE(scope, span) \
+  auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span)
+
+#define TIE_SPAN_TO_GENERATOR(generator) \
+        generator = 
::arrow::internal::tracing::TieSpanToAsyncGenerator(generator)
+
+#define PROPAGATE_SPAN_TO_GENERATOR(generator) \
+        generator = \
+            
::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(generator)
+

Review comment:
       Their purpose is to have blank versions when compiling without 
opentelemetry
   ```
   #ifdef ARROW_WITH_OPENTELEMETRY
   #define GET_CURRENT_SPAN(span) \
     auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()
   ...
   #else
   #define START_SPAN(target_span, ...)
   ```
   So if I remove them, I need to guard all of the calls to these functions 
with `#ifdef ARROW_WITH_OPENTELEMETRY`

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -270,6 +273,20 @@ class FileReaderImpl : public FileReader {
       records_to_read +=
           
reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values();
     }
+#ifdef ARROW_WITH_OPENTELEMETRY
+    std::string column_name = reader_->metadata()->schema()->Column(i)->name();
+    std::string phys_type = 
TypeToString(reader_->metadata()->schema()->Column(i)->physical_type());
+    auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan();
+    ::arrow::util::tracing::Span childspan;
+    ::arrow::util::tracing::Span parentspan;
+    parentspan.Set(::arrow::util::tracing::Span::Impl{span});

Review comment:
       Looking at this again, it seems superfluous. I'll remove it and just 
start a span the normal way. That should already take care of setting the 
current span as the parent automatically.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -1060,7 +1077,10 @@ class RowGroupGenerator {
     }
     auto ready = reader->parquet_reader()->WhenBuffered({row_group}, 
column_indices);
     if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
-    return ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> {
+
+    GET_CURRENT_SPAN(span);

Review comment:
       I agree that would be better, but then the span object declaration is 
not guarded with the `#ifdef 
    ARROW_WITH_OPENTELEMETRY` (that was the reason I did it like this). But 
I'll try it and see if it works.

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -388,10 +388,16 @@ Result<RecordBatchGenerator> 
ParquetFileFormat::ScanBatchesAsync(
     pre_filtered = true;
     if (row_groups.empty()) return 
MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
   }
+#ifdef ARROW_WITH_OPENTELEMETRY
+  auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan();
+#endif
   // Open the reader and pay the real IO cost.
   auto make_generator =
       [=](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
       -> Result<RecordBatchGenerator> {
+#ifdef ARROW_WITH_OPENTELEMETRY
+    auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span);

Review comment:
       The span is automatically ended when the scope is destroyed, so that is 
what we want. I tried to create a generic wrapper that can wrap this lambda 
like we do with the AsyncGenerators etc, but I couldn't manage to figure it out 
so I dropped it for now. Do you think such an approach is possible?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to