eric-wang-1990 commented on code in PR #3637:
URL: https://github.com/apache/arrow-adbc/pull/3637#discussion_r2471426230
##########
csharp/src/Drivers/Databricks/Reader/CloudFetch/straggler-mitigation-integration-v2.md:
##########
@@ -0,0 +1,607 @@
+# Straggler Download Mitigation - Integration Guide
+
+## Overview
+
+This document provides integration guidance for straggler download mitigation in the ADBC CloudFetch system. It focuses on **class contracts, interfaces, and interaction patterns** rather than implementation details.
+
+**Design Principle:** Minimal changes to existing architecture - integrate seamlessly with CloudFetchDownloader's existing retry mechanism.
+
+---
+
+## 1. Architecture Overview
+
+### 1.1 Component Diagram
+
+```mermaid
+classDiagram
+ class ICloudFetchDownloader {
+ <<interface>>
+ +StartAsync(CancellationToken) Task
+ +StopAsync() Task
+ +GetNextDownloadedFileAsync(CancellationToken) Task~IDownloadResult~
+ }
+
+ class CloudFetchDownloader {
+ -ITracingStatement _statement
+ -SemaphoreSlim _downloadSemaphore
+ -int _maxRetries
+ -StragglerDownloadDetector _stragglerDetector
+ -ConcurrentDictionary~long,FileDownloadMetrics~ _activeDownloadMetrics
+ -ConcurrentDictionary~long,CancellationTokenSource~ _perFileTokens
+ +StartAsync(CancellationToken) Task
+ -DownloadFileAsync(IDownloadResult, CancellationToken) Task
+ -MonitorForStragglerDownloadsAsync(CancellationToken) Task
+ }
+
+ class FileDownloadMetrics {
+ +long FileOffset
+ +long FileSizeBytes
+ +DateTime DownloadStartTime
+ +DateTime? DownloadEndTime
+ +bool IsDownloadCompleted
+ +bool WasCancelledAsStragler
+ +CalculateThroughputBytesPerSecond() double?
+ +MarkDownloadCompleted() void
+ +MarkCancelledAsStragler() void
+ }
+
+ class StragglerDownloadDetector {
+ -double _stragglerThroughputMultiplier
+ -double _minimumCompletionQuantile
+ -TimeSpan _stragglerDetectionPadding
+ -int _maxStragglersBeforeFallback
+ -int _totalStragglersDetectedInQuery
+ +bool ShouldFallbackToSequentialDownloads
+ +IdentifyStragglerDownloads(IReadOnlyList~FileDownloadMetrics~, DateTime) IEnumerable~long~
+ +GetTotalStragglersDetectedInQuery() int
+ }
+
+ class IActivityTracer {
+ <<interface>>
+ +ActivityTrace Trace
+ +string? TraceParent
+ }
+
+ ICloudFetchDownloader <|.. CloudFetchDownloader
+ IActivityTracer <|.. CloudFetchDownloader
+ CloudFetchDownloader --> FileDownloadMetrics : tracks
+ CloudFetchDownloader --> StragglerDownloadDetector : uses
+```
+
+### 1.2 Key Integration Points
+
+| Component | Change Type | Description |
+|-----------|-------------|-------------|
+| **DatabricksParameters** | New constants | Add 6 configuration parameters |
+| **CloudFetchDownloader** | Modified | Add straggler tracking and monitoring |
+| **FileDownloadMetrics** | New class | Track per-file download performance |
+| **StragglerDownloadDetector** | New class | Identify stragglers using median throughput |
+
+---
+
+## 2. Class Contracts
+
+### 2.1 FileDownloadMetrics
+
+**Purpose:** Track timing and throughput for individual file downloads.
+
+**Public Contract:**
+```csharp
+internal class FileDownloadMetrics
+{
+ // Read-only properties
+ public long FileOffset { get; }
+ public long FileSizeBytes { get; }
+ public DateTime DownloadStartTime { get; }
+ public DateTime? DownloadEndTime { get; }
+ public bool IsDownloadCompleted { get; }
+ public bool WasCancelledAsStragler { get; }
+
+ // Constructor
+ public FileDownloadMetrics(long fileOffset, long fileSizeBytes);
+
+ // Methods
+ public double? CalculateThroughputBytesPerSecond();
+ public void MarkDownloadCompleted();
+ public void MarkCancelledAsStragler();
+}
+```
+
+**Behavior:**
+- Captures start time on construction
+- Calculates throughput as `fileSize / elapsedSeconds`
+- Immutable file metadata (offset, size)
+- State transitions: In Progress → Completed OR Cancelled
+
+---
+
+### 2.2 StragglerDownloadDetector
+
+**Purpose:** Encapsulate straggler identification logic.
+
+**Public Contract:**
+```csharp
+internal class StragglerDownloadDetector
+{
+ // Read-only property
+ public bool ShouldFallbackToSequentialDownloads { get; }
+
+ // Constructor
+ public StragglerDownloadDetector(
+ double stragglerThroughputMultiplier,
+ double minimumCompletionQuantile,
+ TimeSpan stragglerDetectionPadding,
+ int maxStragglersBeforeFallback);
+
+ // Core detection method
+ public IEnumerable<long> IdentifyStragglerDownloads(
+ IReadOnlyList<FileDownloadMetrics> allDownloadMetrics,
+ DateTime currentTime);
+
+ // Query metrics
+ public int GetTotalStragglersDetectedInQuery();
+}
+```
+
+**Detection Algorithm:**
+```
+1. Wait for minimumCompletionQuantile (e.g., 60%) to complete
+2. Calculate median throughput from completed downloads
+3. For each active download:
+ - Calculate expected time: (multiplier × fileSize / medianThroughput) + padding
+ - If elapsed > expected: mark as straggler
+4. Track total stragglers for fallback decision
+```
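The four steps above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: `DownloadSample` and `StragglerSketch.Identify` are hypothetical stand-ins for `FileDownloadMetrics` and `IdentifyStragglerDownloads`, the quantile check rounds up, and the fallback counter from step 4 is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for FileDownloadMetrics; End == null means "still downloading".
public sealed record DownloadSample(long Offset, long SizeBytes, DateTime Start, DateTime? End);

public static class StragglerSketch
{
    public static List<long> Identify(
        IReadOnlyList<DownloadSample> all, DateTime now,
        double multiplier, double quantile, TimeSpan padding)
    {
        var stragglers = new List<long>();

        // Step 1: do nothing until the required fraction of downloads has completed.
        var completed = all.Where(d => d.End.HasValue).ToList();
        if (completed.Count == 0 || completed.Count < Math.Ceiling(quantile * all.Count))
        {
            return stragglers;
        }

        // Step 2: median throughput (bytes per second) of the completed downloads.
        // The elapsed time is floored at 1 ms to avoid dividing by zero.
        var rates = completed
            .Select(d => d.SizeBytes /
                Math.Max((d.End.GetValueOrDefault() - d.Start).TotalSeconds, 1e-3))
            .OrderBy(r => r)
            .ToList();
        double median = rates.Count % 2 == 1
            ? rates[rates.Count / 2]
            : (rates[rates.Count / 2 - 1] + rates[rates.Count / 2]) / 2.0;

        // Step 3: flag active downloads whose elapsed time exceeds the expected time.
        foreach (var d in all.Where(d => !d.End.HasValue))
        {
            double expectedSeconds = multiplier * d.SizeBytes / median + padding.TotalSeconds;
            if ((now - d.Start).TotalSeconds > expectedSeconds)
            {
                stragglers.Add(d.Offset);
            }
        }
        return stragglers;
    }
}
```

With a multiplier of 1.5 and 5 seconds of padding, a 1000-byte file measured against a median of 1000 bytes/s has an expected time of 1.5 × 1 + 5 = 6.5 seconds, so it is flagged only once it has been running longer than that.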
+
+---
+
+### 2.3 CloudFetchDownloader Modifications
+
+**New Fields:**
+```csharp
+// Straggler mitigation state
+private readonly bool _isStragglerMitigationEnabled;
+private readonly StragglerDownloadDetector? _stragglerDetector;
+private readonly ConcurrentDictionary<long, FileDownloadMetrics>? _activeDownloadMetrics;
+private readonly ConcurrentDictionary<long, CancellationTokenSource>? _perFileTokens;
+
+// Background monitoring
+private Task? _stragglerMonitoringTask;
+private CancellationTokenSource? _stragglerMonitoringCts;
+
+// Fallback state
+private volatile bool _hasTriggeredSequentialDownloadFallback;
+```
+
+**Modified Methods:**
+- `StartAsync()` - Start background monitoring task
+- `StopAsync()` - Stop and cleanup monitoring task
+- `DownloadFileAsync()` - Integrate straggler cancellation handling into retry loop
+
+**New Methods:**
+- `MonitorForStragglerDownloadsAsync()` - Background task checking for stragglers every 2s
+- `TriggerSequentialDownloadFallback()` - Reduce parallelism to 1
+
+---
+
+## 3. Interaction Flows
+
+### 3.1 Initialization Sequence
+
+```mermaid
+sequenceDiagram
+ participant CM as CloudFetchDownloadManager
+ participant CD as CloudFetchDownloader
+ participant SD as StragglerDownloadDetector
+ participant MT as MonitoringTask
+
+ CM->>CD: new CloudFetchDownloader(...)
+ CD->>CD: Parse straggler config params
+ alt Mitigation Enabled
+ CD->>SD: new StragglerDownloadDetector(...)
+ CD->>CD: Initialize _activeDownloadMetrics
+ CD->>CD: Initialize _perFileTokens
+ end
+
+ CM->>CD: StartAsync()
+ CD->>CD: Start download task
+ alt Mitigation Enabled
+ CD->>MT: Start MonitorForStragglerDownloadsAsync()
+ activate MT
+ MT->>MT: Loop every 2s
+ end
+```
+
+### 3.2 Download with Straggler Detection
+
+```mermaid
+sequenceDiagram
+ participant DT as DownloadTask
+ participant FM as FileDownloadMetrics
+ participant HTTP as HttpClient
+ participant MT as MonitorTask
+ participant SD as StragglerDetector
+ participant CTS as CancellationTokenSource
+
+ DT->>FM: new FileDownloadMetrics(offset, size)
+ DT->>CTS: CreateLinkedTokenSource()
+ DT->>DT: Add to _activeDownloadMetrics
+
+ loop Retry Loop (0 to maxRetries)
+ DT->>HTTP: GetAsync(url, effectiveToken)
+
+ par Background Monitoring
+ MT->>SD: IdentifyStragglerDownloads(metrics, now)
+ SD->>SD: Calculate median throughput
+ SD->>SD: Check if download exceeds threshold
+ alt Is Straggler
+ SD-->>MT: Return straggler offsets
+ MT->>CTS: Cancel(stragglerOffset)
+ end
+ end
+
+ alt Download Succeeds
+ HTTP-->>DT: Success
+ DT->>FM: MarkDownloadCompleted()
+ DT->>DT: Break from retry loop
+ else Straggler Cancelled
+ HTTP-->>DT: OperationCanceledException
+ DT->>FM: MarkCancelledAsStragler()
+ DT->>CTS: Dispose old, create new token
+ DT->>DT: Refresh URL if needed
+ DT->>DT: Apply retry delay
+ DT->>DT: Continue to next retry
+ else Other Error
+ HTTP-->>DT: Exception
+ DT->>DT: Apply retry delay
+ DT->>DT: Continue to next retry
+ end
+ end
+
+ DT->>DT: Remove from _activeDownloadMetrics
+```
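The retry path in the diagram above could be sketched along these lines. It is an illustration only: `RetryOnStragglerSketch`, `CancelAsStraggler`, and `DownloadWithRetryAsync` are hypothetical names, the HTTP call is abstracted as an `attempt` delegate, and URL refresh and retry delay are omitted. It also assumes a straggler cancellation can be told apart from a global cancellation by checking whether the outer token has fired.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class RetryOnStragglerSketch
{
    // Per-file token sources keyed by file offset, mirroring the _perFileTokens field.
    private readonly ConcurrentDictionary<long, CancellationTokenSource> _perFileTokens =
        new ConcurrentDictionary<long, CancellationTokenSource>();

    // Called by the monitor once it has decided that a download is a straggler.
    public bool CancelAsStraggler(long offset)
    {
        if (_perFileTokens.TryGetValue(offset, out var cts))
        {
            try
            {
                cts.Cancel();
                return true;
            }
            catch (ObjectDisposedException)
            {
                // The download finished or moved to its next attempt first; nothing to cancel.
            }
        }
        return false;
    }

    // Runs attempt(attemptNumber, token) until one attempt succeeds; returns its index.
    public async Task<int> DownloadWithRetryAsync(
        long offset, int maxAttempts,
        Func<int, CancellationToken, Task> attempt, CancellationToken global)
    {
        for (int attemptNo = 0; attemptNo < maxAttempts; attemptNo++)
        {
            // Each attempt gets a fresh token linked to the global one, so a
            // straggler cancellation of one attempt does not poison the next.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(global);
            _perFileTokens[offset] = cts;
            try
            {
                await attempt(attemptNo, cts.Token);
                return attemptNo;
            }
            catch (OperationCanceledException) when (!global.IsCancellationRequested)
            {
                // Only the per-file token fired: treat it as a straggler and retry.
            }
            finally
            {
                _perFileTokens.TryRemove(offset, out _);
            }
        }
        throw new InvalidOperationException($"Download at offset {offset} did not complete.");
    }
}
```

In this sketch the retry runs on the same logical download task that was cancelled, not on the monitor. Whether the retried request travels over a new connection is not decided here; that depends on how the underlying HTTP handler pools connections.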
+
+### 3.3 Straggler Detection Flow
+
+```mermaid
+flowchart TD
+ A[Monitor Wakes Every 2s] --> B{Active Downloads?}
+ B -->|No| A
+ B -->|Yes| C[Snapshot Active Metrics]
+ C --> D[Count Completed Downloads]
+ D --> E{Completed ≥<br/>Quantile × Total?}
+ E -->|No| A
+ E -->|Yes| F[Calculate Median Throughput]
+ F --> G[For Each Active Download]
+ G --> H[Calculate Elapsed Time]
+ H --> I[Calculate Expected Time]
+ I --> J{Elapsed > Expected<br/>+ Padding?}
+ J -->|Yes| K[Add to Stragglers]
+ J -->|No| L[Next Download]
+ K --> M[Increment Counter]
+ M --> L
+ L --> N{More Downloads?}
+ N -->|Yes| G
+ N -->|No| O[Cancel Straggler Tokens]
+ O --> P{Total ≥ Threshold?}
+ P -->|Yes| Q[Trigger Fallback]
+ P -->|No| A
+ Q --> A
+```
+
+---
+
+## 4. Configuration
+
+### 4.1 DatabricksParameters Additions
+
+```csharp
+public class DatabricksParameters : SparkParameters
+{
+ /// <summary>
+ /// Whether to enable straggler download detection and mitigation for CloudFetch operations.
+ /// Default value is false if not specified.
+ /// </summary>
+ public const string CloudFetchStragglerMitigationEnabled =
+ "adbc.databricks.cloudfetch.straggler_mitigation_enabled";
+
+ /// <summary>
+ /// Multiplier used to determine straggler threshold based on median throughput.
+ /// Default value is 1.5 if not specified.
+ /// </summary>
+ public const string CloudFetchStragglerMultiplier =
+ "adbc.databricks.cloudfetch.straggler_multiplier";
+
+ /// <summary>
+ /// Fraction of downloads that must complete before straggler detection begins.
+ /// Valid range: 0.0 to 1.0. Default value is 0.6 (60%) if not specified.
+ /// </summary>
+ public const string CloudFetchStragglerQuantile =
+ "adbc.databricks.cloudfetch.straggler_quantile";
+
+ /// <summary>
+ /// Extra buffer time in seconds added to the straggler threshold calculation.
+ /// Default value is 5 seconds if not specified.
+ /// </summary>
+ public const string CloudFetchStragglerPaddingSeconds =
+ "adbc.databricks.cloudfetch.straggler_padding_seconds";
+
+ /// <summary>
+ /// Maximum number of stragglers detected per query before triggering sequential download fallback.
+ /// Default value is 10 if not specified.
+ /// </summary>
+ public const string CloudFetchMaxStragglersPerQuery =
+ "adbc.databricks.cloudfetch.max_stragglers_per_query";
+
+ /// <summary>
+ /// Whether to automatically fall back to sequential downloads when max stragglers threshold is exceeded.
+ /// Default value is false if not specified.
+ /// </summary>
+ public const string CloudFetchSynchronousFallbackEnabled =
+ "adbc.databricks.cloudfetch.synchronous_fallback_enabled";
+}
+```
+
+**Default Values:**
+| Parameter | Default | Rationale |
+|-----------|---------|-----------|
+| Mitigation Enabled | `false` | Conservative rollout |
+| Multiplier | `1.5` | Download 50% slower than median |
+| Quantile | `0.6` | 60% completion for stable median |
+| Padding | `5s` | Buffer for small file variance |
+| Max Stragglers | `10` | Fallback if systemic issue |
+| Fallback Enabled | `false` | Sequential mode is last resort |
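One way the six parameters and their defaults might be read from connection properties is sketched below. `StragglerOptions` and `FromProperties` are hypothetical names, the properties are assumed to arrive as a string-to-string dictionary, and unparsable values are assumed to fall back silently to the defaults in the table; the actual driver may validate differently.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical options holder; not part of the PR.
public sealed record StragglerOptions(
    bool Enabled, double Multiplier, double Quantile,
    TimeSpan Padding, int MaxStragglers, bool FallbackEnabled)
{
    private const string Prefix = "adbc.databricks.cloudfetch.";

    public static StragglerOptions FromProperties(IReadOnlyDictionary<string, string> props)
    {
        // Each helper returns the fallback when the key is missing or unparsable.
        bool Bool(string key, bool fallback) =>
            props.TryGetValue(Prefix + key, out var s) && bool.TryParse(s, out var v)
                ? v : fallback;

        double Num(string key, double fallback) =>
            props.TryGetValue(Prefix + key, out var s) &&
            double.TryParse(s, NumberStyles.Float, CultureInfo.InvariantCulture, out var v)
                ? v : fallback;

        int Int(string key, int fallback) =>
            props.TryGetValue(Prefix + key, out var s) &&
            int.TryParse(s, NumberStyles.Integer, CultureInfo.InvariantCulture, out var v)
                ? v : fallback;

        // The quantile is documented as a fraction, so clamp it to [0, 1].
        double quantile = Math.Min(1.0, Math.Max(0.0, Num("straggler_quantile", 0.6)));

        return new StragglerOptions(
            Enabled: Bool("straggler_mitigation_enabled", false),
            Multiplier: Num("straggler_multiplier", 1.5),
            Quantile: quantile,
            Padding: TimeSpan.FromSeconds(Num("straggler_padding_seconds", 5)),
            MaxStragglers: Int("max_stragglers_per_query", 10),
            FallbackEnabled: Bool("synchronous_fallback_enabled", false));
    }
}
```

Parsing with the invariant culture keeps a value such as `1.5` from being misread on machines whose locale uses a comma as the decimal separator.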
+
+---
+
+## 5. Observability
+
+### 5.1 Activity Tracing Integration
+
+CloudFetchDownloader implements `IActivityTracer` and uses the extension method pattern:
+
+**Wrap Methods:**
+```csharp
+await this.TraceActivityAsync(async activity =>
+{
+ // Method implementation
+ activity?.SetTag("key", value);
+}, activityName: "MethodName");
+```
+
+**Add Events:**
+```csharp
+activity?.AddEvent("cloudfetch.straggler_cancelled", [
+ new("offset", offset),
+ new("file_size_mb", sizeMb),
+ new("elapsed_seconds", elapsed)
+]);
+```
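For comparison, the same event can be recorded with only the base class library's `System.Diagnostics` types, without the driver's tracing helpers. This is a sketch: `StragglerEventSketch.RecordCancelled` is a hypothetical helper, and the tag names are taken from the snippet above.

```csharp
using System;
using System.Diagnostics;

public static class StragglerEventSketch
{
    // Attaches a "cloudfetch.straggler_cancelled" event to the activity, if there is one.
    public static void RecordCancelled(
        Activity? activity, long offset, double sizeMb, double elapsedSeconds)
    {
        activity?.AddEvent(new ActivityEvent(
            "cloudfetch.straggler_cancelled",
            tags: new ActivityTagsCollection
            {
                { "offset", offset },
                { "file_size_mb", sizeMb },
                { "elapsed_seconds", elapsedSeconds },
            }));
    }
}
```

Because the call is null-conditional, nothing is recorded when no listener has started an activity, so the helper adds little overhead when tracing is off.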
+
+### 5.2 Key Events
+
+| Event Name | When Emitted | Key Tags |
+|------------|-------------|----------|
+| `cloudfetch.straggler_check` | When stragglers identified | `active_downloads`, `completed_downloads`, `stragglers_identified` |
+| `cloudfetch.straggler_cancelling` | Before cancelling straggler | `offset` |
Review Comment:
I think we just need the cancelled tag.
##########
csharp/src/Drivers/Databricks/Reader/CloudFetch/straggler-mitigation-integration-v2.md:
##########
@@ -0,0 +1,607 @@
+### 5.2 Key Events
+
+| Event Name | When Emitted | Key Tags |
+|------------|-------------|----------|
+| `cloudfetch.straggler_check` | When stragglers identified | `active_downloads`, `completed_downloads`, `stragglers_identified` |
+| `cloudfetch.straggler_cancelling` | Before cancelling straggler | `offset` |
+| `cloudfetch.straggler_cancelled` | In download retry loop | `offset`, `file_size_mb`, `elapsed_seconds`, `attempt` |
+| `cloudfetch.url_refreshed_for_straggler_retry` | URL refreshed for retry | `offset`, `sanitized_url` |
+| `cloudfetch.sequential_fallback_triggered` | Fallback triggered | `total_stragglers_in_query`, `fallback_threshold` |
+
+---
+
+## 6. Testing Strategy
+
+### 6.1 Test Structure
+
+Following existing CloudFetch test patterns:
+
+```
+test/Drivers/Databricks/
+├── Unit/CloudFetch/
+│ ├── FileDownloadMetricsTests.cs # Test metrics calculation
+│ ├── StragglerDownloadDetectorTests.cs # Test detection logic
+│ └── CloudFetchDownloaderStragglerTests.cs # Test integration with downloader
+└── E2E/CloudFetch/
+ └── CloudFetchStragglerE2ETests.cs # End-to-end scenarios
+```
+
+### 6.2 Unit Test Coverage
+
+#### FileDownloadMetricsTests
+
+**Test Cases:**
+- `Constructor_InitializesCorrectly` - Verify properties set correctly
+- `CalculateThroughputBytesPerSecond_ReturnsNull_WhenNotCompleted`
+- `CalculateThroughputBytesPerSecond_ReturnsCorrectValue_WhenCompleted`
+- `MarkDownloadCompleted_SetsEndTime`
+- `MarkCancelledAsStragler_SetsFlag`
+
+**Pattern:**
+```csharp
+[Fact]
+public void CalculateThroughputBytesPerSecond_ReturnsCorrectValue_WhenCompleted()
+{
+ // Arrange
+ // Named argument must match the constructor parameter name (fileOffset)
+ var metrics = new FileDownloadMetrics(fileOffset: 0, fileSizeBytes: 1024 * 1024); // 1MB
+
+ // Act
+ metrics.MarkDownloadCompleted();
+ var throughput = metrics.CalculateThroughputBytesPerSecond();
+
+ // Assert
+ Assert.NotNull(throughput);
+ Assert.True(throughput > 0);
+}
+```
+
+#### StragglerDownloadDetectorTests
Review Comment:
This seems like too much detail on the testing part. Can we show some skeleton
code for the straggler monitor instead? For example: how does it monitor all
download threads, how does it restart a download, and does the restart run on
the same straggler thread? I think the purpose of restarting is to establish a
new HTTP connection with the cloud provider; does it guarantee that?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]