CurtHagenlocher commented on code in PR #2847:
URL: https://github.com/apache/arrow-adbc/pull/2847#discussion_r2124267954
##########
csharp/src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj:
##########
@@ -7,6 +7,10 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Apache.Arrow" Version="20.0.0" />
+ <PackageReference Include="OpenTelemetry" Version="1.12.0" />
+ <PackageReference Include="OpenTelemetry.Exporter.Console"
Version="1.12.0" />
+ <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol"
Version="1.12.0" />
Review Comment:
Is there a way to factor the code so that we don't need the base project to
have so many OTel dependencies? For instance, could we move the exporters to
another package and then ask the ADBC-based application to opt into supporting
specific exporters at runtime?
##########
csharp/src/Apache.Arrow.Adbc/AdbcOptions.cs:
##########
@@ -209,6 +209,25 @@ public static class IngestMode
public const string CreateAppend =
"adbc.ingest.mode.create_append";
}
+ public static class Telemetry
+ {
+ /// <summary>
+ /// EXPERIMENTAL. Sets/Gets the trace parent on OpenTelemetry
traces
+ /// </summary>
+ public const string TraceParent = "adbc.telemetry.trace_parent";
+
+ public static class Traces
+ {
+ public static class Exporter
+ {
+ public const string None = "none";
Review Comment:
Should there be a standard property name for these e.g.
`adbc.telemetry.exporter`?
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2ExtendedConnection.cs:
##########
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+using Apache.Arrow.Adbc.Tracing;
Review Comment:
This `using` directive seems to be unused.
##########
csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Text;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ public static class ActivityExtensions
+ {
+ /// <summary>
+ /// Add a new <see cref="ActivityEvent"/> object to the <see
cref="Activity.Events" /> list.
+ /// </summary>
+ /// <param name="eventName">The name of the event.</param>
+ /// <param name="tags">The optional list of tags to attach to the
event.</param>
+ /// <returns><see langword="this"/> for convenient chaining.</returns>
+ public static Activity? AddEvent(this Activity? activity, string
eventName, IReadOnlyList<KeyValuePair<string, object?>>? tags = default)
+ {
+ if (activity == null) return activity;
+ ActivityTagsCollection? tagsCollection = tags == null ? null : new
ActivityTagsCollection(tags);
+ return activity?.AddEvent(new ActivityEvent(eventName, tags:
tagsCollection));
+ }
+
+ /// <summary>
+ /// Add a new <see cref="ActivityLink"/> to the <see
cref="Activity.Links"/> list.
+ /// </summary>
+ /// <param name="traceParent">The traceParent id for the associated
<see cref="ActivityContext"/>.</param>
+ /// <param name="tags">The optional list of tags to attach to the
event.</param>
+ /// <returns><see langword="this"/> for convenient chaining.</returns>
+ public static Activity? AddLink(this Activity? activity, string
traceParent, IReadOnlyList<KeyValuePair<string, object?>>? tags = default)
+ {
+ if (activity == null) return activity;
+ ActivityTagsCollection? tagsCollection = tags == null ? null : new
ActivityTagsCollection(tags);
+ return activity?.AddLink(new
ActivityLink(ActivityContext.Parse(traceParent, null), tags: tagsCollection));
+ }
+
+ /// <summary>
+ /// Update the Activity to have a tag with an additional 'key' and
value 'value'.
+ /// This shows up in the <see cref="TagObjects"/> enumeration. It is
meant for information that
+ /// is useful to log but not needed for runtime control (for the
latter, <see cref="Baggage"/>)
+ /// </summary>
+ /// <returns><see langword="this" /> for convenient chaining.</returns>
+ /// <param name="key">The tag key name as a function</param>
+ /// <param name="value">The tag value mapped to the input key as a
function</param>
+ public static Activity? AddTag(this Activity? activity, Func<string>
key, Func<object?> value)
+ {
+ return activity?.AddTag(key(), value());
+ }
+
+ /// <summary>
+ /// Update the Activity to have a tag with an additional 'key' and
value 'value'.
+ /// This shows up in the <see cref="TagObjects"/> enumeration. It is
meant for information that
+ /// is useful to log but not needed for runtime control (for the
latter, <see cref="Baggage"/>)
+ /// </summary>
+ /// <returns><see langword="this" /> for convenient chaining.</returns>
+ /// <param name="key">The tag key name</param>
+ /// <param name="value">The tag value mapped to the input key as a
function</param>
+ /// <param name="guidFormat">The format indicator for 16-byte GUID
arrays.</param>
+ public static Activity? AddTag(this Activity? activity, string key,
byte[]? value, string? guidFormat)
+ {
+ if ( value == null)
+ {
+ return activity?.AddTag(key, value);
+ }
+ if (value.Length == 16)
+ {
+ return activity?.AddTag(key, new
Guid(value).ToString(guidFormat));
+ }
+#if NET5_0_OR_GREATER
+ return activity?.AddTag(key, Convert.ToHexString(value));
+#else
+ return activity?.AddTag(key, ToHexString(value));
+#endif
+ }
+
+ private static string ToHexString(byte[] value)
+ {
+#if NET5_0_OR_GREATER
Review Comment:
If the call site is ifdef'd then we can ifdef the entire function
implementation instead of having two separate implementations. Given that the
JIT is likely to inline the NET5_0 case anyway, though, I think it's cleaner to
remove the ifdef from the call site and always call the helper.
##########
csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection.cs:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Threading.Tasks;
+using OpenTelemetry.Trace;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ public abstract class TracingConnection : AdbcConnection, IActivityTracer
+ {
+ private TracerProvider? _tracerProvider;
+ private bool _isDisposed;
+ internal readonly ActivityTrace _trace;
+
+ protected TracingConnection(IReadOnlyDictionary<string, string>
properties)
+ {
+ _tracerProvider = ActivityTrace.InitTracerProvider(out string
activitySourceName, out _, TracingAssembly);
+ properties.TryGetValue(AdbcOptions.Telemetry.TraceParent, out
string? traceParent);
+ _trace = new ActivityTrace(activitySourceName, traceParent);
+ }
+
+ public abstract Assembly TracingAssembly { get; }
Review Comment:
Under what circumstances would this have a value other than
`GetType().Assembly`? It's defined that way in every derived class so why not
just move it here and make it protected and non-virtual for now?
(Comment applies to multiple types.)
##########
csharp/src/Apache.Arrow.Adbc/Tracing/FileExporter/FileExporter.cs:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Text.Json;
+using System.Threading.Tasks;
+using System.Threading;
+using OpenTelemetry;
+using System.Collections.Concurrent;
+using System.IO;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Apache.Arrow.Adbc.Tracing.FileExporter
+{
+ internal class FileExporter : BaseExporter<Activity>
+ {
+ internal const long MaxFileSizeKbDefault = 1024;
+ internal const int MaxTraceFilesDefault = 999;
+ internal const string ApacheArrowAdbcNamespace = "Apache.Arrow.Adbc";
+ private const string TracesFolderName = "Traces";
+
+ private static readonly string s_tracingLocationDefault =
TracingLocationDefault;
+ private static readonly ConcurrentDictionary<string,
Lazy<FileExporterInstance>> s_fileExporters = new();
+ private static readonly byte[] s_newLine =
Encoding.UTF8.GetBytes(Environment.NewLine);
+
+ private readonly TracingFile _tracingFile;
+ private readonly string _fileBaseName;
+ private readonly string _tracesDirectoryFullName;
+ private readonly ConcurrentQueue<Activity> _activityQueue = new();
+ private readonly CancellationTokenSource _cancellationTokenSource;
+
+ private bool _disposed = false;
+
+ internal static bool TryCreate(out FileExporter? fileExporter,
FileExporterOptions options)
+ {
+ return TryCreate(
+ out fileExporter,
+ options.FileBaseName ?? ApacheArrowAdbcNamespace,
+ options.TraceLocation ?? TracingLocationDefault,
+ options.MaxTraceFileSizeKb,
+ options.MaxTraceFiles);
+ }
+
+ internal static bool TryCreate(
+ out FileExporter? fileExporter,
+ string fileBaseName,
+ string traceLocation,
+ long maxTraceFileSizeKb,
+ int maxTraceFiles)
+ {
+ ValidParameters(fileBaseName, traceLocation, maxTraceFileSizeKb,
maxTraceFiles);
+
+ DirectoryInfo tracesDirectory = new(traceLocation ??
s_tracingLocationDefault);
+ string tracesDirectoryFullName = tracesDirectory.FullName;
+
+ // In case we don't need to create this object, we'll lazy load
the object only if added to the collection.
+ var exporterInstance = new Lazy<FileExporterInstance>(() =>
+ {
+ CancellationTokenSource cancellationTokenSource = new();
+ FileExporter fileExporter = new(fileBaseName, tracesDirectory,
maxTraceFileSizeKb, maxTraceFiles);
+ return new FileExporterInstance(
+ fileExporter,
+ // This listens/polls for activity in the queue and writes
them to file
+ Task.Run(async () => await
ProcessActivitiesAsync(fileExporter, cancellationTokenSource.Token)),
+ cancellationTokenSource);
+ });
+
+ // We only want one exporter listening on a source in a particular
folder.
+ // If two or more exporters are running, it'll create duplicate
trace entries.
+ // On Dispose, ensure to stop and remove the only instance, in
case we need a new one later.
+ string listenerId = GetListenerId(fileBaseName,
tracesDirectoryFullName);
+ bool isAdded = s_fileExporters.TryAdd(listenerId,
exporterInstance);
+ if (isAdded)
+ {
+ // This instance was added so load the object now.
+ fileExporter = exporterInstance.Value.FileExporter;
+ return true;
+ }
+
+ // There is already an exporter listening on the source/location
+ fileExporter = null;
+ return false;
+ }
+
+ internal static void ValidParameters(string fileBaseName, string
traceLocation, long maxTraceFileSizeKb, int maxTraceFiles)
Review Comment:
```suggestion
internal static void ValidateParameters(string fileBaseName, string
traceLocation, long maxTraceFileSizeKb, int maxTraceFiles)
```
##########
csharp/src/Apache.Arrow.Adbc/Tracing/SerializableActivity.cs:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text.Json.Serialization;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ /// <summary>
+ /// Simplified version of <see cref="Activity"/> that excludes some
properties, etc.
+ /// </summary>
+ internal class SerializableActivity
Review Comment:
Should this be in the directory with the FileExporter?
##########
csharp/src/Apache.Arrow.Adbc/Tracing/FileExporter/FileExporter.cs:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Text.Json;
+using System.Threading.Tasks;
+using System.Threading;
+using OpenTelemetry;
+using System.Collections.Concurrent;
+using System.IO;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Apache.Arrow.Adbc.Tracing.FileExporter
+{
+ internal class FileExporter : BaseExporter<Activity>
+ {
+ internal const long MaxFileSizeKbDefault = 1024;
+ internal const int MaxTraceFilesDefault = 999;
+ internal const string ApacheArrowAdbcNamespace = "Apache.Arrow.Adbc";
+ private const string TracesFolderName = "Traces";
+
+ private static readonly string s_tracingLocationDefault =
TracingLocationDefault;
+ private static readonly ConcurrentDictionary<string,
Lazy<FileExporterInstance>> s_fileExporters = new();
+ private static readonly byte[] s_newLine =
Encoding.UTF8.GetBytes(Environment.NewLine);
+
+ private readonly TracingFile _tracingFile;
+ private readonly string _fileBaseName;
+ private readonly string _tracesDirectoryFullName;
+ private readonly ConcurrentQueue<Activity> _activityQueue = new();
+ private readonly CancellationTokenSource _cancellationTokenSource;
+
+ private bool _disposed = false;
+
+ internal static bool TryCreate(out FileExporter? fileExporter,
FileExporterOptions options)
+ {
+ return TryCreate(
+ out fileExporter,
+ options.FileBaseName ?? ApacheArrowAdbcNamespace,
+ options.TraceLocation ?? TracingLocationDefault,
+ options.MaxTraceFileSizeKb,
+ options.MaxTraceFiles);
+ }
+
+ internal static bool TryCreate(
+ out FileExporter? fileExporter,
+ string fileBaseName,
+ string traceLocation,
+ long maxTraceFileSizeKb,
+ int maxTraceFiles)
+ {
+ ValidParameters(fileBaseName, traceLocation, maxTraceFileSizeKb,
maxTraceFiles);
+
+ DirectoryInfo tracesDirectory = new(traceLocation ??
s_tracingLocationDefault);
+ string tracesDirectoryFullName = tracesDirectory.FullName;
+
+ // In case we don't need to create this object, we'll lazy load
the object only if added to the collection.
+ var exporterInstance = new Lazy<FileExporterInstance>(() =>
+ {
+ CancellationTokenSource cancellationTokenSource = new();
+ FileExporter fileExporter = new(fileBaseName, tracesDirectory,
maxTraceFileSizeKb, maxTraceFiles);
+ return new FileExporterInstance(
+ fileExporter,
+ // This listens/polls for activity in the queue and writes
them to file
+ Task.Run(async () => await
ProcessActivitiesAsync(fileExporter, cancellationTokenSource.Token)),
+ cancellationTokenSource);
+ });
+
+ // We only want one exporter listening on a source in a particular
folder.
+ // If two or more exporters are running, it'll create duplicate
trace entries.
+ // On Dispose, ensure to stop and remove the only instance, in
case we need a new one later.
+ string listenerId = GetListenerId(fileBaseName,
tracesDirectoryFullName);
+ bool isAdded = s_fileExporters.TryAdd(listenerId,
exporterInstance);
+ if (isAdded)
+ {
+ // This instance was added so load the object now.
+ fileExporter = exporterInstance.Value.FileExporter;
+ return true;
+ }
+
+ // There is already an exporter listening on the source/location
+ fileExporter = null;
+ return false;
+ }
+
+ internal static void ValidParameters(string fileBaseName, string
traceLocation, long maxTraceFileSizeKb, int maxTraceFiles)
+ {
+ if (string.IsNullOrWhiteSpace(fileBaseName))
+ throw new ArgumentNullException(nameof(fileBaseName));
+ if (fileBaseName.IndexOfAny(Path.GetInvalidFileNameChars()) >= 0)
+ throw new ArgumentException("Invalid or unsupported file
name", nameof(fileBaseName));
+ if (string.IsNullOrWhiteSpace(traceLocation) ||
traceLocation.IndexOfAny(Path.GetInvalidPathChars()) >= 0)
+ throw new ArgumentException("Invalid or unsupported folder
name", nameof(traceLocation));
+ if (maxTraceFileSizeKb < 1)
+ throw new ArgumentException("maxTraceFileSizeKb must be
greater than zero", nameof(maxTraceFileSizeKb));
+ if (maxTraceFiles < 1)
+ throw new ArgumentException("maxTraceFiles must be greater
than zero.", nameof(maxTraceFiles));
+
+ IsDirectoryWritable(traceLocation, throwIfFails: true);
+ }
+
+ private static string GetListenerId(string sourceName, string
traceFolderLocation) => $"{sourceName}{traceFolderLocation}";
+
+ public override ExportResult Export(in Batch<Activity> batch)
+ {
+ foreach (Activity activity in batch)
+ {
+ if (activity == null) continue;
+ _activityQueue.Enqueue(activity);
+ }
+ return ExportResult.Success;
+ }
+
+ private static async Task ProcessActivitiesAsync(FileExporter
fileExporter, CancellationToken cancellationToken)
+ {
+ TimeSpan delay = TimeSpan.FromMilliseconds(100);
+ // Polls for and then writes any activities in the queue
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ await Task.Delay(delay, cancellationToken);
+ await
fileExporter._tracingFile.WriteLinesAsync(GetActivitiesAsync(fileExporter._activityQueue),
cancellationToken);
+ }
+ }
+
+ private static bool IsDirectoryWritable(string traceLocation, bool
throwIfFails = false)
+ {
+ try
+ {
+ if (!Directory.Exists(traceLocation))
+ {
+ Directory.CreateDirectory(traceLocation);
+ }
+ string tempFilePath = Path.Combine(traceLocation,
Path.GetRandomFileName());
+ using FileStream fs = File.Create(tempFilePath, 1,
FileOptions.DeleteOnClose);
+ return true;
+ }
+ catch
+ {
+ if (throwIfFails)
+ throw;
+ else
+ return false;
+ }
+ }
+
+ private static async IAsyncEnumerable<Stream>
GetActivitiesAsync(ConcurrentQueue<Activity> activityQueue)
+ {
+ MemoryStream stream = new MemoryStream();
+ while (activityQueue.TryDequeue(out Activity? activity))
+ {
+ stream.SetLength(0);
+ SerializableActivity serilalizableActivity = new(activity);
Review Comment:
```suggestion
SerializableActivity serializableActivity = new(activity);
```
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs:
##########
@@ -822,6 +798,20 @@ internal IReadOnlyDictionary<string, int>
GetColumnIndexMap(List<TColumnDesc> co
protected abstract bool IsColumnSizeValidForDecimal { get; }
+ public override void SetOption(string key, string value)
+ {
+ switch (key.ToLowerInvariant())
+ {
+ case AdbcOptions.Telemetry.TraceParent:
+ TraceParent = string.IsNullOrEmpty(value) ? null : value;
Review Comment:
It seems strange to allow the parent to be set independently for the
connection and statement, and for these to have no relationship to each other.
But maybe my mental model for the parent isn't right?
##########
csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Text;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ public static class ActivityExtensions
+ {
+ /// <summary>
+ /// Add a new <see cref="ActivityEvent"/> object to the <see
cref="Activity.Events" /> list.
+ /// </summary>
+ /// <param name="eventName">The name of the event.</param>
+ /// <param name="tags">The optional list of tags to attach to the
event.</param>
+ /// <returns><see langword="this"/> for convenient chaining.</returns>
+ public static Activity? AddEvent(this Activity? activity, string
eventName, IReadOnlyList<KeyValuePair<string, object?>>? tags = default)
+ {
+ if (activity == null) return activity;
+ ActivityTagsCollection? tagsCollection = tags == null ? null : new
ActivityTagsCollection(tags);
+ return activity?.AddEvent(new ActivityEvent(eventName, tags:
tagsCollection));
+ }
+
+ /// <summary>
+ /// Add a new <see cref="ActivityLink"/> to the <see
cref="Activity.Links"/> list.
+ /// </summary>
+ /// <param name="traceParent">The traceParent id for the associated
<see cref="ActivityContext"/>.</param>
+ /// <param name="tags">The optional list of tags to attach to the
event.</param>
+ /// <returns><see langword="this"/> for convenient chaining.</returns>
+ public static Activity? AddLink(this Activity? activity, string
traceParent, IReadOnlyList<KeyValuePair<string, object?>>? tags = default)
+ {
+ if (activity == null) return activity;
+ ActivityTagsCollection? tagsCollection = tags == null ? null : new
ActivityTagsCollection(tags);
+ return activity?.AddLink(new
ActivityLink(ActivityContext.Parse(traceParent, null), tags: tagsCollection));
+ }
+
+ /// <summary>
+ /// Update the Activity to have a tag with an additional 'key' and
value 'value'.
+ /// This shows up in the <see cref="TagObjects"/> enumeration. It is
meant for information that
+ /// is useful to log but not needed for runtime control (for the
latter, <see cref="Baggage"/>)
+ /// </summary>
+ /// <returns><see langword="this" /> for convenient chaining.</returns>
+ /// <param name="key">The tag key name as a function</param>
+ /// <param name="value">The tag value mapped to the input key as a
function</param>
+ public static Activity? AddTag(this Activity? activity, Func<string>
key, Func<object?> value)
Review Comment:
Presumably the laziness is because the value might be expensive to compute
and we don't want to do it if the activity is null. But under what
circumstances would the key need to be computed lazily?
##########
csharp/src/Apache.Arrow.Adbc/Tracing/IActivityTracer.cs:
##########
@@ -0,0 +1,93 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ /// <summary>
+ /// Provides a base implementation for a tracing source. If drivers want
to enable tracing,
+ /// they need to add a trace listener (e.g., <see cref="FileExporter"/>).
+ /// </summary>
+ public interface IActivityTracer : IDisposable
Review Comment:
It's not clear from the code how this interface is intended to be used, as
it's implemented by three types but not referenced otherwise. Also, each of the
three implementations is basically identical -- delegating the operation to the
underlying ActivityTrace. Consider instead defining something like
```
interface IActivityTracer
{
ActivityTrace Trace { get; }
string? TraceParent { get; }
}
```
and then defining these methods as extension methods on `IActivityTracer` so
they only need to be defined once.
##########
csharp/src/Apache.Arrow.Adbc/AdbcOptions.cs:
##########
@@ -209,6 +209,25 @@ public static class IngestMode
public const string CreateAppend =
"adbc.ingest.mode.create_append";
}
+ public static class Telemetry
Review Comment:
Are we proposing to align these across all ADBC implementations?
##########
csharp/src/Apache.Arrow.Adbc/Tracing/FileExporter/FileExporter.cs:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Text.Json;
+using System.Threading.Tasks;
+using System.Threading;
+using OpenTelemetry;
+using System.Collections.Concurrent;
+using System.IO;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Apache.Arrow.Adbc.Tracing.FileExporter
+{
+ internal class FileExporter : BaseExporter<Activity>
+ {
+ internal const long MaxFileSizeKbDefault = 1024;
+ internal const int MaxTraceFilesDefault = 999;
+ internal const string ApacheArrowAdbcNamespace = "Apache.Arrow.Adbc";
+ private const string TracesFolderName = "Traces";
+
+ private static readonly string s_tracingLocationDefault =
TracingLocationDefault;
+ private static readonly ConcurrentDictionary<string,
Lazy<FileExporterInstance>> s_fileExporters = new();
+ private static readonly byte[] s_newLine =
Encoding.UTF8.GetBytes(Environment.NewLine);
+
+ private readonly TracingFile _tracingFile;
+ private readonly string _fileBaseName;
+ private readonly string _tracesDirectoryFullName;
+ private readonly ConcurrentQueue<Activity> _activityQueue = new();
+ private readonly CancellationTokenSource _cancellationTokenSource;
+
+ private bool _disposed = false;
+
+ internal static bool TryCreate(out FileExporter? fileExporter,
FileExporterOptions options)
Review Comment:
Can we follow the convention of making the "out" parameter the last
parameter? (Applies throughout this change.)
##########
csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs:
##########
@@ -0,0 +1,393 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Tracing.FileExporter;
+using OpenTelemetry;
+using OpenTelemetry.Resources;
+using OpenTelemetry.Trace;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ /// <summary>
+ /// Provides a base implementation for a tracing source. If drivers want
to enable tracing,
+ /// they need to add a trace listener (e.g., <see cref="FileExporter"/>).
+ /// </summary>
+ public class ActivityTrace
+ {
+ private const string ProductVersionDefault = "1.0.0";
+ private static readonly string s_assemblyVersion = GetProductVersion();
Review Comment:
This is always going to be the version of this assembly and not of the
driver assembly. Is that what we want?
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2ConnectionFactory.cs:
##########
@@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
+using Apache.Arrow.Adbc.Tracing;
Review Comment:
This `using` directive seems to be unused.
##########
csharp/src/Drivers/Apache/Hive2/README.md:
##########
@@ -116,3 +117,31 @@ adbc.hive.path = "/hive2"
adbc.hive.host = $"{clusterHostName}"
username = $"{clusterUserName}"
password = $"{clusterPassword}"
+
+## Tracing Support
+
+The driver supports OpenTelemetry tracing.
+
+One of the following exporters can be enabled via the environment variable
`OTEL_TRACES_EXPORTER`.
+If the environment variable is not set or empty, it behaves the same as when it is set to
`none`.
Review Comment:
Should there also be a way to set this via the connection properties?
##########
csharp/src/Apache.Arrow.Adbc/Tracing/SerializableActivity.cs:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text.Json.Serialization;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ /// <summary>
+ /// Simplified version of <see cref="Activity"/> that excludes some
properties, etc.
+ /// </summary>
+ internal class SerializableActivity
+ {
+ [JsonConstructor]
+ public SerializableActivity() { }
+
+ internal SerializableActivity(
+ ActivityStatusCode status,
+ string? statusDescription,
+ bool hasRemoteParent,
+ ActivityKind kind,
+ string operationName,
+ TimeSpan duration,
+ DateTime startTimeUtc,
+ string? id,
+ string? parentId,
+ string? rootId,
+ string? traceStateString,
+ ActivitySpanId spanId,
+ ActivityTraceId traceId,
+ bool recorded,
+ bool isAllDataRequested,
+ ActivityTraceFlags activityTraceFlags,
+ ActivitySpanId parentSpanId,
+ ActivityIdFormat idFormat,
+ IReadOnlyList<KeyValuePair<string, object?>> tagObjects,
+ IReadOnlyList<SerializableActivityEvent> events,
+ IReadOnlyList<SerializableActivityLink> links,
+ IReadOnlyList<KeyValuePair<string, string?>> baggage)
+ {
+ Status = statusDescription ?? status.ToString();
+ HasRemoteParent = hasRemoteParent;
+ Kind = kind.ToString();
+ OperationName = operationName;
+ Duration = duration;
+ StartTimeUtc = startTimeUtc;
+ Id = id;
+ ParentId = parentId;
+ RootId = rootId;
+ TraceStateString = traceStateString;
+ SpanId = spanId.ToHexString();
+ TraceId = traceId.ToHexString();
+ Recorded = recorded;
+ IsAllDataRequested = isAllDataRequested;
+ ActivityTraceFlags = activityTraceFlags.ToString();
+ ParentSpanId = parentSpanId.ToHexString();
+ IdFormat = idFormat.ToString();
+ TagObjects = tagObjects;
+ Events = events;
+ Links = links;
+ Baggage = baggage;
+ }
+
+ internal SerializableActivity(Activity activity) : this(
+ activity.Status,
+ activity.StatusDescription,
+ activity.HasRemoteParent,
+ activity.Kind,
+ activity.OperationName,
+ activity.Duration,
+ activity.StartTimeUtc,
+ activity.Id,
+ activity.ParentId,
+ activity.RootId,
+ activity.TraceStateString,
+ activity.SpanId,
+ activity.TraceId,
+ activity.Recorded,
+ activity.IsAllDataRequested,
+ activity.ActivityTraceFlags,
+ activity.ParentSpanId,
+ activity.IdFormat,
+ activity.TagObjects.ToArray(),
+ activity.Events.Select(e =>
(SerializableActivityEvent)e).ToArray(),
+ activity.Links.Select(l => (SerializableActivityLink)l).ToArray(),
+ activity.Baggage.ToArray())
+ { }
+
+ public string? Status { get; set; }
+ public bool HasRemoteParent { get; set; }
+ public string? Kind { get; set; }
+ public string OperationName { get; set; } = "";
+ public TimeSpan Duration { get; set; }
+ public DateTime StartTimeUtc { get; set; }
+ public string? Id { get; set; }
+ public string? ParentId { get; set; }
+ public string? RootId { get; set; }
+
+ public string? TraceStateString { get; set; }
+ public string? SpanId { get; set; }
+ public string? TraceId { get; set; }
+ public bool Recorded { get; set; }
+ public bool IsAllDataRequested { get; set; }
+ public string? ActivityTraceFlags { get; set; }
+ public string? ParentSpanId { get; set; }
+ public string? IdFormat { get; set; }
+
+ public IReadOnlyList<KeyValuePair<string, object?>> TagObjects { get;
set; } = [];
+ public IReadOnlyList<SerializableActivityEvent> Events { get; set; } =
[];
+ public IReadOnlyList<SerializableActivityLink> Links { get; set; } =
[];
+ public IReadOnlyList<KeyValuePair<string, string?>> Baggage { get;
set; } = [];
Review Comment:
Can `TagObjects` and `Baggage` have duplicated keys?
##########
csharp/src/Apache.Arrow.Adbc/Tracing/FileExporter/FileExporterExtensions.cs:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using OpenTelemetry;
+using OpenTelemetry.Trace;
+using Microsoft.Extensions.DependencyInjection;
Review Comment:
nit: sorting
##########
csharp/src/Apache.Arrow.Adbc/Tracing/FileExporter/FileExporter.cs:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Text.Json;
+using System.Threading.Tasks;
+using System.Threading;
+using OpenTelemetry;
+using System.Collections.Concurrent;
+using System.IO;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Apache.Arrow.Adbc.Tracing.FileExporter
+{
+ internal class FileExporter : BaseExporter<Activity>
+ {
+ internal const long MaxFileSizeKbDefault = 1024;
+ internal const int MaxTraceFilesDefault = 999;
+ internal const string ApacheArrowAdbcNamespace = "Apache.Arrow.Adbc";
+ private const string TracesFolderName = "Traces";
+
+ private static readonly string s_tracingLocationDefault =
TracingLocationDefault;
+ private static readonly ConcurrentDictionary<string,
Lazy<FileExporterInstance>> s_fileExporters = new();
+ private static readonly byte[] s_newLine =
Encoding.UTF8.GetBytes(Environment.NewLine);
+
+ private readonly TracingFile _tracingFile;
+ private readonly string _fileBaseName;
+ private readonly string _tracesDirectoryFullName;
+ private readonly ConcurrentQueue<Activity> _activityQueue = new();
+ private readonly CancellationTokenSource _cancellationTokenSource;
+
+ private bool _disposed = false;
+
+ internal static bool TryCreate(out FileExporter? fileExporter,
FileExporterOptions options)
+ {
+ return TryCreate(
+ out fileExporter,
+ options.FileBaseName ?? ApacheArrowAdbcNamespace,
+ options.TraceLocation ?? TracingLocationDefault,
+ options.MaxTraceFileSizeKb,
+ options.MaxTraceFiles);
+ }
+
+ internal static bool TryCreate(
+ out FileExporter? fileExporter,
+ string fileBaseName,
+ string traceLocation,
+ long maxTraceFileSizeKb,
+ int maxTraceFiles)
+ {
+ ValidParameters(fileBaseName, traceLocation, maxTraceFileSizeKb,
maxTraceFiles);
+
+ DirectoryInfo tracesDirectory = new(traceLocation ??
s_tracingLocationDefault);
+ string tracesDirectoryFullName = tracesDirectory.FullName;
+
+ // In case we don't need to create this object, we'll lazy load
the object only if added to the collection.
+ var exporterInstance = new Lazy<FileExporterInstance>(() =>
+ {
+ CancellationTokenSource cancellationTokenSource = new();
+ FileExporter fileExporter = new(fileBaseName, tracesDirectory,
maxTraceFileSizeKb, maxTraceFiles);
+ return new FileExporterInstance(
+ fileExporter,
+ // This listens/polls for activity in the queue and writes
them to file
+ Task.Run(async () => await
ProcessActivitiesAsync(fileExporter, cancellationTokenSource.Token)),
+ cancellationTokenSource);
+ });
+
+ // We only want one exporter listening on a source in a particular
folder.
+ // If two or more exporters are running, it'll create duplicate
trace entries.
+ // On Dispose, ensure to stop and remove the only instance, in
case we need a new one later.
+ string listenerId = GetListenerId(fileBaseName,
tracesDirectoryFullName);
+ bool isAdded = s_fileExporters.TryAdd(listenerId,
exporterInstance);
+ if (isAdded)
+ {
+ // This instance was added so load the object now.
+ fileExporter = exporterInstance.Value.FileExporter;
+ return true;
+ }
+
+ // There is already an exporter listening on the source/location
+ fileExporter = null;
+ return false;
+ }
+
+ internal static void ValidParameters(string fileBaseName, string
traceLocation, long maxTraceFileSizeKb, int maxTraceFiles)
+ {
+ if (string.IsNullOrWhiteSpace(fileBaseName))
+ throw new ArgumentNullException(nameof(fileBaseName));
+ if (fileBaseName.IndexOfAny(Path.GetInvalidFileNameChars()) >= 0)
+ throw new ArgumentException("Invalid or unsupported file
name", nameof(fileBaseName));
+ if (string.IsNullOrWhiteSpace(traceLocation) ||
traceLocation.IndexOfAny(Path.GetInvalidPathChars()) >= 0)
+ throw new ArgumentException("Invalid or unsupported folder
name", nameof(traceLocation));
+ if (maxTraceFileSizeKb < 1)
+ throw new ArgumentException("maxTraceFileSizeKb must be
greater than zero", nameof(maxTraceFileSizeKb));
+ if (maxTraceFiles < 1)
+ throw new ArgumentException("maxTraceFiles must be greater
than zero.", nameof(maxTraceFiles));
+
+ IsDirectoryWritable(traceLocation, throwIfFails: true);
+ }
+
+ private static string GetListenerId(string sourceName, string
traceFolderLocation) => $"{sourceName}{traceFolderLocation}";
+
+ public override ExportResult Export(in Batch<Activity> batch)
+ {
+ foreach (Activity activity in batch)
+ {
+ if (activity == null) continue;
+ _activityQueue.Enqueue(activity);
+ }
+ return ExportResult.Success;
+ }
+
+ private static async Task ProcessActivitiesAsync(FileExporter
fileExporter, CancellationToken cancellationToken)
+ {
+ TimeSpan delay = TimeSpan.FromMilliseconds(100);
+ // Polls for and then writes any activities in the queue
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ await Task.Delay(delay, cancellationToken);
+ await
fileExporter._tracingFile.WriteLinesAsync(GetActivitiesAsync(fileExporter._activityQueue),
cancellationToken);
+ }
+ }
+
+ private static bool IsDirectoryWritable(string traceLocation, bool
throwIfFails = false)
+ {
+ try
+ {
+ if (!Directory.Exists(traceLocation))
+ {
+ Directory.CreateDirectory(traceLocation);
+ }
+ string tempFilePath = Path.Combine(traceLocation,
Path.GetRandomFileName());
+ using FileStream fs = File.Create(tempFilePath, 1,
FileOptions.DeleteOnClose);
+ return true;
+ }
+ catch
Review Comment:
```suggestion
catch when (!throwIfFails)
```
##########
csharp/src/Apache.Arrow.Adbc/Tracing/FileExporter/FileExporter.cs:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Text.Json;
+using System.Threading.Tasks;
+using System.Threading;
+using OpenTelemetry;
Review Comment:
nit: ordering
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs:
##########
@@ -664,50 +628,62 @@ internal static async Task
PollForResponseAsync(TOperationHandle operationHandle
private string GetInfoTypeStringValue(TGetInfoType infoType)
{
- TGetInfoReq req = new()
+ return TraceActivity(activity =>
{
- SessionHandle = SessionHandle ?? throw new
InvalidOperationException("session not created"),
- InfoType = infoType,
- };
+ TGetInfoReq req = new()
+ {
+ SessionHandle = SessionHandle ?? throw new
InvalidOperationException("session not created"),
+ InfoType = infoType,
+ };
- CancellationToken cancellationToken =
ApacheUtility.GetCancellationToken(QueryTimeoutSeconds,
ApacheUtility.TimeUnit.Seconds);
- try
- {
- TGetInfoResp getInfoResp = Client.GetInfo(req,
cancellationToken).Result;
- if (getInfoResp.Status.StatusCode == TStatusCode.ERROR_STATUS)
+ CancellationToken cancellationToken =
ApacheUtility.GetCancellationToken(QueryTimeoutSeconds,
ApacheUtility.TimeUnit.Seconds);
+ try
{
- throw new
HiveServer2Exception(getInfoResp.Status.ErrorMessage)
- .SetNativeError(getInfoResp.Status.ErrorCode)
- .SetSqlState(getInfoResp.Status.SqlState);
- }
+ TGetInfoResp getInfoResp = Client.GetInfo(req,
cancellationToken).Result;
+ ApacheUtility.HandleThriftResponse(getInfoResp.Status,
GetResponseHandlers(activity));
- return getInfoResp.InfoValue.StringValue;
- }
- catch (Exception ex) when
(ExceptionHelper.IsOperationCanceledOrCancellationRequested(ex,
cancellationToken))
- {
- throw new TimeoutException("The metadata query execution timed
out. Consider increasing the query timeout value.", ex);
- }
- catch (Exception ex) when (ex is not HiveServer2Exception)
- {
- throw new HiveServer2Exception($"An unexpected error occurred
while running metadata query. '{ex.Message}'", ex);
- }
+ return getInfoResp.InfoValue.StringValue;
+ }
+ catch (Exception ex) when
(ExceptionHelper.IsOperationCanceledOrCancellationRequested(ex,
cancellationToken))
+ {
+ throw new TimeoutException("The metadata query execution
timed out. Consider increasing the query timeout value.", ex);
+ }
+ catch (Exception ex) when (ex is not HiveServer2Exception)
+ {
+ throw new HiveServer2Exception($"An unexpected error
occurred while running metadata query. '{ex.Message}'", ex);
+ }
+ });
}
- public override void Dispose()
+ protected override void Dispose(bool disposing)
{
- if (_client != null && SessionHandle != null)
+ TraceActivity(activity =>
Review Comment:
`activity` isn't used until HandleThriftResponse. Consider factoring the
network operation into a separate method and putting the `TraceActivity` into
that method and keeping the `Dispose` small and more readable.
##########
csharp/src/Apache.Arrow.Adbc/Tracing/TracingReader.cs:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ public abstract class TracingReader : IArrowArrayStream, IActivityTracer
+ {
+ private bool _isDisposed;
+ private readonly TracingStatement _statement;
+
+ protected TracingReader(TracingStatement statement)
+ {
+ _statement = statement;
+ }
+
+ public abstract Schema Schema { get; }
+
+ public abstract ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default);
+
+ private ActivityTrace Trace => _statement._trace;
+
+ private string? TraceParent => _statement.TraceParent;
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!_isDisposed)
+ {
+ if (disposing)
+ {
+ // TODO: dispose managed state (managed objects)
Review Comment:
Please remove all these TODOs
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs:
##########
@@ -822,6 +798,20 @@ internal IReadOnlyDictionary<string, int>
GetColumnIndexMap(List<TColumnDesc> co
protected abstract bool IsColumnSizeValidForDecimal { get; }
+ public override void SetOption(string key, string value)
+ {
+ switch (key.ToLowerInvariant())
+ {
+ case AdbcOptions.Telemetry.TraceParent:
+ TraceParent = string.IsNullOrEmpty(value) ? null : value;
Review Comment:
Hmm... so I guess this is where the parent is set? And the scenario is that
the caller opens a connection once but can set a new parent every time they
execute a statement?
##########
csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs:
##########
@@ -0,0 +1,393 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Tracing.FileExporter;
+using OpenTelemetry;
+using OpenTelemetry.Resources;
+using OpenTelemetry.Trace;
+
+namespace Apache.Arrow.Adbc.Tracing
+{
+ /// <summary>
+ /// Provides a base implementation for a tracing source. If drivers want
to enable tracing,
+ /// they need to add a trace listener (e.g., <see cref="FileExporter"/>).
+ /// </summary>
+ public class ActivityTrace
+ {
+ private const string ProductVersionDefault = "1.0.0";
+ private static readonly string s_assemblyVersion = GetProductVersion();
+ private const string SourceNameDefault = "apache.arrow.adbc";
+ private const string OTelTracesExporterEnvironment =
"OTEL_TRACES_EXPORTER";
+ private bool _isDisposed;
+
+ /// <summary>
+ /// Constructs a new <see cref="ActivityTrace"/> object. If <paramref
name="activitySourceName"/> is set, it provides the
+ /// activity source name, otherwise the current assembly name is used
as the activity source name.
+ /// </summary>
+ /// <param name="activitySourceName"></param>
+ public ActivityTrace(string? activitySourceName = default, string?
traceParent = default)
+ {
+ activitySourceName ??= GetType().Assembly.GetName().Name!;
+ if (string.IsNullOrWhiteSpace(activitySourceName))
+ {
+ throw new ArgumentNullException(nameof(activitySourceName));
+ }
+
+ // This is required to be disposed
+ ActivitySource = new(activitySourceName, s_assemblyVersion);
+ TraceParent = traceParent;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="System.Diagnostics.ActivitySource"/>.
+ /// </summary>
+ public ActivitySource ActivitySource { get; }
+
+ /// <summary>
+ /// Gets the name of the <see
cref="System.Diagnostics.ActivitySource"/>
+ /// </summary>
+ public string ActivitySourceName => ActivitySource.Name;
+
+ /// <summary>
+ /// Invokes the delegate within the context of a newly started <see
cref="Activity"/>.
+ /// </summary>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
+ /// <param name="activityName">The name of the method for the
activity.</param>
+ /// <returns>Returns a new <see cref="Activity"/> object if there is
any listener to the Activity, returns null otherwise</returns>
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/>. If
an exception is thrown by the delegate, the Activity
+ /// status is set to <see cref="ActivityStatusCode.Error"/> and an
Activity <see cref="ActivityEvent"/> is added to the activity
+ /// and finally the exception is rethrown.
+ /// </remarks>
+ public void TraceActivity(Action<Activity?> call, [CallerMemberName]
string? activityName = default, string? traceParent = default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Invokes the delegate within the context of a newly started <see
cref="Activity"/>.
+ /// </summary>
+ /// <typeparam name="T">The return type for the delegate.</typeparam>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
+ /// <param name="activityName">The name of the method for the
activity.</param>
+ /// <returns>The result of the call to the delegate.</returns>
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/> and
the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is
set to <see cref="ActivityStatusCode.Error"/>
+ /// and an Event <see cref="ActivityEvent"/> is added to the activity
and finally the exception is rethrown.
+ /// </remarks>
+ public T TraceActivity<T>(Func<Activity?, T> call, [CallerMemberName]
string? activityName = default, string? traceParent = default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ T? result = call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ return result;
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Invokes the delegate within the context of a newly started <see
cref="Activity"/>.
+ /// </summary>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
+ /// <param name="activityName">The name of the method for the
activity.</param>
+ /// <returns></returns>
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/> and
the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is
set to <see cref="ActivityStatusCode.Error"/>
+ /// and an Event <see cref="ActivityEvent"/> is added to the activity
and finally the exception is rethrown.
+ /// </remarks>
+ public async Task TraceActivityAsync(Func<Activity?, Task> call,
[CallerMemberName] string? activityName = default, string? traceParent =
default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ await call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Invokes the delegate within the context of a newly started <see
cref="Activity"/>.
+ /// </summary>
+ /// <typeparam name="T">The return type for the delegate.</typeparam>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
+ /// <param name="activityName">The name of the method for the
activity.</param>
+ /// <returns>The result of the call to the delegate.</returns>
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/> and
the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is
set to <see cref="ActivityStatusCode.Error"/>
+ /// and an Event <see cref="ActivityEvent"/> is added to the activity
and finally the exception is rethrown.
+ /// </remarks>
+ public async Task<T> TraceActivityAsync<T>(Func<Activity?, Task<T>>
call, [CallerMemberName] string? activityName = default, string? traceParent =
default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
ActivitySource, traceParent ?? TraceParent);
+ try
+ {
+ T? result = await call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ return result;
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Invokes the delegate within the context of a newly started <see
cref="Activity"/>.
+ /// </summary>
+ /// <param name="activitySource">The <see cref="ActivitySource"/> to
start the <see cref="Activity"/> on.</param>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
+ /// <param name="activityName">The name of the method for the
activity.</param>
+ /// <returns></returns>
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/> and
the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is
set to <see cref="ActivityStatusCode.Error"/>
+ /// and an Event <see cref="ActivityEvent"/> is added to the activity
and finally the exception is rethrown.
+ /// </remarks>
+ public static async Task TraceActivityAsync(ActivitySource
activitySource, Func<Activity?, Task> call, [CallerMemberName] string?
activityName = default, string? traceParent = default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
activitySource, traceParent);
+ try
+ {
+ await call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Invokes the delegate within the context of a newly started <see
cref="Activity"/>.
+ /// </summary>
+ /// <typeparam name="T">The return type for the delegate.</typeparam>
+ /// <param name="activitySource">The <see cref="ActivitySource"/> to
start the <see cref="Activity"/> on.</param>
+ /// <param name="call">The delegate to call within the context of a
newly started <see cref="Activity"/></param>
+ /// <param name="activityName">The name of the method for the
activity.</param>
+ /// <returns>The result of the call to the delegate.</returns>
+ /// <remarks>
+ /// Creates and starts a new <see cref="Activity"/> object if there is
any listener for the ActivitySource.
+ /// Passes the Activity to the delegate and invokes the delegate. If
there are no exceptions thrown by the delegate the
+ /// Activity status is set to <see cref="ActivityStatusCode.Ok"/> and
the result is returned.
+ /// If an exception is thrown by the delegate, the Activity status is
set to <see cref="ActivityStatusCode.Error"/>
+ /// and an Event <see cref="ActivityEvent"/> is added to the activity
and finally the exception is rethrown.
+ /// </remarks>
+ public static async Task<T> TraceActivityAsync<T>(ActivitySource
activitySource, Func<Activity?, Task<T>> call, [CallerMemberName] string?
activityName = default, string? traceParent = default)
+ {
+ using Activity? activity = StartActivityInternal(activityName,
activitySource, traceParent);
+ try
+ {
+ T? result = await call.Invoke(activity);
+ if (activity?.Status == ActivityStatusCode.Unset)
activity?.SetStatus(ActivityStatusCode.Ok);
+ return result;
+ }
+ catch (Exception ex)
+ {
+ TraceException(ex, activity);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the trace parent context.
+ /// </summary>
+ public string? TraceParent { get; set; }
Review Comment:
Under what circumstances should the parent be changed after the activity is
created?
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs:
##########
@@ -76,50 +78,54 @@ public HiveServer2Reader(
HiveServer2Statement statement,
Schema schema,
DataTypeConversion dataTypeConversion,
- bool enableBatchSizeStopCondition = true)
+ bool enableBatchSizeStopCondition = true) : base(statement)
{
_statement = statement;
Schema = schema;
_dataTypeConversion = dataTypeConversion;
_enableBatchSizeStopCondition = enableBatchSizeStopCondition;
}
- public Schema Schema { get; }
+ public override Schema Schema { get; }
- public async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+ public override async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
- // All records have been exhausted
- if (_statement == null)
+ return await TraceActivity(async activity =>
{
- return null;
- }
+ // All records have been exhausted
+ if (_hasNoMoreData)
+ {
+ return null;
+ }
+ try
+ {
+ // Await the fetch response
+ TFetchResultsResp response = await FetchNext(_statement,
cancellationToken);
+ ApacheUtility.HandleThriftResponse(response.Status,
HiveServer2Connection.GetResponseHandlers(activity));
- try
- {
- // Await the fetch response
- TFetchResultsResp response = await FetchNext(_statement,
cancellationToken);
+ int columnCount = GetColumnCount(response.Results);
+ int rowCount = GetRowCount(response.Results, columnCount);
+ if ((_enableBatchSizeStopCondition && _statement.BatchSize
> 0 && rowCount < _statement.BatchSize) || rowCount == 0)
+ {
+ // This is the last batch
+ _hasNoMoreData = true;
+ }
- int columnCount = GetColumnCount(response.Results);
- int rowCount = GetRowCount(response.Results, columnCount);
- if ((_enableBatchSizeStopCondition && _statement.BatchSize > 0
&& rowCount < _statement.BatchSize) || rowCount == 0)
+ activity?.AddTag(TagOptions.Db.Response.ReturnedRows,
rowCount);
Review Comment:
This tag will be added multiple times with the same name. Is that okay?
##########
csharp/src/Drivers/Apache/Impala/ImpalaConnectionFactory.cs:
##########
@@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
+using Apache.Arrow.Adbc.Tracing;
Review Comment:
Seems unused. There are a few more places where this was added; I'll stop
pointing it out now.
##########
csharp/src/Drivers/Apache/Impala/ImpalaConnection.cs:
##########
@@ -20,6 +20,7 @@
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Tracing;
Review Comment:
Seems unused
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]