This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new dc358d50e feat(csharp/src/Telemetry/Traces/Exporters): refactor and 
improve performance of file exporter (#3397)
dc358d50e is described below

commit dc358d50eecc93e2f006cb642aefccc5765957da
Author: Bruce Irschick <[email protected]>
AuthorDate: Mon Sep 15 14:14:07 2025 -0700

    feat(csharp/src/Telemetry/Traces/Exporters): refactor and improve 
performance of file exporter (#3397)
    
    Refactors and (significantly) improves performance of the file exporter.
    * Adds use of `System.Threading.Channels` to improve clarity and
    shutdown handling.
    * Now keeps the current trace file open until it is full or disposed.
    * Checks for old files to delete only when a new file is created.
    * Adds a handler for `OnForceFlush` to try to flush messages when
    needed.
    * Improves shutdown/dispose reliability to flush messages.
    
    Benefits
    - orders of magnitude improvement in performance
    - huge improvement in concurrent use in the same folder
---
 ...he.Arrow.Adbc.Telemetry.Traces.Exporters.csproj |   1 +
 .../Traces/Exporters/FileExporter/FileExporter.cs  |  81 ++++++------
 .../Traces/Exporters/FileExporter/TracingFile.cs   | 145 ++++++++++++++-------
 .../Exporters/FileExporter/FileExporterTests.cs    |  47 ++++---
 .../Exporters/FileExporter/TracingFileTests.cs     |   9 +-
 5 files changed, 177 insertions(+), 106 deletions(-)

diff --git 
a/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj
 
b/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj
index b4109883f..4dec891d1 100644
--- 
a/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj
+++ 
b/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj
@@ -16,6 +16,7 @@
   <ItemGroup>
     <PackageReference Include="OpenTelemetry.Exporter.Console" 
Version="1.12.0" />
     <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" 
Version="1.12.0" />
+    <PackageReference Include="System.Threading.Channels" Version="9.0.8" />
   </ItemGroup>
 
   <ItemGroup>
diff --git a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs 
b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs
index a034a938a..a1251a8d7 100644
--- a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs
+++ b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs
@@ -17,12 +17,12 @@
 
 using System;
 using System.Collections.Concurrent;
-using System.Collections.Generic;
 using System.Diagnostics;
 using System.IO;
 using System.Text;
 using System.Text.Json;
 using System.Threading;
+using System.Threading.Channels;
 using System.Threading.Tasks;
 using OpenTelemetry;
 
@@ -41,8 +41,8 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
         private readonly TracingFile _tracingFile;
         private readonly string _fileBaseName;
         private readonly string _tracesDirectoryFullName;
-        private readonly ConcurrentQueue<Activity> _activityQueue = new();
-        private readonly CancellationTokenSource _cancellationTokenSource;
+        private readonly Channel<Activity> _channel = 
Channel.CreateUnbounded<Activity>();
+        private volatile bool _isProcessingComplete = false;
 
         private bool _disposed = false;
 
@@ -76,7 +76,7 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
                 return new FileExporterInstance(
                     fileExporter,
                     // This listens/polls for activity in the queue and writes 
them to file
-                    Task.Run(async () => await 
ProcessActivitiesAsync(fileExporter, cancellationTokenSource.Token)),
+                    Task.Run(async () => await 
ProcessActivitiesAsync(fileExporter, 
cancellationTokenSource.Token).ConfigureAwait(false)),
                     cancellationTokenSource);
             });
 
@@ -120,28 +120,54 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
             foreach (Activity activity in batch)
             {
                 if (activity == null) continue;
-                _activityQueue.Enqueue(activity);
+                _channel.Writer.TryWrite(activity);
             }
             return ExportResult.Success;
         }
 
