birschick-bq commented on code in PR #2847:
URL: https://github.com/apache/arrow-adbc/pull/2847#discussion_r2132907632
##########
csharp/src/Apache.Arrow.Adbc/AdbcOptions.cs:
##########
@@ -209,6 +209,25 @@ public static class IngestMode
public const string CreateAppend =
"adbc.ingest.mode.create_append";
}
+ public static class Telemetry
Review Comment:
Removed. Will refactor into a separate project/package.
##########
csharp/src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj:
##########
@@ -7,6 +7,10 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Apache.Arrow" Version="20.0.0" />
+ <PackageReference Include="OpenTelemetry" Version="1.12.0" />
+ <PackageReference Include="OpenTelemetry.Exporter.Console"
Version="1.12.0" />
+ <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol"
Version="1.12.0" />
Review Comment:
Refactoring to a separate project/package.
##########
csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs:
##########
@@ -0,0 +1,393 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Tracing.FileExporter;
+using OpenTelemetry;
+using OpenTelemetry.Resources;
+using OpenTelemetry.Trace;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ /// <summary>
+ /// Provides a base implementation for a tracing source. If drivers want
to enable tracing,
+ /// they need to add a trace listener (e.g., <see cref="FileExporter"/>).
+ /// </summary>
+ public class ActivityTrace
+ {
+ private const string ProductVersionDefault = "1.0.0";
+ private static readonly string s_assemblyVersion = GetProductVersion();
+ private const string SourceNameDefault = "apache.arrow.adbc";
+ private const string OTelTracesExporterEnvironment =
"OTEL_TRACES_EXPORTER";
+ private bool _isDisposed;
+
+ /// <summary>
+ /// Constructs a new <see cref="ActivityTrace"/> object. If <paramref
name="activitySourceName"/> is set, it provides the
+ /// activity source name, otherwise the current assembly name is used
as the activity source name.
+ /// </summary>
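+ /// <param name="activitySourceName">The name for the activity source; when null, the name of the assembly that defines this instance's type is used.</param>
+ /// <param name="traceParent">The optional trace parent context, stored in <see cref="TraceParent"/>.</param>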
+ public ActivityTrace(string? activitySourceName = default, string?
traceParent = default)
+ {
+ activitySourceName ??= GetType().Assembly.GetName().Name!;
+ if (string.IsNullOrWhiteSpace(activitySourceName))
+ {
+ throw new ArgumentNullException(nameof(activitySourceName));
+ }
+
+ // This is required to be disposed
+ ActivitySource = new(activitySourceName, s_assemblyVersion);
+ TraceParent = traceParent;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="System.Diagnostics.ActivitySource"/>.
+ /// </summary>
+ public ActivitySource ActivitySource { get; }
+
+ /// <summary>
+ /// Gets the name of the <see
cref="System.Diagnostics.ActivitySource"/>
+ /// </summary>
+ public string ActivitySourceName => ActivitySource.Name;
+
+ /// <summary>
+ /// Invokes the delegate within the context of a new started <see
cref="Activity"/>.
+ /// </summary>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
+ /// <param name="activityName">The name of the activity; defaults to the name of the calling member.</param>
+ /// <param name="traceParent">The optional trace parent context; when null, <see cref="TraceParent"/> is used.</param>
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/>. If
an exception is thrown by the delegate, the Activity
+ /// status is set to <see cref="ActivityStatusCode.Error"/> and an
Activity <see cref="ActivityEvent"/> is added to the activity
+ /// and finally the exception is rethrown.
+ /// </remarks>
+ public void TraceActivity(Action<Activity?> call, [CallerMemberName]
string? activityName = default, string? traceParent = default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Invokes the delegate within the context of a new started <see
cref="Activity"/>.
+ /// </summary>
+ /// <typeparam name="T">The return type for the delegate.</typeparam>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
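+ /// <param name="activityName">The name of the activity; defaults to the name of the calling member.</param>
+ /// <param name="traceParent">The optional trace parent context; when null, <see cref="TraceParent"/> is used.</param>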
+ /// <returns>The result of the call to the delegate.</returns>
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/> and
the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is
set to <see cref="ActivityStatusCode.Error"/>
+ /// and an Event <see cref="ActivityEvent"/> is added to the activity
and finally the exception is rethrown.
+ /// </remarks>
+ public T TraceActivity<T>(Func<Activity?, T> call, [CallerMemberName]
string? activityName = default, string? traceParent = default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ T? result = call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ return result;
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Invokes the delegate within the context of a new started <see
cref="Activity"/>.
+ /// </summary>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
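+ /// <param name="activityName">The name of the activity; defaults to the name of the calling member.</param>
+ /// <param name="traceParent">The optional trace parent context; when null, <see cref="TraceParent"/> is used.</param>
+ /// <returns>A task that completes once the task returned by the delegate has completed.</returns>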
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/> and
the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is
set to <see cref="ActivityStatusCode.Error"/>
+ /// and an Event <see cref="ActivityEvent"/> is added to the activity
and finally the exception is rethrown.
+ /// </remarks>
+ public async Task TraceActivityAsync(Func<Activity?, Task> call,
[CallerMemberName] string? activityName = default, string? traceParent =
default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ await call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Invokes the delegate within the context of a new started <see
cref="Activity"/>.
+ /// </summary>
+ /// <typeparam name="T">The return type for the delegate.</typeparam>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
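+ /// <param name="activityName">The name of the activity; defaults to the name of the calling member.</param>
+ /// <param name="traceParent">The optional trace parent context; when null, <see cref="TraceParent"/> is used.</param>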
+ /// <returns>The result of the call to the delegate.</returns>
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/> and
the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is
set to <see cref="ActivityStatusCode.Error"/>
+ /// and an Event <see cref="ActivityEvent"/> is added to the activity
and finally the exception is rethrown.
+ /// </remarks>
+ public async Task<T> TraceActivityAsync<T>(Func<Activity?, Task<T>>
call, [CallerMemberName] string? activityName = default, string? traceParent =
default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ T? result = await call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ return result;
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Invokes the delegate within the context of a new started <see
cref="Activity"/>.
+ /// </summary>
+ /// <param name="activitySource">The <see cref="ActivitySource"/> to
start the <see cref="Activity"/> on.</param>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
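+ /// <param name="activityName">The name of the activity; defaults to the name of the calling member.</param>
+ /// <param name="traceParent">The optional trace parent context.</param>
+ /// <returns>A task that completes once the task returned by the delegate has completed.</returns>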
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/> and
the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is
set to <see cref="ActivityStatusCode.Error"/>
+ /// and an Event <see cref="ActivityEvent"/> is added to the activity
and finally the exception is rethrown.
+ /// </remarks>
+ public static async Task TraceActivityAsync(ActivitySource
activitySource, Func<Activity?, Task> call, [CallerMemberName] string?
activityName = default, string? traceParent = default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
activitySource, traceParent);
+ try
+ {
+ await call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Invokes the delegate within the context of a new started <see
cref="Activity"/>.
+ /// </summary>
+ /// <typeparam name="T">The return type for the delegate.</typeparam>
+ /// <param name="activitySource">The <see cref="ActivitySource"/> to
start the <see cref="Activity"/> on.</param>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
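+ /// <param name="activityName">The name of the activity; defaults to the name of the calling member.</param>
+ /// <param name="traceParent">The optional trace parent context.</param>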
+ /// <returns>The result of the call to the delegate.</returns>
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/> and
the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is
set to <see cref="ActivityStatusCode.Error"/>
+ /// and an Event <see cref="ActivityEvent"/> is added to the activity
and finally the exception is rethrown.
+ /// </remarks>
+ public static async Task<T> TraceActivityAsync<T>(ActivitySource
activitySource, Func<Activity?, Task<T>> call, [CallerMemberName] string?
activityName = default, string? traceParent = default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
activitySource, traceParent);
+ try
+ {
+ T? result = await call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ return result;
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the trace parent context.
+ /// </summary>
+ public string? TraceParent { get; set; }
Review Comment:
This is for the case where the connection is cached and reused for another, separate operation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]