CurtHagenlocher commented on code in PR #3397: URL: https://github.com/apache/arrow-adbc/pull/3397#discussion_r2337905734
########## csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs: ########## @@ -252,12 +252,23 @@ internal async Task CanTraceMultipleConcurrentWriters() if (Directory.Exists(traceFolder)) Directory.Delete(traceFolder, true); try { + TracerProvider provider1 = Sdk.CreateTracerProviderBuilder() + .AddSource(_activitySourceName) + .AddAdbcFileExporter(_activitySourceName, traceFolder) + .Build(); + TracerProvider provider2 = Sdk.CreateTracerProviderBuilder() + .AddSource(_activitySourceName) + .AddAdbcFileExporter(_activitySourceName, traceFolder) + .Build(); var tasks = new Task[] { - Task.Run(async () => await TraceActivities(traceFolder, "activity1", writeCount)), - Task.Run(async () => await TraceActivities(traceFolder, "activity2", writeCount)), + Task.Run(async () => await TraceActivities(traceFolder, "activity1", writeCount, provider1)), + Task.Run(async () => await TraceActivities(traceFolder, "activity2", writeCount, provider1)), Review Comment: ```suggestion Task.Run(async () => await TraceActivities(traceFolder, "activity2", writeCount, provider2)), ``` ########## csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs: ########## @@ -120,28 +120,54 @@ public override ExportResult Export(in Batch<Activity> batch) foreach (Activity activity in batch) { if (activity == null) continue; - _activityQueue.Enqueue(activity); + _ = _channel.Writer.TryWrite(activity); } return ExportResult.Success; } + protected override bool OnForceFlush(int timeoutMilliseconds) + { + CancellationTokenSource cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutMilliseconds)); + Task.Delay(100).ConfigureAwait(false).GetAwaiter().GetResult(); Review Comment: If these calls are always blocking for the full duration, isn't it much cleaner to just use `Thread.Sleep`? 
########## csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs: ########## @@ -103,52 +113,85 @@ await ActionWithRetryAsync<IOException>(() => } } - private async Task WriteLinesAsync(IAsyncEnumerable<Stream> streams) + private async Task WriteSingleLineAsync(Stream stream) { - bool hasMoreData; + if ((_currentFileStream!.Length + stream.Length) >= (_maxFileSizeKb * KbInByes)) + { + // If tracing file is maxxed-out, start a new tracing file. + await OpenNewTracingFileAsync(); + } + _currentFileStream!.Position = _currentFileStream.Length; + await stream.CopyToAsync(_currentFileStream); + } + + private async Task OpenNewTracingFileAsync() + { + _currentFileStream?.Dispose(); + _currentFileStream = null; + const int maxAttempts = 20; + int attempts = 0; + Exception? lastException; + do { - bool newFileRequired = false; - _currentTraceFileInfo!.Refresh(); - using (FileStream fileStream = _currentTraceFileInfo!.OpenWrite()) + lastException = null; + _currentTraceFileInfo = new FileInfo(NewFileName()); + try { - fileStream.Position = fileStream.Length; - hasMoreData = false; - await foreach (Stream stream in streams) + attempts++; + if (!_tracingDirectory.Exists) { - if (fileStream.Length >= _maxFileSizeKb * 1024) - { - hasMoreData = true; - newFileRequired = true; - break; - } - - await stream.CopyToAsync(fileStream); + _tracingDirectory.Create(); } + _currentFileStream = _currentTraceFileInfo.Open(FileMode.CreateNew, FileAccess.Write, FileShare.Read); + } + catch (IOException ioEx) + when ((uint)ioEx.HResult == 0x80070020 // ERROR_SHARING_VIOLATION + || ((uint)ioEx.HResult == 0x80070050)) // ERROR_ALREADY_EXISTS) + { + lastException = ioEx; + // If we can't open the file, just set to null. + _currentFileStream = null; + int delayMs = ThreadLocalRandom.Next(5, 10 * attempts); + await Task.Delay(delayMs).ConfigureAwait(false); } - await Task.Yield(); // Yield to allow other tasks to run. 
- if (newFileRequired) + catch (Exception ex) { - // If tracing file is maxxed-out, start a new tracing file. - _currentTraceFileInfo = new FileInfo(NewFileName()); + lastException = ex; + Console.WriteLine(ex.Message); } - } while (hasMoreData); + } while (_currentFileStream == null && attempts <= maxAttempts); + if (_currentFileStream == null && lastException != null) + { + _currentTraceFileInfo = null; + throw new IOException($"Unable to create a new tracing file after {attempts - 1} attempts.", lastException); + } + + await TryRemoveOlderFiles(); } private static async Task<IOrderedEnumerable<FileInfo>> GetTracingFilesAsync(DirectoryInfo tracingDirectory, string searchPattern) { - return await Task.Run(() => tracingDirectory - .EnumerateFiles(searchPattern, SearchOption.TopDirectoryOnly) - .OrderByDescending(f => f.LastWriteTimeUtc)); + return await Task.Run(() => + { + if (!tracingDirectory.Exists) + { + tracingDirectory.Create(); + return Enumerable.Empty<FileInfo>().OrderByDescending(f => f.LastWriteTimeUtc); + } + return tracingDirectory + .EnumerateFiles(searchPattern, SearchOption.TopDirectoryOnly) + .OrderByDescending(f => f.LastWriteTimeUtc.Ticks); Review Comment: I would expect the "order by" to be identical between the two code paths. 
########## csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs: ########## @@ -120,28 +120,54 @@ public override ExportResult Export(in Batch<Activity> batch) foreach (Activity activity in batch) { if (activity == null) continue; - _activityQueue.Enqueue(activity); + _ = _channel.Writer.TryWrite(activity); } return ExportResult.Success; } + protected override bool OnForceFlush(int timeoutMilliseconds) + { + CancellationTokenSource cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutMilliseconds)); + Task.Delay(100).ConfigureAwait(false).GetAwaiter().GetResult(); + while (!cts.IsCancellationRequested && !_channel.Reader.Completion.IsCompleted && _channel.Reader.Count > 0) + { + Task.Delay(100).ConfigureAwait(false).GetAwaiter().GetResult(); + } + return true; + } + private static async Task ProcessActivitiesAsync(FileExporter fileExporter, CancellationToken cancellationToken) { try { - TimeSpan delay = TimeSpan.FromMilliseconds(100); - // Polls for and then writes any activities in the queue - while (!cancellationToken.IsCancellationRequested) + using MemoryStream stream = new(); + await foreach (Activity activity in fileExporter._channel.Reader.ReadAllAsync(cancellationToken)) { - await Task.Delay(delay, cancellationToken); - await fileExporter._tracingFile.WriteLinesAsync(GetActivitiesAsync(fileExporter._activityQueue), cancellationToken); + if (cancellationToken.IsCancellationRequested) break; + + stream.SetLength(0); + SerializableActivity serializableActivity = new(activity); + await JsonSerializer.SerializeAsync( + stream, + serializableActivity); + stream.Write(s_newLine, 0, s_newLine.Length); + stream.Position = 0; + + await fileExporter._tracingFile.WriteLineAsync(stream, cancellationToken); } } + catch (OperationCanceledException ex) + { + // Expected when cancellationToken is cancelled. 
+ Console.WriteLine(ex); Review Comment: (applies to multiple places in this PR) ########## csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs: ########## @@ -103,52 +113,85 @@ await ActionWithRetryAsync<IOException>(() => } } - private async Task WriteLinesAsync(IAsyncEnumerable<Stream> streams) + private async Task WriteSingleLineAsync(Stream stream) { - bool hasMoreData; + if ((_currentFileStream!.Length + stream.Length) >= (_maxFileSizeKb * KbInByes)) + { + // If tracing file is maxxed-out, start a new tracing file. + await OpenNewTracingFileAsync(); + } + _currentFileStream!.Position = _currentFileStream.Length; Review Comment: Aren't we taking steps to ensure that we're the only writer of this file? ########## csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs: ########## @@ -120,28 +120,54 @@ public override ExportResult Export(in Batch<Activity> batch) foreach (Activity activity in batch) { if (activity == null) continue; - _activityQueue.Enqueue(activity); + _ = _channel.Writer.TryWrite(activity); Review Comment: ```suggestion _channel.Writer.TryWrite(activity); ``` ########## csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs: ########## @@ -120,28 +120,54 @@ public override ExportResult Export(in Batch<Activity> batch) foreach (Activity activity in batch) { if (activity == null) continue; - _activityQueue.Enqueue(activity); + _ = _channel.Writer.TryWrite(activity); } return ExportResult.Success; } + protected override bool OnForceFlush(int timeoutMilliseconds) + { + CancellationTokenSource cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutMilliseconds)); + Task.Delay(100).ConfigureAwait(false).GetAwaiter().GetResult(); + while (!cts.IsCancellationRequested && !_channel.Reader.Completion.IsCompleted && _channel.Reader.Count > 0) + { + Task.Delay(100).ConfigureAwait(false).GetAwaiter().GetResult(); + } + return true; + } + private static async Task 
ProcessActivitiesAsync(FileExporter fileExporter, CancellationToken cancellationToken) { try { - TimeSpan delay = TimeSpan.FromMilliseconds(100); - // Polls for and then writes any activities in the queue - while (!cancellationToken.IsCancellationRequested) + using MemoryStream stream = new(); + await foreach (Activity activity in fileExporter._channel.Reader.ReadAllAsync(cancellationToken)) { - await Task.Delay(delay, cancellationToken); - await fileExporter._tracingFile.WriteLinesAsync(GetActivitiesAsync(fileExporter._activityQueue), cancellationToken); + if (cancellationToken.IsCancellationRequested) break; + + stream.SetLength(0); + SerializableActivity serializableActivity = new(activity); + await JsonSerializer.SerializeAsync( + stream, + serializableActivity); + stream.Write(s_newLine, 0, s_newLine.Length); + stream.Position = 0; + + await fileExporter._tracingFile.WriteLineAsync(stream, cancellationToken); } } + catch (OperationCanceledException ex) + { + // Expected when cancellationToken is cancelled. + Console.WriteLine(ex); Review Comment: It seems very bad form to write to the console in a library that gets loaded into arbitrary applications. I don't have a suggestion for an alternative, but I don't think we should do this. ########## csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs: ########## @@ -103,52 +113,85 @@ await ActionWithRetryAsync<IOException>(() => } } - private async Task WriteLinesAsync(IAsyncEnumerable<Stream> streams) + private async Task WriteSingleLineAsync(Stream stream) { - bool hasMoreData; + if ((_currentFileStream!.Length + stream.Length) >= (_maxFileSizeKb * KbInByes)) + { + // If tracing file is maxxed-out, start a new tracing file. 
+ await OpenNewTracingFileAsync(); + } + _currentFileStream!.Position = _currentFileStream.Length; + await stream.CopyToAsync(_currentFileStream); + } + + private async Task OpenNewTracingFileAsync() + { + _currentFileStream?.Dispose(); + _currentFileStream = null; + const int maxAttempts = 20; + int attempts = 0; + Exception? lastException; + do { - bool newFileRequired = false; - _currentTraceFileInfo!.Refresh(); - using (FileStream fileStream = _currentTraceFileInfo!.OpenWrite()) + lastException = null; + _currentTraceFileInfo = new FileInfo(NewFileName()); + try { - fileStream.Position = fileStream.Length; - hasMoreData = false; - await foreach (Stream stream in streams) + attempts++; + if (!_tracingDirectory.Exists) { - if (fileStream.Length >= _maxFileSizeKb * 1024) - { - hasMoreData = true; - newFileRequired = true; - break; - } - - await stream.CopyToAsync(fileStream); + _tracingDirectory.Create(); } + _currentFileStream = _currentTraceFileInfo.Open(FileMode.CreateNew, FileAccess.Write, FileShare.Read); + } + catch (IOException ioEx) + when ((uint)ioEx.HResult == 0x80070020 // ERROR_SHARING_VIOLATION + || ((uint)ioEx.HResult == 0x80070050)) // ERROR_ALREADY_EXISTS) Review Comment: These seem Windows-specific. How would this work on other platforms? ########## csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs: ########## @@ -103,52 +113,85 @@ await ActionWithRetryAsync<IOException>(() => } } - private async Task WriteLinesAsync(IAsyncEnumerable<Stream> streams) + private async Task WriteSingleLineAsync(Stream stream) { - bool hasMoreData; + if ((_currentFileStream!.Length + stream.Length) >= (_maxFileSizeKb * KbInByes)) + { + // If tracing file is maxxed-out, start a new tracing file. 
+ await OpenNewTracingFileAsync(); + } + _currentFileStream!.Position = _currentFileStream.Length; + await stream.CopyToAsync(_currentFileStream); + } + + private async Task OpenNewTracingFileAsync() + { + _currentFileStream?.Dispose(); + _currentFileStream = null; + const int maxAttempts = 20; + int attempts = 0; + Exception? lastException; + do { - bool newFileRequired = false; - _currentTraceFileInfo!.Refresh(); - using (FileStream fileStream = _currentTraceFileInfo!.OpenWrite()) + lastException = null; + _currentTraceFileInfo = new FileInfo(NewFileName()); + try { - fileStream.Position = fileStream.Length; - hasMoreData = false; - await foreach (Stream stream in streams) + attempts++; + if (!_tracingDirectory.Exists) { - if (fileStream.Length >= _maxFileSizeKb * 1024) - { - hasMoreData = true; - newFileRequired = true; - break; - } - - await stream.CopyToAsync(fileStream); + _tracingDirectory.Create(); } + _currentFileStream = _currentTraceFileInfo.Open(FileMode.CreateNew, FileAccess.Write, FileShare.Read); + } + catch (IOException ioEx) + when ((uint)ioEx.HResult == 0x80070020 // ERROR_SHARING_VIOLATION + || ((uint)ioEx.HResult == 0x80070050)) // ERROR_ALREADY_EXISTS) + { + lastException = ioEx; + // If we can't open the file, just set to null. + _currentFileStream = null; + int delayMs = ThreadLocalRandom.Next(5, 10 * attempts); + await Task.Delay(delayMs).ConfigureAwait(false); Review Comment: What's the logic by which some of the `await`s in this file have `ConfigureAwait` and some don't? 
########## csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs: ########## @@ -252,12 +252,23 @@ internal async Task CanTraceMultipleConcurrentWriters() if (Directory.Exists(traceFolder)) Directory.Delete(traceFolder, true); try { + TracerProvider provider1 = Sdk.CreateTracerProviderBuilder() + .AddSource(_activitySourceName) + .AddAdbcFileExporter(_activitySourceName, traceFolder) + .Build(); + TracerProvider provider2 = Sdk.CreateTracerProviderBuilder() + .AddSource(_activitySourceName) + .AddAdbcFileExporter(_activitySourceName, traceFolder) + .Build(); var tasks = new Task[] { - Task.Run(async () => await TraceActivities(traceFolder, "activity1", writeCount)), - Task.Run(async () => await TraceActivities(traceFolder, "activity2", writeCount)), + Task.Run(async () => await TraceActivities(traceFolder, "activity1", writeCount, provider1)), + Task.Run(async () => await TraceActivities(traceFolder, "activity2", writeCount, provider1)), Review Comment: I'm assuming the second task was meant to use `provider2`, though it's not clear from the context why two providers are needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org