+        protected override bool OnForceFlush(int timeoutMilliseconds)
+        {
+            CancellationTokenSource cts = new();
+            cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutMilliseconds));
+            int poolTime = Math.Min(100, timeoutMilliseconds / 10);
+            do
+            {
+                Thread.Sleep(poolTime);
+            } while (!cts.IsCancellationRequested && 
!_channel.Reader.Completion.IsCompleted && _channel.Reader.Count > 0);
+            return !cts.IsCancellationRequested;
+        }
+
         private static async Task ProcessActivitiesAsync(FileExporter 
fileExporter, CancellationToken cancellationToken)
         {
             try
             {
-                TimeSpan delay = TimeSpan.FromMilliseconds(100);
-                // Polls for and then writes any activities in the queue
-                while (!cancellationToken.IsCancellationRequested)
+                using MemoryStream stream = new();
+                await foreach (Activity activity in 
fileExporter._channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
                 {
-                    await Task.Delay(delay, cancellationToken);
-                    await 
fileExporter._tracingFile.WriteLinesAsync(GetActivitiesAsync(fileExporter._activityQueue),
 cancellationToken);
+                    if (cancellationToken.IsCancellationRequested) break;
+
+                    stream.SetLength(0);
+                    SerializableActivity serializableActivity = new(activity);
+                    await JsonSerializer.SerializeAsync(
+                        stream,
+                        serializableActivity, cancellationToken: 
cancellationToken).ConfigureAwait(false);
+                    stream.Write(s_newLine, 0, s_newLine.Length);
+                    stream.Position = 0;
+
+                    await fileExporter._tracingFile.WriteLineAsync(stream, 
cancellationToken).ConfigureAwait(false);
                 }
             }
+            catch (OperationCanceledException ex)
+            {
+                // Expected when cancellationToken is cancelled.
+                Trace.TraceError(ex.ToString());
+            }
             catch (Exception ex)
             {
                 // Since this will be called on an independent thread, we need 
to avoid uncaught exceptions.
-                Debug.WriteLine(ex);
+                Trace.TraceError(ex.ToString());
             }
+            fileExporter._isProcessingComplete = true;
         }
 
         private static bool IsDirectoryWritable(string traceLocation, bool 
throwIfFails = false)
@@ -162,31 +188,12 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
             }
         }
 
-        private static async IAsyncEnumerable<Stream> 
GetActivitiesAsync(ConcurrentQueue<Activity> activityQueue)
-        {
-            using MemoryStream stream = new MemoryStream();
-            while (activityQueue.TryDequeue(out Activity? activity))
-            {
-                stream.SetLength(0);
-                SerializableActivity serializableActivity = new(activity);
-                await JsonSerializer.SerializeAsync(
-                    stream,
-                    serializableActivity,
-                    
SerializableActivityJsonContext.Default.SerializableActivity);
-                stream.Write(s_newLine, 0, s_newLine.Length);
-                stream.Position = 0;
-
-                yield return stream;
-            }
-        }
-
         private FileExporter(string fileBaseName, DirectoryInfo 
tracesDirectory, long maxTraceFileSizeKb, int maxTraceFiles)
         {
             string fullName = tracesDirectory.FullName;
             _fileBaseName = fileBaseName;
             _tracesDirectoryFullName = fullName;
             _tracingFile = new(fileBaseName, fullName, maxTraceFileSizeKb, 
maxTraceFiles);
-            _cancellationTokenSource = new CancellationTokenSource();
         }
 
         internal static string TracingLocationDefault { get; } =
@@ -196,12 +203,13 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
                     ApacheArrowAdbcNamespace,
                     TracesFolderName)).FullName;
 
-        private async Task FlushAsync(CancellationToken cancellationToken = 
default)
+        private void Flush(CancellationToken cancellationToken = default)
         {
             // Ensure existing writes are completed.
-            while (!cancellationToken.IsCancellationRequested && 
!_activityQueue.IsEmpty)
+            _channel.Writer.TryComplete();
+            while (!cancellationToken.IsCancellationRequested && 
!_isProcessingComplete)
             {
-                await Task.Delay(100);
+                Thread.Sleep(100);
             }
         }
 
@@ -212,7 +220,7 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
                 // Allow flush of any existing events.
                 using CancellationTokenSource flushTimeout = new();
                 flushTimeout.CancelAfter(TimeSpan.FromSeconds(5));
-                FlushAsync(flushTimeout.Token).Wait();
+                Flush(flushTimeout.Token);
 
                 // Remove and dispose of single instance of exporter
                 string listenerId = GetListenerId(_fileBaseName, 
_tracesDirectoryFullName);
@@ -221,8 +229,7 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
                 {
                     listener?.Value.Dispose();
                 }
-                _cancellationTokenSource.Cancel();
-                _cancellationTokenSource.Dispose();
+                _tracingFile.Dispose();
                 _disposed = true;
             }
             base.Dispose(disposing);
