This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new dc358d50e feat(csharp/src/Telemetry/Traces/Exporters): refactor and
improve performance of file exporter (#3397)
dc358d50e is described below
commit dc358d50eecc93e2f006cb642aefccc5765957da
Author: Bruce Irschick <[email protected]>
AuthorDate: Mon Sep 15 14:14:07 2025 -0700
feat(csharp/src/Telemetry/Traces/Exporters): refactor and improve
performance of file exporter (#3397)
Refactors and (significantly) improves performance of the file exporter.
* Uses `System.Threading.Channels` to improve clarity and
shutdown handling.
* Now keeps the current trace file open until it is full or disposed.
* Checks for old files to delete only when a new file is created.
* Adds a handler for `OnForceFlush` to try to flush messages when
needed.
* Improves shutdown/dispose reliability so that pending messages are flushed.
Benefits:
- orders-of-magnitude improvement in performance
- huge improvement when multiple writers concurrently use the same folder
---
...he.Arrow.Adbc.Telemetry.Traces.Exporters.csproj | 1 +
.../Traces/Exporters/FileExporter/FileExporter.cs | 81 ++++++------
.../Traces/Exporters/FileExporter/TracingFile.cs | 145 ++++++++++++++-------
.../Exporters/FileExporter/FileExporterTests.cs | 47 ++++---
.../Exporters/FileExporter/TracingFileTests.cs | 9 +-
5 files changed, 177 insertions(+), 106 deletions(-)
diff --git
a/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj
b/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj
index b4109883f..4dec891d1 100644
---
a/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj
+++
b/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj
@@ -16,6 +16,7 @@
<ItemGroup>
<PackageReference Include="OpenTelemetry.Exporter.Console"
Version="1.12.0" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol"
Version="1.12.0" />
+ <PackageReference Include="System.Threading.Channels" Version="9.0.8" />
</ItemGroup>
<ItemGroup>
diff --git a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs
b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs
index a034a938a..a1251a8d7 100644
--- a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs
+++ b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs
@@ -17,12 +17,12 @@
using System;
using System.Collections.Concurrent;
-using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
+using System.Threading.Channels;
using System.Threading.Tasks;
using OpenTelemetry;
@@ -41,8 +41,8 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
private readonly TracingFile _tracingFile;
private readonly string _fileBaseName;
private readonly string _tracesDirectoryFullName;
- private readonly ConcurrentQueue<Activity> _activityQueue = new();
- private readonly CancellationTokenSource _cancellationTokenSource;
+ private readonly Channel<Activity> _channel =
Channel.CreateUnbounded<Activity>();
+ private volatile bool _isProcessingComplete = false;
private bool _disposed = false;
@@ -76,7 +76,7 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
return new FileExporterInstance(
fileExporter,
// This listens/polls for activity in the queue and writes
them to file
- Task.Run(async () => await
ProcessActivitiesAsync(fileExporter, cancellationTokenSource.Token)),
+ Task.Run(async () => await
ProcessActivitiesAsync(fileExporter,
cancellationTokenSource.Token).ConfigureAwait(false)),
cancellationTokenSource);
});
@@ -120,28 +120,54 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
foreach (Activity activity in batch)
{
if (activity == null) continue;
- _activityQueue.Enqueue(activity);
+ _channel.Writer.TryWrite(activity);
}
return ExportResult.Success;
}
+ protected override bool OnForceFlush(int timeoutMilliseconds)
+ {
+ CancellationTokenSource cts = new();
+ cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutMilliseconds));
+ int poolTime = Math.Min(100, timeoutMilliseconds / 10);
+ do
+ {
+ Thread.Sleep(poolTime);
+ } while (!cts.IsCancellationRequested &&
!_channel.Reader.Completion.IsCompleted && _channel.Reader.Count > 0);
+ return !cts.IsCancellationRequested;
+ }
+
private static async Task ProcessActivitiesAsync(FileExporter
fileExporter, CancellationToken cancellationToken)
{
try
{
- TimeSpan delay = TimeSpan.FromMilliseconds(100);
- // Polls for and then writes any activities in the queue
- while (!cancellationToken.IsCancellationRequested)
+ using MemoryStream stream = new();
+ await foreach (Activity activity in
fileExporter._channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
- await Task.Delay(delay, cancellationToken);
- await
fileExporter._tracingFile.WriteLinesAsync(GetActivitiesAsync(fileExporter._activityQueue),
cancellationToken);
+ if (cancellationToken.IsCancellationRequested) break;
+
+ stream.SetLength(0);
+ SerializableActivity serializableActivity = new(activity);
+ await JsonSerializer.SerializeAsync(
+ stream,
+ serializableActivity, cancellationToken:
cancellationToken).ConfigureAwait(false);
+ stream.Write(s_newLine, 0, s_newLine.Length);
+ stream.Position = 0;
+
+ await fileExporter._tracingFile.WriteLineAsync(stream,
cancellationToken).ConfigureAwait(false);
}
}
+ catch (OperationCanceledException ex)
+ {
+ // Expected when cancellationToken is cancelled.
+ Trace.TraceError(ex.ToString());
+ }
catch (Exception ex)
{
// Since this will be called on an independent thread, we need
to avoid uncaught exceptions.
- Debug.WriteLine(ex);
+ Trace.TraceError(ex.ToString());
}
+ fileExporter._isProcessingComplete = true;
}
private static bool IsDirectoryWritable(string traceLocation, bool
throwIfFails = false)
@@ -162,31 +188,12 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
}
}
- private static async IAsyncEnumerable<Stream>
GetActivitiesAsync(ConcurrentQueue<Activity> activityQueue)
- {
- using MemoryStream stream = new MemoryStream();
- while (activityQueue.TryDequeue(out Activity? activity))
- {
- stream.SetLength(0);
- SerializableActivity serializableActivity = new(activity);
- await JsonSerializer.SerializeAsync(
- stream,
- serializableActivity,
-
SerializableActivityJsonContext.Default.SerializableActivity);
- stream.Write(s_newLine, 0, s_newLine.Length);
- stream.Position = 0;
-
- yield return stream;
- }
- }
-
private FileExporter(string fileBaseName, DirectoryInfo
tracesDirectory, long maxTraceFileSizeKb, int maxTraceFiles)
{
string fullName = tracesDirectory.FullName;
_fileBaseName = fileBaseName;
_tracesDirectoryFullName = fullName;
_tracingFile = new(fileBaseName, fullName, maxTraceFileSizeKb,
maxTraceFiles);
- _cancellationTokenSource = new CancellationTokenSource();
}
internal static string TracingLocationDefault { get; } =
@@ -196,12 +203,13 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
ApacheArrowAdbcNamespace,
TracesFolderName)).FullName;
- private async Task FlushAsync(CancellationToken cancellationToken =
default)
+ private void Flush(CancellationToken cancellationToken = default)
{
// Ensure existing writes are completed.
- while (!cancellationToken.IsCancellationRequested &&
!_activityQueue.IsEmpty)
+ _channel.Writer.TryComplete();
+ while (!cancellationToken.IsCancellationRequested &&
!_isProcessingComplete)
{
- await Task.Delay(100);
+ Thread.Sleep(100);
}
}
@@ -212,7 +220,7 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
// Allow flush of any existing events.
using CancellationTokenSource flushTimeout = new();
flushTimeout.CancelAfter(TimeSpan.FromSeconds(5));
- FlushAsync(flushTimeout.Token).Wait();
+ Flush(flushTimeout.Token);
// Remove and dispose of single instance of exporter
string listenerId = GetListenerId(_fileBaseName,
_tracesDirectoryFullName);
@@ -221,8 +229,7 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
{
listener?.Value.Dispose();
}
- _cancellationTokenSource.Cancel();
- _cancellationTokenSource.Dispose();
+ _tracingFile.Dispose();
_disposed = true;
}
base.Dispose(disposing);
@@ -249,7 +256,7 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
CancellationTokenSource.Cancel();
try
{
- WriteActivitiesTask.Wait();
+
WriteActivitiesTask.ConfigureAwait(false).GetAwaiter().GetResult();
}
catch
{
diff --git a/csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs
b/csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs
index 8595056df..3bb0a4cfc 100644
--- a/csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs
+++ b/csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs
@@ -29,8 +29,9 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
/// Provides access to writing trace files, limiting the
/// individual files size and ensuring unique file names.
/// </summary>
- internal class TracingFile
+ internal class TracingFile : IDisposable
{
+ private const int KbInByes = 1024;
private static readonly string s_defaultTracePath =
FileExporter.TracingLocationDefault;
private static readonly Random s_globalRandom = new();
private static readonly ThreadLocal<Random> s_threadLocalRandom =
new(NewRandom);
@@ -38,6 +39,7 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
private readonly string _fileBaseName;
private readonly DirectoryInfo _tracingDirectory;
private FileInfo? _currentTraceFileInfo;
+ private FileStream? _currentFileStream;
private readonly long _maxFileSizeKb;
private readonly int _maxTraceFiles;
@@ -61,35 +63,44 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
/// <param name="streams">The enumerable of trace lines.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns></returns>
- internal async Task WriteLinesAsync(IAsyncEnumerable<Stream> streams,
CancellationToken cancellationToken = default)
+ internal async Task WriteLineAsync(Stream stream, CancellationToken
cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested) return;
- string findSearchPattern = _fileBaseName +
$"-trace-*-{ProcessId}.log";
- if (_currentTraceFileInfo == null)
+ try
{
- IOrderedEnumerable<FileInfo>? traceFileInfos = await
GetTracingFilesAsync(_tracingDirectory, findSearchPattern);
- FileInfo? mostRecentFile = traceFileInfos?.FirstOrDefault();
- mostRecentFile?.Refresh();
-
- // Use the latest file, if it is not maxxed-out, or start a
new tracing file.
- _currentTraceFileInfo = mostRecentFile != null &&
mostRecentFile.Length < _maxFileSizeKb * 1024
- ? mostRecentFile
- : new FileInfo(NewFileName());
+ if (_currentTraceFileInfo == null || _currentFileStream ==
null)
+ {
+ await OpenNewTracingFileAsync().ConfigureAwait(false);
+ }
+ // Write out to the file and retry if IO errors occur.
+ await ActionWithRetryAsync<IOException>(async () =>
+ await WriteSingleLineAsync(stream).ConfigureAwait(false),
cancellationToken: cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ this._currentTraceFileInfo = null;
+ this._currentFileStream?.Dispose();
+ this._currentFileStream = null;
+ Trace.TraceError(ex.ToString());
}
- // Write out to the file and retry if IO errors occur.
- await ActionWithRetryAsync<IOException>(async () => await
WriteLinesAsync(streams), cancellationToken: cancellationToken);
+ }
+ private async Task TryRemoveOlderFiles()
+ {
// Check if we need to remove old files
if (_tracingDirectory.Exists)
{
// This will clean-up files for all processes in the same
directory.
string deleteSearchPattern = _fileBaseName + $"-trace-*.log";
- FileInfo[] tracingFiles = [.. await
GetTracingFilesAsync(_tracingDirectory, deleteSearchPattern)];
- if (tracingFiles != null && tracingFiles.Length >
_maxTraceFiles)
+ IOrderedEnumerable<FileInfo> orderedFiles = await
GetTracingFilesAsync(_tracingDirectory,
deleteSearchPattern).ConfigureAwait(false);
+ // Avoid accidentally trying to delete the current file.
+ FileInfo[] tracingFiles = orderedFiles.Where(f =>
!f.FullName.Equals(_currentTraceFileInfo?.FullName))?.ToArray() ?? new
FileInfo[0];
+ if (tracingFiles.Length >= _maxTraceFiles)
{
- for (int i = tracingFiles.Length - 1; i >= _maxTraceFiles;
i--)
+ int lastIndex = Math.Max(0, _maxTraceFiles - 1);
+ for (int i = tracingFiles.Length - 1; i >= lastIndex; i--)
{
FileInfo? file = tracingFiles.ElementAtOrDefault(i);
// Note: don't pass the cancellation token, as we want
this to ALWAYS run at the end.
@@ -97,58 +108,91 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
{
file?.Delete();
return Task.CompletedTask;
- });
+ }).ConfigureAwait(false);
}
}
}
}
- private async Task WriteLinesAsync(IAsyncEnumerable<Stream> streams)
+ private async Task WriteSingleLineAsync(Stream stream)
+ {
+ if ((_currentFileStream!.Length + stream.Length) >=
(_maxFileSizeKb * KbInByes))
+ {
+ // If tracing file is maxxed-out, start a new tracing file.
+ await OpenNewTracingFileAsync().ConfigureAwait(false);
+ }
+ await stream.CopyToAsync(_currentFileStream).ConfigureAwait(false);
+ }
+
+ private async Task OpenNewTracingFileAsync()
{
- bool hasMoreData;
+ _currentFileStream?.Dispose();
+ _currentFileStream = null;
+ const int maxAttempts = 20;
+ int attempts = 0;
+ Exception? lastException;
+
do
{
- bool newFileRequired = false;
- _currentTraceFileInfo!.Refresh();
- using (FileStream fileStream =
_currentTraceFileInfo!.OpenWrite())
+ lastException = null;
+ _currentTraceFileInfo = new FileInfo(NewFileName());
+ try
{
- fileStream.Position = fileStream.Length;
- hasMoreData = false;
- await foreach (Stream stream in streams)
+ attempts++;
+ if (!_tracingDirectory.Exists)
{
- if (fileStream.Length >= _maxFileSizeKb * 1024)
- {
- hasMoreData = true;
- newFileRequired = true;
- break;
- }
-
- await stream.CopyToAsync(fileStream);
+ _tracingDirectory.Create();
}
+ _currentFileStream =
_currentTraceFileInfo.Open(FileMode.CreateNew, FileAccess.Write,
FileShare.Read);
+ }
+ catch (IOException ioEx)
+ {
+ lastException = ioEx;
+ // If we can't open the file, just set to null.
+ _currentFileStream = null;
+ int delayMs = ThreadLocalRandom.Next(5, 10 * attempts);
+ await Task.Delay(delayMs).ConfigureAwait(false);
}
- await Task.Yield(); // Yield to allow other tasks to run.
- if (newFileRequired)
+ catch (Exception ex)
{
- // If tracing file is maxxed-out, start a new tracing file.
- _currentTraceFileInfo = new FileInfo(NewFileName());
+ lastException = ex;
+ Trace.TraceError(ex.ToString());
}
- } while (hasMoreData);
+ } while (_currentFileStream == null && attempts <= maxAttempts);
+ if (_currentFileStream == null && lastException != null)
+ {
+ _currentTraceFileInfo = null;
+ throw new IOException($"Unable to create a new tracing file
after {attempts - 1} attempts.", lastException);
+ }
+
+ await TryRemoveOlderFiles().ConfigureAwait(false);
}
private static async Task<IOrderedEnumerable<FileInfo>>
GetTracingFilesAsync(DirectoryInfo tracingDirectory, string searchPattern)
{
- return await Task.Run(() => tracingDirectory
- .EnumerateFiles(searchPattern, SearchOption.TopDirectoryOnly)
- .OrderByDescending(f => f.LastWriteTimeUtc));
+ return await Task.Run(() =>
+ {
+ IEnumerable<FileInfo> unorderedFiles;
+ if (!tracingDirectory.Exists)
+ {
+ tracingDirectory.Create();
+ unorderedFiles = [];
+ }
+ else
+ {
+ unorderedFiles =
tracingDirectory.EnumerateFiles(searchPattern, SearchOption.TopDirectoryOnly);
+ }
+ return unorderedFiles.OrderByDescending(f =>
f.LastWriteTimeUtc.Ticks);
+ }).ConfigureAwait(false);
}
private static async Task ActionWithRetryAsync<T>(
Func<Task> action,
- int maxRetries = 100,
+ int maxRetries = 10,
CancellationToken cancellationToken = default) where T : Exception
{
int retryCount = 0;
- int delayTime = ThreadLocalRandom.Next(50, 500); // Introduce a
small random delay to avoid contention.
+ int delayTime = ThreadLocalRandom.Next(10, 500); // Introduce a
small random delay to avoid contention.
TimeSpan pauseTime = TimeSpan.FromMilliseconds(delayTime);
bool completed = false;
@@ -156,7 +200,7 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
{
try
{
- await action.Invoke();
+ await action.Invoke().ConfigureAwait(false);
completed = true;
}
catch (T) when (retryCount < (maxRetries - 1))
@@ -164,7 +208,7 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
retryCount++;
try
{
- await Task.Delay(pauseTime, cancellationToken);
+ await Task.Delay(pauseTime,
cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -177,7 +221,7 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
private string NewFileName()
{
- string dateTimeSortable =
DateTimeOffset.UtcNow.ToString("yyyy-MM-dd-HH-mm-ss-fff");
+ string dateTimeSortable =
DateTimeOffset.UtcNow.ToString("yyyy-MM-dd-HH-mm-ss-ffffff");
return Path.Combine(_tracingDirectory.FullName,
$"{_fileBaseName}-trace-{dateTimeSortable}-{ProcessId}.log");
}
@@ -207,5 +251,12 @@ namespace
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter
// This ensures that each thread gets a different seed.
return new Random(seed);
}
+
+ public void Dispose()
+ {
+ _currentFileStream?.Dispose();
+ _currentFileStream = null;
+ _currentTraceFileInfo = null;
+ }
}
}
diff --git
a/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs
b/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs
index 16f0c370d..2eaf7dc72 100644
--- a/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs
+++ b/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs
@@ -29,7 +29,7 @@ using
Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter;
namespace Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
{
- public class TracingFileExporterTests : IDisposable
+ public class FileExporterTests : IDisposable
{
private readonly ITestOutputHelper? _outputHelper;
private bool _disposed;
@@ -37,7 +37,7 @@ namespace
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
private readonly ActivitySource _activitySource;
private static readonly string s_localApplicationDataFolderPath =
Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData);
- public TracingFileExporterTests(ITestOutputHelper? outputHelper)
+ public FileExporterTests(ITestOutputHelper? outputHelper)
{
_outputHelper = outputHelper;
_activitySourceName = ExportersBuilderTests.NewName();
@@ -104,7 +104,7 @@ namespace
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
[Fact]
internal async Task CanSetCustomMaxFileSize()
{
- const long maxTraceFileSizeKb = 30;
+ const long maxTraceFileSizeKb = 50;
const long kilobyte = 1024;
string customFolderName = ExportersBuilderTests.NewName();
string traceFolder =
Path.Combine(s_localApplicationDataFolderPath, customFolderName);
@@ -141,7 +141,7 @@ namespace
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
}
for (int i = 0; i < files.Length; i++)
{
- long expectedUpperSizeLimit = (maxTraceFileSizeKb +
(long)(0.2 * maxTraceFileSizeKb)) * kilobyte;
+ long expectedUpperSizeLimit = maxTraceFileSizeKb * 2 *
kilobyte;
Assert.True(files[i].Length < expectedUpperSizeLimit,
summary.ToString());
}
_outputHelper?.WriteLine($"number of files: {files.Length}");
@@ -208,7 +208,7 @@ namespace
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
for (int i = 0; i < 100; i++)
{
await AddEvent("test");
- await Task.Delay(TimeSpan.FromMilliseconds(0.1));
+ await Task.Delay(TimeSpan.FromMilliseconds(0.11));
}
}
@@ -252,12 +252,26 @@ namespace
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
if (Directory.Exists(traceFolder)) Directory.Delete(traceFolder,
true);
try
{
+ // This simulates two drivers/connections in the same process
trying to write to the same
+ // trace file(s). In fact, because the FileExporter is a
singleton by the combined key of
+ // the activity source name and the trace file path, both
providers will register only one listener.
+ TracerProvider provider1 = Sdk.CreateTracerProviderBuilder()
+ .AddSource(_activitySourceName)
+ .AddAdbcFileExporter(_activitySourceName, traceFolder)
+ .Build();
+ TracerProvider provider2 = Sdk.CreateTracerProviderBuilder()
+ .AddSource(_activitySourceName)
+ .AddAdbcFileExporter(_activitySourceName, traceFolder)
+ .Build();
var tasks = new Task[]
{
- Task.Run(async () => await TraceActivities(traceFolder,
"activity1", writeCount)),
- Task.Run(async () => await TraceActivities(traceFolder,
"activity2", writeCount)),
+ Task.Run(async () => await TraceActivities(traceFolder,
"activity1", writeCount, provider1)),
+ Task.Run(async () => await TraceActivities(traceFolder,
"activity2", writeCount, provider2)),
};
await Task.WhenAll(tasks);
+ await Task.Delay(500);
+ provider1.Dispose();
+ provider2.Dispose();
int activity1Count = 0;
int activity2Count = 0;
@@ -278,8 +292,8 @@ namespace
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
// Note, because we don't reference count, one of the
listeners will likely
// close the shared instance before the other is finished.
// That can result in some events not being written.
- Assert.InRange(activity1Count, writeCount * 0.9, writeCount);
- Assert.InRange(activity2Count, writeCount * 0.9, writeCount);
+ Assert.InRange(activity1Count, writeCount * 0.8, writeCount);
+ Assert.InRange(activity2Count, writeCount * 0.8, writeCount);
}
finally
{
@@ -287,19 +301,14 @@ namespace
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
}
}
- private async Task TraceActivities(string traceFolder, string
activityName, int writeCount)
+ private async Task TraceActivities(string traceFolder, string
activityName, int writeCount, TracerProvider provider)
{
- using (TracerProvider provider = Sdk.CreateTracerProviderBuilder()
- .AddSource(_activitySourceName)
- .AddAdbcFileExporter(_activitySourceName, traceFolder)
- .Build())
+ for (int i = 0; i < writeCount; i++)
{
- for (int i = 0; i < writeCount; i++)
- {
- await StartActivity(activityName);
- await Task.Delay(TimeSpan.FromMilliseconds(0.1));
- }
+ await StartActivity(activityName);
+ await Task.Delay(TimeSpan.FromMilliseconds(0.1));
}
+ provider.ForceFlush(2000);
}
private Task AddEvent(string eventName, string activityName =
nameof(AddEvent))
diff --git
a/csharp/test/Telemetry/Traces/Exporters/FileExporter/TracingFileTests.cs
b/csharp/test/Telemetry/Traces/Exporters/FileExporter/TracingFileTests.cs
index 99767756e..3392ea031 100644
--- a/csharp/test/Telemetry/Traces/Exporters/FileExporter/TracingFileTests.cs
+++ b/csharp/test/Telemetry/Traces/Exporters/FileExporter/TracingFileTests.cs
@@ -41,7 +41,7 @@ namespace
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
internal async Task TestMultipleConcurrentTracingFiles()
{
CancellationTokenSource tokenSource = new
CancellationTokenSource();
- int concurrentCount = 5;
+ int concurrentCount = 50;
Task[] tasks = new Task[concurrentCount];
int[] lineCounts = new int[concurrentCount];
string sourceName = ExportersBuilderTests.NewName();
@@ -80,8 +80,11 @@ namespace
Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter
private async Task Run(string sourceName, string traceFolder,
CancellationToken cancellationToken)
{
int instanceNumber = Interlocked.Increment(ref _testInstance) - 1;
- TracingFile tracingFile = new TracingFile(sourceName, traceFolder);
- await tracingFile.WriteLinesAsync(GetLinesAsync(instanceNumber,
100, cancellationToken), cancellationToken);
+ using TracingFile tracingFile = new TracingFile(sourceName,
traceFolder);
+ await foreach (var stream in GetLinesAsync(instanceNumber, 100,
cancellationToken))
+ {
+ await tracingFile.WriteLineAsync(stream, cancellationToken);
+ }
}
private static async IAsyncEnumerable<Stream> GetLinesAsync(int
instanceNumber, int lineCount, [EnumeratorCancellation] CancellationToken
cancellationToken = default)