@@ -249,7 +256,7 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
                     CancellationTokenSource.Cancel();
                     try
                     {
-                        WriteActivitiesTask.Wait();
+                        
WriteActivitiesTask.ConfigureAwait(false).GetAwaiter().GetResult();
                     }
                     catch
                     {
diff --git a/csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs 
b/csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs
index 8595056df..3bb0a4cfc 100644
--- a/csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs
+++ b/csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs
@@ -29,8 +29,9 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
     /// Provides access to writing trace files, limiting the
     /// individual files size and ensuring unique file names.
     /// </summary>
-    internal class TracingFile
+    internal class TracingFile : IDisposable
     {
+        private const int KbInByes = 1024;
         private static readonly string s_defaultTracePath = 
FileExporter.TracingLocationDefault;
         private static readonly Random s_globalRandom = new();
         private static readonly ThreadLocal<Random> s_threadLocalRandom = 
new(NewRandom);
@@ -38,6 +39,7 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
         private readonly string _fileBaseName;
         private readonly DirectoryInfo _tracingDirectory;
         private FileInfo? _currentTraceFileInfo;
+        private FileStream? _currentFileStream;
         private readonly long _maxFileSizeKb;
         private readonly int _maxTraceFiles;
 
@@ -61,35 +63,44 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
         /// <param name="streams">The enumerable of trace lines.</param>
         /// <param name="cancellationToken">The cancellation token.</param>
         /// <returns></returns>
-        internal async Task WriteLinesAsync(IAsyncEnumerable<Stream> streams, 
CancellationToken cancellationToken = default)
+        internal async Task WriteLineAsync(Stream stream, CancellationToken 
cancellationToken = default)
         {
             if (cancellationToken.IsCancellationRequested) return;
 
-            string findSearchPattern = _fileBaseName + 
$"-trace-*-{ProcessId}.log";
-            if (_currentTraceFileInfo == null)
+            try
             {
-                IOrderedEnumerable<FileInfo>? traceFileInfos = await 
GetTracingFilesAsync(_tracingDirectory, findSearchPattern);
-                FileInfo? mostRecentFile = traceFileInfos?.FirstOrDefault();
-                mostRecentFile?.Refresh();
-
-                // Use the latest file, if it is not maxxed-out, or start a 
new tracing file.
-                _currentTraceFileInfo = mostRecentFile != null && 
mostRecentFile.Length < _maxFileSizeKb * 1024
-                    ? mostRecentFile
-                    : new FileInfo(NewFileName());
+                if (_currentTraceFileInfo == null || _currentFileStream == 
null)
+                {
+                    await OpenNewTracingFileAsync().ConfigureAwait(false);
+                }
+                // Write out to the file and retry if IO errors occur.
+                await ActionWithRetryAsync<IOException>(async () =>
+                    await WriteSingleLineAsync(stream).ConfigureAwait(false), 
cancellationToken: cancellationToken).ConfigureAwait(false);
+            }
+            catch (Exception ex)
+            {
+                this._currentTraceFileInfo = null;
+                this._currentFileStream?.Dispose();
+                this._currentFileStream = null;
+                Trace.TraceError(ex.ToString());
             }
 
-            // Write out to the file and retry if IO errors occur.
-            await ActionWithRetryAsync<IOException>(async () => await 
WriteLinesAsync(streams), cancellationToken: cancellationToken);
+        }
 
+        private async Task TryRemoveOlderFiles()
+        {
             // Check if we need to remove old files
             if (_tracingDirectory.Exists)
             {
                 // This will clean-up files for all processes in the same 
directory.
                 string deleteSearchPattern = _fileBaseName + $"-trace-*.log";
-                FileInfo[] tracingFiles = [.. await 
GetTracingFilesAsync(_tracingDirectory, deleteSearchPattern)];
-                if (tracingFiles != null && tracingFiles.Length > 
_maxTraceFiles)
+                IOrderedEnumerable<FileInfo> orderedFiles = await 
GetTracingFilesAsync(_tracingDirectory, 
deleteSearchPattern).ConfigureAwait(false);
+                // Avoid accidentally trying to delete the current file.
+                FileInfo[] tracingFiles = orderedFiles.Where(f => 
!f.FullName.Equals(_currentTraceFileInfo?.FullName))?.ToArray() ?? new 
FileInfo[0];
+                if (tracingFiles.Length >= _maxTraceFiles)
                 {
-                    for (int i = tracingFiles.Length - 1; i >= _maxTraceFiles; 
i--)
+                    int lastIndex = Math.Max(0, _maxTraceFiles - 1);
+                    for (int i = tracingFiles.Length - 1; i >= lastIndex; i--)
                     {
                         FileInfo? file = tracingFiles.ElementAtOrDefault(i);
                         // Note: don't pass the cancellation token, as we want 
this to ALWAYS run at the end.
@@ -97,58 +108,91 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
                         {
                             file?.Delete();
                             return Task.CompletedTask;
-                        });
+                        }).ConfigureAwait(false);
                     }
                 }
             }
         }
 
-        private async Task WriteLinesAsync(IAsyncEnumerable<Stream> streams)
+        private async Task WriteSingleLineAsync(Stream stream)
+        {
+            if ((_currentFileStream!.Length + stream.Length) >= 
(_maxFileSizeKb * KbInByes))
+            {
+                // If tracing file is maxxed-out, start a new tracing file.
+                await OpenNewTracingFileAsync().ConfigureAwait(false);
+            }
+            await stream.CopyToAsync(_currentFileStream).ConfigureAwait(false);
+        }
+
+        private async Task OpenNewTracingFileAsync()
         {
-            bool hasMoreData;
+            _currentFileStream?.Dispose();
+            _currentFileStream = null;
+            const int maxAttempts = 20;
+            int attempts = 0;
+            Exception? lastException;
+
             do
             {
-                bool newFileRequired = false;
-                _currentTraceFileInfo!.Refresh();
-                using (FileStream fileStream = 
_currentTraceFileInfo!.OpenWrite())
+                lastException = null;
+                _currentTraceFileInfo = new FileInfo(NewFileName());
+                try
                 {
-                    fileStream.Position = fileStream.Length;
-                    hasMoreData = false;
-                    await foreach (Stream stream in streams)
+                    attempts++;
+                    if (!_tracingDirectory.Exists)
                     {
-                        if (fileStream.Length >= _maxFileSizeKb * 1024)
-                        {
-                            hasMoreData = true;
-                            newFileRequired = true;
-                            break;
-                        }
-
-                        await stream.CopyToAsync(fileStream);
+                        _tracingDirectory.Create();
                     }
+                    _currentFileStream = 
_currentTraceFileInfo.Open(FileMode.CreateNew, FileAccess.Write, 
FileShare.Read);
+                }
+                catch (IOException ioEx)
+                {
+                    lastException = ioEx;
+                    // If we can't open the file, just set to null.
+                    _currentFileStream = null;
+                    int delayMs = ThreadLocalRandom.Next(5, 10 * attempts);
+                    await Task.Delay(delayMs).ConfigureAwait(false);
                 }
-                await Task.Yield(); // Yield to allow other tasks to run.
-                if (newFileRequired)
+                catch (Exception ex)
                 {
-                    // If tracing file is maxxed-out, start a new tracing file.
-                    _currentTraceFileInfo = new FileInfo(NewFileName());
+                    lastException = ex;
+                    Trace.TraceError(ex.ToString());
                 }
-            } while (hasMoreData);
+            } while (_currentFileStream == null && attempts <= maxAttempts);
+            if (_currentFileStream == null && lastException != null)
+            {
+                _currentTraceFileInfo = null;
+                throw new IOException($"Unable to create a new tracing file 
after {attempts - 1} attempts.", lastException);
+            }
+
+            await TryRemoveOlderFiles().ConfigureAwait(false);
         }
 
         private static async Task<IOrderedEnumerable<FileInfo>> 
GetTracingFilesAsync(DirectoryInfo tracingDirectory, string searchPattern)
         {
-            return await Task.Run(() => tracingDirectory
-                .EnumerateFiles(searchPattern, SearchOption.TopDirectoryOnly)
-                .OrderByDescending(f => f.LastWriteTimeUtc));
+            return await Task.Run(() =>
+            {
+                IEnumerable<FileInfo> unorderedFiles;
+                if (!tracingDirectory.Exists)
+                {
+                    tracingDirectory.Create();
+                    unorderedFiles = [];
+                }
+                else
+                {
+                    unorderedFiles = 
tracingDirectory.EnumerateFiles(searchPattern, SearchOption.TopDirectoryOnly);
+                }
+                return unorderedFiles.OrderByDescending(f => 
f.LastWriteTimeUtc.Ticks);
+            }).ConfigureAwait(false);
         }
 
         private static async Task ActionWithRetryAsync<T>(
             Func<Task> action,
-            int maxRetries = 100,
+            int maxRetries = 10,
             CancellationToken cancellationToken = default) where T : Exception
         {
             int retryCount = 0;
-            int delayTime = ThreadLocalRandom.Next(50, 500); // Introduce a 
small random delay to avoid contention.
+            int delayTime = ThreadLocalRandom.Next(10, 500); // Introduce a 
small random delay to avoid contention.
             TimeSpan pauseTime = TimeSpan.FromMilliseconds(delayTime);
             bool completed = false;
 
@@ -156,7 +200,7 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
             {
                 try
                 {
-                    await action.Invoke();
+                    await action.Invoke().ConfigureAwait(false);
                     completed = true;
                 }
                 catch (T) when (retryCount < (maxRetries - 1))
@@ -164,7 +208,7 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
                     retryCount++;
                     try
                     {
-                        await Task.Delay(pauseTime, cancellationToken);
+                        await Task.Delay(pauseTime, 
cancellationToken).ConfigureAwait(false);
                     }
                     catch (OperationCanceledException)
                     {
@@ -177,7 +221,7 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
 
         private string NewFileName()
         {
-            string dateTimeSortable = 
DateTimeOffset.UtcNow.ToString("yyyy-MM-dd-HH-mm-ss-fff");
+            string dateTimeSortable = 
DateTimeOffset.UtcNow.ToString("yyyy-MM-dd-HH-mm-ss-ffffff");
             return Path.Combine(_tracingDirectory.FullName, 
$"{_fileBaseName}-trace-{dateTimeSortable}-{ProcessId}.log");
         }
 
@@ -207,5 +251,12 @@ namespace 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
             // This ensures that each thread gets a different seed.
             return new Random(seed);
         }
+
+        public void Dispose()
+        {
+            _currentFileStream?.Dispose();
+            _currentFileStream = null;
+            _currentTraceFileInfo = null;
+        }
     }
 }
diff --git 
a/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs 
b/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs
index 16f0c370d..2eaf7dc72 100644
--- a/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs
+++ b/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs
@@ -29,7 +29,7 @@ using 
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter;
 
 namespace Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
 {
-    public class TracingFileExporterTests : IDisposable
+    public class FileExporterTests : IDisposable
     {
         private readonly ITestOutputHelper? _outputHelper;
         private bool _disposed;
@@ -37,7 +37,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
         private readonly ActivitySource _activitySource;
         private static readonly string s_localApplicationDataFolderPath = 
Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData);
 
-        public TracingFileExporterTests(ITestOutputHelper? outputHelper)
+        public FileExporterTests(ITestOutputHelper? outputHelper)
         {
             _outputHelper = outputHelper;
             _activitySourceName = ExportersBuilderTests.NewName();
@@ -104,7 +104,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
         [Fact]
         internal async Task CanSetCustomMaxFileSize()
         {
-            const long maxTraceFileSizeKb = 30;
+            const long maxTraceFileSizeKb = 50;
             const long kilobyte = 1024;
             string customFolderName = ExportersBuilderTests.NewName();
             string traceFolder = 
Path.Combine(s_localApplicationDataFolderPath, customFolderName);
@@ -141,7 +141,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
                 }
                 for (int i = 0; i < files.Length; i++)
                 {
-                    long expectedUpperSizeLimit = (maxTraceFileSizeKb + 
(long)(0.2 * maxTraceFileSizeKb)) * kilobyte;
+                    long expectedUpperSizeLimit = maxTraceFileSizeKb * 2 * 
kilobyte;
                     Assert.True(files[i].Length < expectedUpperSizeLimit, 
summary.ToString());
                 }
                 _outputHelper?.WriteLine($"number of files: {files.Length}");
@@ -208,7 +208,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
                     for (int i = 0; i < 100; i++)
                     {
                         await AddEvent("test");
-                        await Task.Delay(TimeSpan.FromMilliseconds(0.1));
+                        await Task.Delay(TimeSpan.FromMilliseconds(0.11));
                     }
                 }
 
@@ -252,12 +252,26 @@ namespace 
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
             if (Directory.Exists(traceFolder)) Directory.Delete(traceFolder, 
true);
             try
             {
+                // This simulates two drivers/connections in the same process 
trying to write to the same
+                // trace file(s). In fact, because the FileExporter is a 
singleton by the combined key of
+                // the activity source name and the trace file path, both 
providers will register only one listener.
+                TracerProvider provider1 = Sdk.CreateTracerProviderBuilder()
+                    .AddSource(_activitySourceName)
+                    .AddAdbcFileExporter(_activitySourceName, traceFolder)
+                    .Build();
+                TracerProvider provider2 = Sdk.CreateTracerProviderBuilder()
+                    .AddSource(_activitySourceName)
+                    .AddAdbcFileExporter(_activitySourceName, traceFolder)
+                    .Build();
                 var tasks = new Task[]
                 {
-                    Task.Run(async () => await TraceActivities(traceFolder, 
"activity1", writeCount)),
-                    Task.Run(async () => await TraceActivities(traceFolder, 
"activity2", writeCount)),
+                    Task.Run(async () => await TraceActivities(traceFolder, 
"activity1", writeCount, provider1)),
+                    Task.Run(async () => await TraceActivities(traceFolder, 
"activity2", writeCount, provider2)),
                 };
                 await Task.WhenAll(tasks);
+                await Task.Delay(500);
+                provider1.Dispose();
+                provider2.Dispose();
 
                 int activity1Count = 0;
                 int activity2Count = 0;
@@ -278,8 +292,8 @@ namespace 
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
                 // Note, because we don't reference count, one of the 
listeners will likely
                 // close the shared instance before the other is finished.
                 // That can result in some events not being written.
-                Assert.InRange(activity1Count, writeCount * 0.9, writeCount);
-                Assert.InRange(activity2Count, writeCount * 0.9, writeCount);
+                Assert.InRange(activity1Count, writeCount * 0.8, writeCount);
+                Assert.InRange(activity2Count, writeCount * 0.8, writeCount);
             }
             finally
             {
@@ -287,19 +301,14 @@ namespace 
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
             }
         }
 
-        private async Task TraceActivities(string traceFolder, string 
activityName, int writeCount)
+        private async Task TraceActivities(string traceFolder, string 
activityName, int writeCount, TracerProvider provider)
         {
-            using (TracerProvider provider = Sdk.CreateTracerProviderBuilder()
-                .AddSource(_activitySourceName)
-                .AddAdbcFileExporter(_activitySourceName, traceFolder)
-                .Build())
+            for (int i = 0; i < writeCount; i++)
             {
-                for (int i = 0; i < writeCount; i++)
-                {
-                    await StartActivity(activityName);
-                    await Task.Delay(TimeSpan.FromMilliseconds(0.1));
-                }
+                await StartActivity(activityName);
+                await Task.Delay(TimeSpan.FromMilliseconds(0.1));
             }
+            provider.ForceFlush(2000);
         }
 
         private Task AddEvent(string eventName, string activityName = 
nameof(AddEvent))
diff --git 
a/csharp/test/Telemetry/Traces/Exporters/FileExporter/TracingFileTests.cs 
b/csharp/test/Telemetry/Traces/Exporters/FileExporter/TracingFileTests.cs
index 99767756e..3392ea031 100644
--- a/csharp/test/Telemetry/Traces/Exporters/FileExporter/TracingFileTests.cs
+++ b/csharp/test/Telemetry/Traces/Exporters/FileExporter/TracingFileTests.cs
@@ -41,7 +41,7 @@ namespace 
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
         internal async Task TestMultipleConcurrentTracingFiles()
         {
             CancellationTokenSource tokenSource = new 
CancellationTokenSource();
-            int concurrentCount = 5;
+            int concurrentCount = 50;
             Task[] tasks = new Task[concurrentCount];
             int[] lineCounts = new int[concurrentCount];
             string sourceName = ExportersBuilderTests.NewName();
@@ -80,8 +80,11 @@ namespace 
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
         private async Task Run(string sourceName, string traceFolder, 
CancellationToken cancellationToken)
         {
             int instanceNumber = Interlocked.Increment(ref _testInstance) - 1;
-            TracingFile tracingFile = new TracingFile(sourceName, traceFolder);
-            await tracingFile.WriteLinesAsync(GetLinesAsync(instanceNumber, 
100, cancellationToken), cancellationToken);
+            using TracingFile tracingFile = new TracingFile(sourceName, 
traceFolder);
+            await foreach (var stream in GetLinesAsync(instanceNumber, 100, 
cancellationToken))
+            {
+                await tracingFile.WriteLineAsync(stream, cancellationToken);
+            }
         }
 
         private static async IAsyncEnumerable<Stream> GetLinesAsync(int 
instanceNumber, int lineCount, [EnumeratorCancellation] CancellationToken 
cancellationToken = default)

Reply via email to