eric-wang-1990 commented on code in PR #3624:
URL: https://github.com/apache/arrow-adbc/pull/3624#discussion_r2466848333
##########
csharp/src/Drivers/Databricks/Telemetry/prompts.txt:
##########
@@ -0,0 +1,11 @@
+1. "can you understand the content present in this google doc:
{telemetry-design-doc-url}"
+
+2. "can you use google mcp"
Review Comment:
Is this the internal Google Doc MCP?
##########
csharp/src/Drivers/Databricks/Telemetry/telemetry-lld-summary.md:
##########
@@ -0,0 +1,280 @@
+# Analysis: File Locations in Telemetry LLD
+
+Based on my analysis of the design document, **ALL changes are contained within the Databricks driver folder** (`csharp/src/Drivers/Databricks`). Here's the complete breakdown:
+
+## ✅ New Files to Create (All in Databricks folder)
+
+### 1. Telemetry Core Components
+
+```
+/Databricks/Telemetry/
+├── TelemetryCollector.cs (New - event aggregation)
+├── TelemetryExporter.cs (New - HTTP export)
+├── ITelemetryExporter.cs (New - interface)
+├── CircuitBreaker.cs (New - resilience)
+├── TelemetryConfiguration.cs (New - config)
+└── Models/
+ ├── TelemetryEvent.cs (New - event model)
+ ├── TelemetryRequest.cs (New - request payload)
+ ├── TelemetryResponse.cs (New - response payload)
+ ├── TelemetryFrontendLog.cs (New - log wrapper)
+ ├── FrontendLogContext.cs (New - context)
+ ├── FrontendLogEntry.cs (New - entry)
+ ├── SqlDriverLog.cs (New - driver log)
+ ├── DriverConfiguration.cs (New - config snapshot)
+ ├── SqlOperationData.cs (New - SQL metrics)
+ ├── ChunkDownloadData.cs (New - chunk metrics)
+ ├── DriverErrorInfo.cs (New - error info)
+ ├── TelemetryClientContext.cs (New - client context)
+ └── StatementTelemetryData.cs (New - aggregated data)
+```
+
+### 2. Test Files
+
+```
+/Databricks.Tests/Telemetry/
+├── TelemetryCollectorTests.cs (New - unit tests)
+├── TelemetryExporterTests.cs (New - unit tests)
+├── CircuitBreakerTests.cs (New - unit tests)
+├── TelemetryIntegrationTests.cs (New - integration tests)
+├── TelemetryPerformanceTests.cs (New - perf tests)
+└── MockTelemetryEndpointTests.cs (New - mock tests)
+```
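+
+As a flavor of these tests, here is a minimal sketch of one `CircuitBreakerTests` case. It assumes xUnit (used elsewhere in the C# test projects) and the `CircuitBreaker` API proposed in the design doc (`RecordFailure`, `RecordSuccess`, `IsOpen`); it is illustrative, not final.
+
+```csharp
+using System;
+using Xunit;
+using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry;
+
+public class CircuitBreakerTests
+{
+    [Fact]
+    public void OpensAfterThresholdFailures_AndResetsOnSuccess()
+    {
+        var breaker = new CircuitBreaker(failureThreshold: 3, timeout: TimeSpan.FromSeconds(30));
+
+        // Two failures stay below the threshold of three
+        breaker.RecordFailure();
+        breaker.RecordFailure();
+        Assert.False(breaker.IsOpen);
+
+        // The third consecutive failure opens the circuit
+        breaker.RecordFailure();
+        Assert.True(breaker.IsOpen);
+
+        // A success resets the count and closes the circuit
+        breaker.RecordSuccess();
+        Assert.False(breaker.IsOpen);
+    }
+}
+```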
+
+## ✅ Existing Files to Modify (All in Databricks folder)
+
+### 1. DatabricksParameters.cs
+
+**Location:** `/Databricks/DatabricksParameters.cs`
+**Changes:** Add telemetry configuration constants
+
+```csharp
+public const string TelemetryEnabled = "adbc.databricks.telemetry.enabled";
+public const string TelemetryBatchSize = "adbc.databricks.telemetry.batch_size";
+public const string TelemetryFlushIntervalMs = "adbc.databricks.telemetry.flush_interval_ms";
+// ... 7 more parameters
+```
+
+### 2. DatabricksConnection.cs
+
+**Location:** `/Databricks/DatabricksConnection.cs`
+**Changes:**
+- Add TelemetryCollector field
+- Initialize telemetry in `OpenAsync()`
+- Record connection configuration
+- Flush telemetry in `Dispose()`
+- Check server-side feature flag in `ApplyServerSidePropertiesAsync()`
+
+```csharp
+private TelemetryCollector? _telemetryCollector;
+private TelemetryConfiguration? _telemetryConfig;
+
+public override async Task OpenAsync(CancellationToken cancellationToken = default)
+{
+ // ... existing code ...
+ InitializeTelemetry();
+ _telemetryCollector?.RecordConnectionOpen(latency, driverConfig);
+}
+
+public override void Dispose()
+{
+ _telemetryCollector?.FlushAllPendingAsync().Wait();
+ _telemetryCollector?.Dispose();
+ base.Dispose();
+}
+```
+
+### 3. DatabricksStatement.cs
+
+**Location:** `/Databricks/DatabricksStatement.cs`
+**Changes:**
+- Record statement execution metrics
+- Track result format
+- Mark statement complete on dispose
+
+```csharp
+protected override async Task<QueryResult> ExecuteQueryAsync(...)
+{
+ var sw = Stopwatch.StartNew();
+ // ... execute ...
+ Connection.TelemetryCollector?.RecordStatementExecute(
+ statementId, sw.Elapsed, resultFormat);
+}
+
+public override void Dispose()
+{
+ Connection.TelemetryCollector?.RecordStatementComplete(_statementId);
+ base.Dispose();
+}
+```
+
+### 4. CloudFetchDownloader.cs
+
+**Location:** `/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs`
+**Changes:**
+- Record chunk download latency
+- Track retry attempts
+- Report download errors
+
+```csharp
+private async Task DownloadFileAsync(IDownloadResult downloadResult, ...)
+{
+ var sw = Stopwatch.StartNew();
Review Comment:
Why do we need a separate stopwatch? The existing CloudFetch telemetry already captures this using ActivityTrace.
##########
csharp/src/Drivers/Databricks/Telemetry/telemetry-integration-lld-design.md:
##########
@@ -0,0 +1,3565 @@
+# Databricks ADBC Driver: Client Telemetry Integration
+
+## Executive Summary
+
+This document outlines the design for integrating client-side telemetry into the Databricks ADBC driver for C#. The telemetry system will collect operational metrics, performance data, and error information from the driver to enable proactive monitoring, usage analytics, and faster issue resolution.
+
+**Key Objectives:**
+- Enable comprehensive observability of driver operations without impacting performance
+- Collect usage insights (CloudFetch vs inline, driver configurations, error patterns)
+- Track adoption of new features and configurations
+- Provide proactive error monitoring to identify issues before customer reports
+- Maintain compatibility with existing OpenTelemetry/Activity-based tracing
+
+**Design Principles:**
+- **Non-blocking**: Telemetry operations must never block driver functionality
+- **Privacy-first**: No PII or query data collected; schema curated for data residency compliance
+- **Opt-out capable**: Users can disable telemetry via configuration
+- **Server-controlled**: Feature flag support for server-side enable/disable
+- **Backward compatible**: No breaking changes to existing driver API
+- **OpenTelemetry aligned**: Leverage existing Activity infrastructure where possible
+
+---
+
+## Table of Contents
+
+1. [Background & Motivation](#1-background--motivation)
+2. [Requirements](#2-requirements)
+3. [Architecture Overview](#3-architecture-overview)
+4. [Telemetry Components](#4-telemetry-components)
+5. [Data Schema](#5-data-schema)
+6. [Collection Points](#6-collection-points)
+7. [Export Mechanism](#7-export-mechanism)
+8. [Configuration](#8-configuration)
+9. [Privacy & Data Residency](#9-privacy--data-residency)
+10. [Error Handling](#10-error-handling)
+11. [Testing Strategy](#11-testing-strategy)
+12. [Migration & Rollout](#12-migration--rollout)
+13. [Alternatives Considered](#13-alternatives-considered)
+14. [Open Questions](#14-open-questions)
+15. [References](#15-references)
+
+---
+
+## 1. Background & Motivation
+
+### 1.1 Current State
+
+The Databricks ADBC driver currently implements:
+- **Activity-based tracing** via `ActivityTrace` and `ActivitySource`
+- **W3C Trace Context propagation** for distributed tracing
+- **Local file exporter** for debugging traces
+
+However, this approach has limitations:
+- **No centralized aggregation**: Traces are local-only unless connected to external APM
+- **Limited usage insights**: No visibility into driver configuration patterns
+- **Reactive debugging**: Relies on customer-reported issues with trace files
+- **No feature adoption metrics**: Cannot track usage of CloudFetch, Direct Results, etc.
+
+### 1.2 JDBC Driver Precedent
+
+The Databricks JDBC driver successfully implemented client telemetry with:
+- **Comprehensive metrics**: Operation latency, chunk downloads, error rates
+- **Configuration tracking**: Driver settings, auth types, proxy usage
+- **Server-side control**: Feature flag to enable/disable telemetry
+- **Centralized storage**: Data flows to `main.eng_lumberjack.prod_frontend_log_sql_driver_log`
+- **Privacy compliance**: No PII, curated schema, Lumberjack data residency
+
+### 1.3 Key Gaps to Address
+
+1. **Proactive Monitoring**: Identify errors before customer escalation
+2. **Usage Analytics**: Understand driver configuration patterns across customer base
+3. **Feature Adoption**: Track uptake of CloudFetch, Direct Results, OAuth flows
+4. **Performance Insights**: Client-side latency vs server-side metrics
+5. **Error Patterns**: Common configuration mistakes, auth failures, network issues
+
+---
+
+## 2. Requirements
+
+### 2.1 Functional Requirements
+
+| ID | Requirement | Priority |
+|:---|:---|:---:|
+| FR-1 | Collect driver configuration metadata (auth type, CloudFetch settings, etc.) | P0 |
+| FR-2 | Track operation latency (connection open, statement execution, result fetching) | P0 |
+| FR-3 | Record error events with error codes and context | P0 |
+| FR-4 | Capture CloudFetch metrics (chunk downloads, retries, compression status) | P0 |
+| FR-5 | Track result format usage (inline vs CloudFetch) | P1 |
+| FR-6 | Support server-side feature flag to enable/disable telemetry | P0 |
+| FR-7 | Provide client-side opt-out mechanism | P1 |
+| FR-8 | Batch telemetry events to reduce network overhead | P0 |
+| FR-9 | Export telemetry to Databricks telemetry service | P0 |
+| FR-10 | Support both authenticated and unauthenticated telemetry endpoints | P0 |
+### 2.2 Non-Functional Requirements
+
+| ID | Requirement | Target | Priority |
+|:---|:---|:---:|:---:|
+| NFR-1 | Telemetry overhead < 1% of operation latency | < 1% | P0 |
+| NFR-2 | Memory overhead < 10MB per connection | < 10MB | P0 |
+| NFR-3 | Zero impact on driver operation if telemetry fails | 0 failures | P0 |
+| NFR-4 | Telemetry export success rate | > 95% | P1 |
+| NFR-5 | Batch flush latency | < 5s | P1 |
+| NFR-6 | Support workspace-level disable | 100% | P0 |
+| NFR-7 | No PII or query data collected | 0 PII | P0 |
+| NFR-8 | Compatible with existing Activity tracing | 100% | P0 |
+
+### 2.3 Out of Scope
+
+- Distributed tracing (already covered by Activity/OpenTelemetry)
+- Query result data collection
+- Real-time alerting (server-side responsibility)
+- Custom telemetry endpoints (only Databricks service)
+
+---
+
+## 3. Architecture Overview
+
+### 3.1 High-Level Design
+
+```
+┌─────────────────────────────────────────────────────────────────┐
+│ ADBC Driver Operations │
+│ (Connection, Statement Execution, Result Fetching) │
+└─────────────────────────────────────────────────────────────────┘
+ │
+ │ Emit Events
+ ▼
+┌─────────────────────────────────────────────────────────────────┐
+│ TelemetryCollector │
+│ - Per-connection singleton │
+│ - Aggregates events by statement ID │
+│ - Non-blocking event ingestion │
+└─────────────────────────────────────────────────────────────────┘
+ │
+ │ Batch Events
+ ▼
+┌─────────────────────────────────────────────────────────────────┐
+│ TelemetryExporter │
+│ - Background export worker │
+│ - Periodic flush (configurable interval) │
+│ - Size-based flush (batch threshold) │
+│ - Connection close flush │
+└─────────────────────────────────────────────────────────────────┘
+ │
+ │ HTTP POST
+ ▼
+┌─────────────────────────────────────────────────────────────────┐
+│ Databricks Telemetry Service │
+│ Endpoints: │
+│ - /telemetry-ext (authenticated) │
+│ - /telemetry-unauth (unauthenticated - connection errors) │
+└─────────────────────────────────────────────────────────────────┘
+ │
+ ▼
+┌─────────────────────────────────────────────────────────────────┐
+│ Lumberjack Pipeline │
+│ Table: main.eng_lumberjack.prod_frontend_log_sql_driver_log │
+└─────────────────────────────────────────────────────────────────┘
+```
+
+### 3.2 Component Interaction Flow
+
+```mermaid
+sequenceDiagram
+ participant App as Application
+ participant Conn as DatabricksConnection
+ participant Stmt as DatabricksStatement
+ participant TC as TelemetryCollector
+ participant TE as TelemetryExporter
+ participant TS as Telemetry Service
+
+ App->>Conn: OpenAsync()
+ Conn->>TC: Initialize(config)
+ TC->>TE: Start background worker
+ Conn->>TC: RecordConnectionOpen(latency, config)
+
+ App->>Stmt: ExecuteQueryAsync()
+ Stmt->>TC: RecordStatementExecution(statementId, latency)
+
+ loop CloudFetch Downloads
+ Stmt->>TC: RecordChunkDownload(chunkIndex, latency, size)
+ end
+
+ Stmt->>TC: RecordStatementComplete(statementId)
+
+ alt Batch size reached
+ TC->>TE: Flush batch
+ TE->>TS: POST /telemetry-ext
+ end
+
+ App->>Conn: CloseAsync()
+ Conn->>TC: Flush all pending
+ TC->>TE: Force flush
+ TE->>TS: POST /telemetry-ext
+ TE->>TE: Stop worker
+```
+
+### 3.3 Integration with Existing Components
+
+The telemetry system will integrate with existing driver components:
+
+1. **DatabricksConnection**:
+ - Initialize telemetry collector on open
+ - Record connection configuration
+ - Flush telemetry on close
+ - Handle feature flag from server
+
+2. **DatabricksStatement**:
+ - Record statement execution metrics
+ - Track result format (inline vs CloudFetch)
+ - Capture operation latency
+
+3. **CloudFetchDownloader**:
+ - Record chunk download latency
+ - Track retry attempts
+ - Report compression status
+
+4. **Activity Infrastructure**:
+ - Leverage existing Activity context for correlation
+ - Add telemetry as Activity events for unified observability
+ - Maintain W3C trace context propagation
+
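+
+To make point 4 concrete, here is a minimal sketch of a collection point that feeds the same measurement to both the current Activity and the proposed `TelemetryCollector`. The event name and helper method are illustrative assumptions, not existing driver code:
+
+```csharp
+using System;
+using System.Diagnostics;
+
+// Hypothetical hook inside a driver operation that already runs under an Activity
+private void OnChunkDownloaded(string statementId, int chunkIndex, TimeSpan latency, long bytes)
+{
+    // 1. Attach the measurement to the current Activity for tracing consumers
+    Activity.Current?.AddEvent(new ActivityEvent(
+        "cloudfetch.chunk.downloaded",
+        tags: new ActivityTagsCollection
+        {
+            ["chunk.index"] = chunkIndex,
+            ["download.latency_ms"] = (long)latency.TotalMilliseconds,
+            ["download.bytes"] = bytes
+        }));
+
+    // 2. Feed the same measurement to the telemetry pipeline (API defined in section 4.1)
+    _telemetryCollector?.RecordChunkDownload(statementId, chunkIndex, latency, bytes, compressed: false);
+}
+```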
+---
+
+## 4. Telemetry Components
+
+### 4.1 TelemetryCollector
+
+**Purpose**: Aggregate and buffer telemetry events per connection.
+
+**Location**: `Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.TelemetryCollector`
+
+**Responsibilities**:
+- Accept telemetry events from driver operations
+- Aggregate events by statement ID
+- Buffer events for batching
+- Provide non-blocking event ingestion
+- Trigger flush on batch size or time threshold
+
+**Interface**:
+```csharp
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
+{
+ /// <summary>
+ /// Collects and aggregates telemetry events for a connection.
+ /// Thread-safe and non-blocking.
+ /// </summary>
+ internal sealed class TelemetryCollector : IDisposable
+ {
+ // Constructor
+ public TelemetryCollector(
+ DatabricksConnection connection,
+ ITelemetryExporter exporter,
+ TelemetryConfiguration config);
+
+ // Event recording methods
+ public void RecordConnectionOpen(
+ TimeSpan latency,
+ DriverConfiguration driverConfig);
+
+ public void RecordStatementExecute(
+ string statementId,
+ TimeSpan latency,
+ ExecutionResultFormat resultFormat);
+
+ public void RecordChunkDownload(
+ string statementId,
+ int chunkIndex,
+ TimeSpan latency,
+ long bytesDownloaded,
+ bool compressed);
+
+ public void RecordOperationStatus(
+ string statementId,
+ int pollCount,
+ TimeSpan totalLatency);
+
+ public void RecordStatementComplete(string statementId);
+
+ public void RecordError(
+ string errorCode,
+ string errorMessage,
+ string? statementId = null,
+ int? chunkIndex = null);
+
+ // Flush methods
+ public Task FlushAsync(CancellationToken cancellationToken = default);
+
+ public Task FlushAllPendingAsync();
+
+ // IDisposable
+ public void Dispose();
+ }
+}
+```
+
+**Implementation Details**:
+
+```csharp
+internal sealed class TelemetryCollector : IDisposable
+{
+ private readonly DatabricksConnection _connection;
+ private readonly ITelemetryExporter _exporter;
+ private readonly TelemetryConfiguration _config;
+    private readonly ConcurrentDictionary<string, StatementTelemetryData> _statementData;
+ private readonly ConcurrentQueue<TelemetryEvent> _eventQueue;
+    private readonly Timer? _flushTimer; // nullable: only created when periodic flush is enabled
+ private readonly SemaphoreSlim _flushLock;
+ private long _lastFlushTime;
+ private int _eventCount;
+ private bool _disposed;
+
+ public TelemetryCollector(
+ DatabricksConnection connection,
+ ITelemetryExporter exporter,
+ TelemetryConfiguration config)
+ {
+        _connection = connection ?? throw new ArgumentNullException(nameof(connection));
+        _exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
+        _config = config ?? throw new ArgumentNullException(nameof(config));
+
+        _statementData = new ConcurrentDictionary<string, StatementTelemetryData>();
+ _eventQueue = new ConcurrentQueue<TelemetryEvent>();
+ _flushLock = new SemaphoreSlim(1, 1);
+ _lastFlushTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+
+ // Start periodic flush timer
+ if (_config.FlushIntervalMilliseconds > 0)
+ {
+ _flushTimer = new Timer(
+ OnTimerFlush,
+ null,
+ _config.FlushIntervalMilliseconds,
+ _config.FlushIntervalMilliseconds);
+ }
+ }
+
+    public void RecordConnectionOpen(TimeSpan latency, DriverConfiguration driverConfig)
+ {
+ if (!_config.Enabled) return;
+
+ var telemetryEvent = new TelemetryEvent
+ {
+ EventType = TelemetryEventType.ConnectionOpen,
+ Timestamp = DateTimeOffset.UtcNow,
+ OperationLatencyMs = (long)latency.TotalMilliseconds,
+ DriverConfig = driverConfig,
+ SessionId = _connection.SessionId,
+ WorkspaceId = _connection.WorkspaceId
+ };
+
+ EnqueueEvent(telemetryEvent);
+ }
+
+ public void RecordStatementExecute(
+ string statementId,
+ TimeSpan latency,
+ ExecutionResultFormat resultFormat)
+ {
+ if (!_config.Enabled || string.IsNullOrEmpty(statementId)) return;
+
+ var stmtData = _statementData.GetOrAdd(
+ statementId,
+ _ => new StatementTelemetryData { StatementId = statementId });
+
+ stmtData.ExecutionLatencyMs = (long)latency.TotalMilliseconds;
+ stmtData.ResultFormat = resultFormat;
+ stmtData.Timestamp = DateTimeOffset.UtcNow;
+ }
+
+ public void RecordChunkDownload(
+ string statementId,
+ int chunkIndex,
+ TimeSpan latency,
+ long bytesDownloaded,
+ bool compressed)
+ {
+ if (!_config.Enabled || string.IsNullOrEmpty(statementId)) return;
+
+ var stmtData = _statementData.GetOrAdd(
+ statementId,
+ _ => new StatementTelemetryData { StatementId = statementId });
+
+ stmtData.ChunkDownloads.Add(new ChunkDownloadData
+ {
+ ChunkIndex = chunkIndex,
+ LatencyMs = (long)latency.TotalMilliseconds,
+ BytesDownloaded = bytesDownloaded,
+ Compressed = compressed
+ });
+
+ stmtData.TotalChunks = Math.Max(stmtData.TotalChunks, chunkIndex + 1);
+ }
+
+ public void RecordStatementComplete(string statementId)
+ {
+ if (!_config.Enabled || string.IsNullOrEmpty(statementId)) return;
+
+ if (_statementData.TryRemove(statementId, out var stmtData))
+ {
+ // Convert statement data to telemetry event
+ var telemetryEvent = CreateStatementEvent(stmtData);
+ EnqueueEvent(telemetryEvent);
+ }
+ }
+
+ public void RecordError(
+ string errorCode,
+ string errorMessage,
+ string? statementId = null,
+ int? chunkIndex = null)
+ {
+ if (!_config.Enabled) return;
+
+ var telemetryEvent = new TelemetryEvent
+ {
+ EventType = TelemetryEventType.Error,
+ Timestamp = DateTimeOffset.UtcNow,
+ ErrorCode = errorCode,
+ ErrorMessage = errorMessage,
+ StatementId = statementId,
+ ChunkIndex = chunkIndex,
+ SessionId = _connection.SessionId,
+ WorkspaceId = _connection.WorkspaceId
+ };
+
+ EnqueueEvent(telemetryEvent);
+ }
+
+ private void EnqueueEvent(TelemetryEvent telemetryEvent)
+ {
+ _eventQueue.Enqueue(telemetryEvent);
+ var count = Interlocked.Increment(ref _eventCount);
+
+ // Trigger flush if batch size reached
+ if (count >= _config.BatchSize)
+ {
+ _ = Task.Run(() => FlushAsync(CancellationToken.None));
+ }
+ }
+
+ public async Task FlushAsync(CancellationToken cancellationToken = default)
+ {
+ if (_eventCount == 0) return;
+
+ await _flushLock.WaitAsync(cancellationToken);
+ try
+ {
+ var events = DequeueEvents();
+ if (events.Count > 0)
+ {
+ await _exporter.ExportAsync(events, cancellationToken);
+                _lastFlushTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+ }
+ }
+ catch (Exception ex)
+ {
+ // Log but don't throw - telemetry must not break driver
+ Debug.WriteLine($"Telemetry flush failed: {ex.Message}");
+ }
+ finally
+ {
+ _flushLock.Release();
+ }
+ }
+
+ public async Task FlushAllPendingAsync()
+ {
+ // Export all pending statement data
+ foreach (var kvp in _statementData)
+ {
+ if (_statementData.TryRemove(kvp.Key, out var stmtData))
+ {
+ var telemetryEvent = CreateStatementEvent(stmtData);
+ EnqueueEvent(telemetryEvent);
+ }
+ }
+
+ // Flush event queue
+ await FlushAsync(CancellationToken.None);
+ }
+
+ private List<TelemetryEvent> DequeueEvents()
+ {
+ var events = new List<TelemetryEvent>(_config.BatchSize);
+        // Check the count before dequeuing so a full batch never discards a dequeued event
+        while (events.Count < _config.BatchSize && _eventQueue.TryDequeue(out var telemetryEvent))
+ {
+ events.Add(telemetryEvent);
+ Interlocked.Decrement(ref _eventCount);
+ }
+ return events;
+ }
+
+ private void OnTimerFlush(object? state)
+ {
+ var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+        if (now - _lastFlushTime >= _config.FlushIntervalMilliseconds && _eventCount > 0)
+ {
+ _ = Task.Run(() => FlushAsync(CancellationToken.None));
+ }
+ }
+
+ public void Dispose()
+ {
+ if (_disposed) return;
+ _disposed = true;
+
+ _flushTimer?.Dispose();
+
+ // Flush all pending data synchronously on dispose
+ FlushAllPendingAsync().GetAwaiter().GetResult();
+
+ _flushLock?.Dispose();
+ }
+}
+```
+
+### 4.2 TelemetryExporter
+
+**Purpose**: Export telemetry events to Databricks telemetry service.
+
+**Location**: `Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.TelemetryExporter`
+
+**Responsibilities**:
+- Serialize telemetry events to JSON
+- Send HTTP POST requests to telemetry endpoints
+- Handle authentication (OAuth tokens)
+- Implement retry logic for transient failures
+- Support circuit breaker pattern
+
+**Interface**:
+```csharp
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
+{
+ /// <summary>
+ /// Exports telemetry events to Databricks telemetry service.
+ /// </summary>
+ internal interface ITelemetryExporter
+ {
+ Task ExportAsync(
+ IReadOnlyList<TelemetryEvent> events,
+ CancellationToken cancellationToken = default);
+ }
+
+ internal sealed class TelemetryExporter : ITelemetryExporter
+ {
+ public TelemetryExporter(
+ HttpClient httpClient,
+ DatabricksConnection connection,
+ TelemetryConfiguration config);
+
+ public Task ExportAsync(
+ IReadOnlyList<TelemetryEvent> events,
+ CancellationToken cancellationToken = default);
+ }
+}
+```
+
+**Implementation Details**:
+
+```csharp
+internal sealed class TelemetryExporter : ITelemetryExporter
+{
+ private readonly HttpClient _httpClient;
+ private readonly DatabricksConnection _connection;
+ private readonly TelemetryConfiguration _config;
+ private readonly JsonSerializerOptions _jsonOptions;
+ private readonly CircuitBreaker? _circuitBreaker;
+
+ private const string AuthenticatedPath = "/telemetry-ext";
+ private const string UnauthenticatedPath = "/telemetry-unauth";
+
+ public TelemetryExporter(
+ HttpClient httpClient,
+ DatabricksConnection connection,
+ TelemetryConfiguration config)
+ {
+        _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
+        _connection = connection ?? throw new ArgumentNullException(nameof(connection));
+ _config = config ?? throw new ArgumentNullException(nameof(config));
+
+ _jsonOptions = new JsonSerializerOptions
+ {
+ PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
+ DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
+ WriteIndented = false
+ };
+
+ if (_config.CircuitBreakerEnabled)
+ {
+ _circuitBreaker = new CircuitBreaker(
+ _config.CircuitBreakerThreshold,
+ _config.CircuitBreakerTimeout);
+ }
+ }
+
+ public async Task ExportAsync(
+ IReadOnlyList<TelemetryEvent> events,
+ CancellationToken cancellationToken = default)
+ {
+ if (events == null || events.Count == 0) return;
+
+ try
+ {
+ // Check circuit breaker
+ if (_circuitBreaker != null && _circuitBreaker.IsOpen)
+ {
+                Debug.WriteLine("Telemetry circuit breaker is open, dropping events");
+ return;
+ }
+
+ // Determine endpoint based on authentication status
+ var isAuthenticated = _connection.IsAuthenticated;
+            var path = isAuthenticated ? AuthenticatedPath : UnauthenticatedPath;
+ var uri = new Uri(_connection.Host, path);
+
+ // Create request payload
+ var request = CreateTelemetryRequest(events);
+ var json = JsonSerializer.Serialize(request, _jsonOptions);
+            var content = new StringContent(json, Encoding.UTF8, "application/json");
+
+ // Create HTTP request
+            using var httpRequest = new HttpRequestMessage(HttpMethod.Post, uri)
+ {
+ Content = content
+ };
+
+ // Add authentication headers if authenticated
+ if (isAuthenticated)
+ {
+                await AddAuthenticationHeadersAsync(httpRequest, cancellationToken);
+ }
+
+ // Send request with retry
+            var response = await SendWithRetryAsync(httpRequest, cancellationToken);
+
+ // Handle response
+ if (response.IsSuccessStatusCode)
+ {
+ _circuitBreaker?.RecordSuccess();
+
+ // Parse response for partial failures
+                var responseContent = await response.Content.ReadAsStringAsync(cancellationToken);
+                var telemetryResponse = JsonSerializer.Deserialize<TelemetryResponse>(
+                    responseContent,
+                    _jsonOptions);
+
+ if (telemetryResponse?.Errors?.Count > 0)
+ {
+                    Debug.WriteLine(
+                        $"Telemetry partial failure: {telemetryResponse.Errors.Count} errors");
+ }
+ }
+ else
+ {
+ _circuitBreaker?.RecordFailure();
+                Debug.WriteLine(
+                    $"Telemetry export failed: {response.StatusCode} - {response.ReasonPhrase}");
+ }
+ }
+ catch (Exception ex)
+ {
+ _circuitBreaker?.RecordFailure();
+ Debug.WriteLine($"Telemetry export exception: {ex.Message}");
+ // Don't rethrow - telemetry must not break driver operations
+ }
+ }
+
+    private TelemetryRequest CreateTelemetryRequest(IReadOnlyList<TelemetryEvent> events)
+ {
+ var protoLogs = events.Select(e => new TelemetryFrontendLog
+ {
+ WorkspaceId = e.WorkspaceId,
+ FrontendLogEventId = Guid.NewGuid().ToString(),
+ Context = new FrontendLogContext
+ {
+ ClientContext = new TelemetryClientContext
+ {
+ TimestampMillis = e.Timestamp.ToUnixTimeMilliseconds(),
+ UserAgent = _connection.UserAgent
+ }
+ },
+ Entry = new FrontendLogEntry
+ {
+ SqlDriverLog = CreateSqlDriverLog(e)
+ }
+ }).ToList();
+
+ return new TelemetryRequest
+ {
+ ProtoLogs = protoLogs
+ };
+ }
+
+ private SqlDriverLog CreateSqlDriverLog(TelemetryEvent e)
+ {
+ var log = new SqlDriverLog
+ {
+ SessionId = e.SessionId,
+ SqlStatementId = e.StatementId,
+ OperationLatencyMs = e.OperationLatencyMs,
+ SystemConfiguration = e.DriverConfig != null
+ ? CreateSystemConfiguration(e.DriverConfig)
+ : null,
+ DriverConnectionParams = e.DriverConfig != null
+ ? CreateConnectionParameters(e.DriverConfig)
+ : null
+ };
+
+ // Add SQL operation data if present
+ if (e.SqlOperationData != null)
+ {
+ log.SqlOperation = new SqlExecutionEvent
+ {
+ ExecutionResult = e.SqlOperationData.ResultFormat.ToString(),
+ RetryCount = e.SqlOperationData.RetryCount,
+ ChunkDetails = e.SqlOperationData.ChunkDownloads?.Count > 0
+ ? CreateChunkDetails(e.SqlOperationData.ChunkDownloads)
+ : null
+ };
+ }
+
+ // Add error info if present
+ if (!string.IsNullOrEmpty(e.ErrorCode))
+ {
+ log.ErrorInfo = new DriverErrorInfo
+ {
+ ErrorName = e.ErrorCode,
+ StackTrace = e.ErrorMessage
+ };
+ }
+
+ return log;
+ }
+
+ private async Task<HttpResponseMessage> SendWithRetryAsync(
+ HttpRequestMessage request,
+ CancellationToken cancellationToken)
+ {
+ var retryCount = 0;
+ var maxRetries = _config.MaxRetries;
+
+ while (true)
+ {
+ try
+ {
+ var response = await _httpClient.SendAsync(
+ request,
+ HttpCompletionOption.ResponseHeadersRead,
+ cancellationToken);
+
+ // Don't retry on client errors (4xx)
+ if ((int)response.StatusCode < 500)
+ {
+ return response;
+ }
+
+ // Retry on server errors (5xx) if retries remaining
+ if (retryCount >= maxRetries)
+ {
+ return response;
+ }
+ }
+ catch (HttpRequestException) when (retryCount < maxRetries)
+ {
+ // Retry on network errors
+ }
+            catch (TaskCanceledException) when (!cancellationToken.IsCancellationRequested && retryCount < maxRetries)
+ {
+ // Retry on timeout (not user cancellation)
+ }
+
+ retryCount++;
+            var delay = TimeSpan.FromMilliseconds(_config.RetryDelayMs * Math.Pow(2, retryCount - 1));
+ await Task.Delay(delay, cancellationToken);
+ }
+ }
+
+ private async Task AddAuthenticationHeadersAsync(
+ HttpRequestMessage request,
+ CancellationToken cancellationToken)
+ {
+ // Use connection's authentication mechanism
+        var authHeaders = await _connection.GetAuthenticationHeadersAsync(cancellationToken);
+ foreach (var header in authHeaders)
+ {
+ request.Headers.TryAddWithoutValidation(header.Key, header.Value);
+ }
+ }
+}
+```
+
+### 4.3 CircuitBreaker
+
+**Purpose**: Prevent telemetry storms when service is unavailable.
+
+**Location**: `Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.CircuitBreaker`
+
+**Implementation**:
+```csharp
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
+{
+ /// <summary>
+ /// Circuit breaker to prevent telemetry storms.
+ /// </summary>
+ internal sealed class CircuitBreaker
+ {
+ private readonly int _failureThreshold;
+ private readonly TimeSpan _timeout;
+ private int _failureCount;
+ private DateTime _lastFailureTime;
+ private CircuitState _state;
+ private readonly object _lock = new object();
+
+ private enum CircuitState
+ {
+ Closed, // Normal operation
+ Open, // Blocking requests
+ HalfOpen // Testing if service recovered
+ }
+
+ public CircuitBreaker(int failureThreshold, TimeSpan timeout)
+ {
+ _failureThreshold = failureThreshold;
+ _timeout = timeout;
+ _state = CircuitState.Closed;
+ }
+
+ public bool IsOpen
+ {
+ get
+ {
+ lock (_lock)
+ {
+ // Auto-transition from Open to HalfOpen after timeout
+ if (_state == CircuitState.Open)
+ {
+ if (DateTime.UtcNow - _lastFailureTime > _timeout)
+ {
+ _state = CircuitState.HalfOpen;
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+ }
+
+ public void RecordSuccess()
+ {
+ lock (_lock)
+ {
+ _failureCount = 0;
+ _state = CircuitState.Closed;
+ }
+ }
+
+ public void RecordFailure()
+ {
+ lock (_lock)
+ {
+ _failureCount++;
+ _lastFailureTime = DateTime.UtcNow;
+
+ if (_failureCount >= _failureThreshold)
+ {
+ _state = CircuitState.Open;
+ }
+ }
+ }
+ }
+}
+```
+
+### 4.4 TelemetryConfiguration
+
+**Purpose**: Centralize all telemetry configuration.
+
+**Location**: `Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.TelemetryConfiguration`
+
+**Implementation**:
+```csharp
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
+{
+ /// <summary>
+ /// Configuration for telemetry collection and export.
+ /// </summary>
+ public sealed class TelemetryConfiguration
+ {
+ // Enable/disable flags
+ public bool Enabled { get; set; } = true;
+ public bool ForceEnable { get; set; } = false; // Bypass feature flag
+
+ // Batch configuration
+ public int BatchSize { get; set; } = 50;
+        public int FlushIntervalMilliseconds { get; set; } = 30000; // 30 seconds
+
+ // Retry configuration
+ public int MaxRetries { get; set; } = 3;
+ public int RetryDelayMs { get; set; } = 500;
+
+ // Circuit breaker configuration
+ public bool CircuitBreakerEnabled { get; set; } = true;
+ public int CircuitBreakerThreshold { get; set; } = 5;
+        public TimeSpan CircuitBreakerTimeout { get; set; } = TimeSpan.FromMinutes(1);
+
+ // Log level filtering
+        public TelemetryLogLevel LogLevel { get; set; } = TelemetryLogLevel.Info;
+
+ // Feature flag name
+        public const string FeatureFlagName =
+            "databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForAdbc";
+
+        // Create from connection properties
+        public static TelemetryConfiguration FromProperties(
+            IReadOnlyDictionary<string, string> properties)
+        {
+            var config = new TelemetryConfiguration();
+
+            // Use TryParse throughout so a malformed property value can never
+            // throw during connection setup (telemetry must not break the driver)
+            if (properties.TryGetValue(DatabricksParameters.TelemetryEnabled, out var enabled)
+                && bool.TryParse(enabled, out var enabledValue))
+            {
+                config.Enabled = enabledValue;
+            }
+
+            if (properties.TryGetValue(DatabricksParameters.TelemetryBatchSize, out var batchSize)
+                && int.TryParse(batchSize, out var batchSizeValue))
+            {
+                config.BatchSize = batchSizeValue;
+            }
+
+            if (properties.TryGetValue(DatabricksParameters.TelemetryFlushIntervalMs, out var flushInterval)
+                && int.TryParse(flushInterval, out var flushIntervalValue))
+            {
+                config.FlushIntervalMilliseconds = flushIntervalValue;
+            }
+
+            return config;
+        }
+ }
+
+ public enum TelemetryLogLevel
+ {
+ Off = 0,
+ Error = 1,
+ Warn = 2,
+ Info = 3,
+ Debug = 4,
+ Trace = 5
+ }
+}
+```
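+
+A short usage sketch of `FromProperties` as defined above (the property values are hypothetical):
+
+```csharp
+var properties = new Dictionary<string, string>
+{
+    [DatabricksParameters.TelemetryEnabled] = "true",
+    [DatabricksParameters.TelemetryBatchSize] = "100"
+};
+
+var config = TelemetryConfiguration.FromProperties(properties);
+// config.Enabled == true, config.BatchSize == 100,
+// and FlushIntervalMilliseconds keeps its 30000 ms default.
+```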
+
+---
+
+## 5. Data Schema
+
+### 5.1 Telemetry Event Model
+
+```csharp
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Models
+{
+ /// <summary>
+ /// Base telemetry event.
+ /// </summary>
+ internal sealed class TelemetryEvent
+ {
+ public TelemetryEventType EventType { get; set; }
+ public DateTimeOffset Timestamp { get; set; }
+ public long? WorkspaceId { get; set; }
+ public string? SessionId { get; set; }
+ public string? StatementId { get; set; }
+ public long? OperationLatencyMs { get; set; }
+
+ // Driver configuration (connection events only)
+ public DriverConfiguration? DriverConfig { get; set; }
+
+ // SQL operation data (statement events only)
+ public SqlOperationData? SqlOperationData { get; set; }
+
+ // Error information (error events only)
+ public string? ErrorCode { get; set; }
+ public string? ErrorMessage { get; set; }
+ public int? ChunkIndex { get; set; }
+ }
+
+ public enum TelemetryEventType
+ {
+ ConnectionOpen,
+ StatementExecution,
+ Error
+ }
+}
+```
+
+### 5.2 Driver Configuration Model
+
+```csharp
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Models
+{
+ /// <summary>
+ /// Driver configuration snapshot (collected once per connection).
+ /// </summary>
+ internal sealed class DriverConfiguration
+ {
+ // System information
+ public string? DriverName { get; set; } = "Databricks.ADBC.CSharp";
+ public string? DriverVersion { get; set; }
+ public string? OsName { get; set; }
+ public string? OsVersion { get; set; }
+ public string? RuntimeVersion { get; set; }
+ public string? ProcessName { get; set; }
+
+ // Connection configuration
+ public string? AuthType { get; set; }
+ public string? HostUrl { get; set; }
+ public string? HttpPath { get; set; }
+
+ // Feature flags
+ public bool CloudFetchEnabled { get; set; }
+ public bool Lz4DecompressionEnabled { get; set; }
+ public bool DirectResultsEnabled { get; set; }
+ public bool TracePropagationEnabled { get; set; }
+ public bool MultipleCatalogSupport { get; set; }
+ public bool PrimaryKeyForeignKeyEnabled { get; set; }
+
+ // CloudFetch configuration
+ public long MaxBytesPerFile { get; set; }
+ public long MaxBytesPerFetchRequest { get; set; }
+ public int MaxParallelDownloads { get; set; }
+ public int PrefetchCount { get; set; }
+ public int MemoryBufferSizeMb { get; set; }
+
+ // Proxy configuration
+ public bool UseProxy { get; set; }
+ public string? ProxyHost { get; set; }
+ public int? ProxyPort { get; set; }
+
+ // Statement configuration
+ public long BatchSize { get; set; }
+ public int PollTimeMs { get; set; }
+
+ // Direct results limits
+ public long DirectResultMaxBytes { get; set; }
+ public long DirectResultMaxRows { get; set; }
+ }
+}
+```
+
+### 5.3 SQL Operation Data Model
+
+```csharp
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Models
+{
+ /// <summary>
+ /// SQL operation metrics.
+ /// </summary>
+ internal sealed class SqlOperationData
+ {
+ public string? StatementId { get; set; }
+ public ExecutionResultFormat ResultFormat { get; set; }
+ public long ExecutionLatencyMs { get; set; }
+ public int RetryCount { get; set; }
+ public int PollCount { get; set; }
+ public long TotalPollLatencyMs { get; set; }
+
+ // CloudFetch specific
+ public List<ChunkDownloadData>? ChunkDownloads { get; set; }
+ public int TotalChunks { get; set; }
+ }
+
+ public enum ExecutionResultFormat
+ {
+ Unknown = 0,
+ InlineArrow = 1,
+ InlineJson = 2,
+ ExternalLinks = 3 // CloudFetch
+ }
+
+ internal sealed class ChunkDownloadData
+ {
+ public int ChunkIndex { get; set; }
+ public long LatencyMs { get; set; }
+ public long BytesDownloaded { get; set; }
+ public bool Compressed { get; set; }
+ }
+}
+```
+
+### 5.4 Server Payload Schema
+
+The exported JSON payload matches JDBC format for consistency:
+
+```json
+{
+ "proto_logs": [
+ {
+ "workspace_id": 1234567890,
+ "frontend_log_event_id": "550e8400-e29b-41d4-a716-446655440000",
+ "context": {
+ "client_context": {
+ "timestamp_millis": 1698765432000,
+ "user_agent": "Databricks-ADBC-CSharp/1.0.0"
+ }
+ },
+ "entry": {
+ "sql_driver_log": {
+ "session_id": "01234567-89ab-cdef-0123-456789abcdef",
+ "sql_statement_id": "01234567-89ab-cdef-0123-456789abcdef",
+ "operation_latency_ms": 1234,
+ "system_configuration": {
+ "driver_name": "Databricks.ADBC.CSharp",
+ "driver_version": "1.0.0",
+ "os_name": "Windows",
+ "os_version": "10.0.19042",
+ "runtime_version": ".NET 8.0.0",
+ "process_name": "PowerBI.Desktop"
+ },
+ "driver_connection_params": {
+ "auth_type": "oauth_client_credentials",
+ "cloudfetch_enabled": true,
+ "lz4_decompression_enabled": true,
+ "direct_results_enabled": true,
+ "max_bytes_per_file": 20971520,
+ "max_parallel_downloads": 3,
+ "batch_size": 2000000
+ },
+ "sql_operation": {
+ "execution_result": "EXTERNAL_LINKS",
+ "retry_count": 0,
+ "chunk_details": {
+ "total_chunks": 10,
+ "chunks_downloaded": 10,
+ "total_download_latency_ms": 5432,
+ "avg_chunk_size_bytes": 15728640,
+ "compressed": true
+ }
+ },
+ "error_info": null
+ }
+ }
+ }
+ ]
+}
+```
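+
+As a sanity check that the C# models serialize into this shape, a minimal sketch using the same `JsonSerializerOptions` the exporter constructs (`JsonNamingPolicy.SnakeCaseLower` requires .NET 8+; the anonymous type here is only for illustration):
+
+```csharp
+using System;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+
+var options = new JsonSerializerOptions
+{
+    PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
+    DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
+};
+
+// FrontendLogEventId serializes as "frontend_log_event_id";
+// null members such as ErrorInfo are omitted from the payload.
+string json = JsonSerializer.Serialize(
+    new { FrontendLogEventId = Guid.NewGuid().ToString(), ErrorInfo = (object?)null },
+    options);
+```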
+
+---
+
+## 6. Collection Points
+
+### 6.1 Connection Lifecycle Events
+
+#### 6.1.1 Connection Open
+
+**Location**: `DatabricksConnection.OpenAsync()`
+
+**What to Collect**:
+- Connection open latency
+- Driver configuration snapshot
+- Session ID
+- Workspace ID
+
+**Implementation**:
+```csharp
+public override async Task OpenAsync(CancellationToken cancellationToken = default)
+{
+ var sw = Stopwatch.StartNew();
+
+ try
+ {
+ await base.OpenAsync(cancellationToken);
+
+ // Initialize telemetry after successful connection
+ InitializeTelemetry();
+
+ sw.Stop();
+
+ // Record connection open event
+ _telemetryCollector?.RecordConnectionOpen(
+ sw.Elapsed,
+ CreateDriverConfiguration());
+ }
+ catch (Exception)
+ {
+ sw.Stop();
+ // Error will be recorded by exception handler
+ throw;
+ }
+}
+
+private DriverConfiguration CreateDriverConfiguration()
+{
+ return new DriverConfiguration
+ {
+ DriverName = "Databricks.ADBC.CSharp",
+ DriverVersion = GetType().Assembly.GetName().Version?.ToString(),
+ OsName = Environment.OSVersion.Platform.ToString(),
+ OsVersion = Environment.OSVersion.Version.ToString(),
+ RuntimeVersion = Environment.Version.ToString(),
+ ProcessName = Process.GetCurrentProcess().ProcessName,
+
+ AuthType = DetermineAuthType(),
+ HostUrl = Host?.Host,
+ HttpPath = HttpPath,
+
+ CloudFetchEnabled = UseCloudFetch,
+ Lz4DecompressionEnabled = CanDecompressLz4,
+ DirectResultsEnabled = _enableDirectResults,
+ TracePropagationEnabled = _tracePropagationEnabled,
+ MultipleCatalogSupport = _enableMultipleCatalogSupport,
+ PrimaryKeyForeignKeyEnabled = _enablePKFK,
+
+ MaxBytesPerFile = _maxBytesPerFile,
+ MaxBytesPerFetchRequest = _maxBytesPerFetchRequest,
+ MaxParallelDownloads = GetIntProperty(
+ DatabricksParameters.CloudFetchParallelDownloads,
+ 3),
+ PrefetchCount = GetIntProperty(
+ DatabricksParameters.CloudFetchPrefetchCount,
+ 2),
+ MemoryBufferSizeMb = GetIntProperty(
+ DatabricksParameters.CloudFetchMemoryBufferSizeMb,
+ 200),
+
+ UseProxy = Properties.ContainsKey(ApacheParameters.ProxyHost),
+        ProxyHost = Properties.TryGetValue(ApacheParameters.ProxyHost, out var host)
+            ? host
+            : null,
+        ProxyPort = Properties.TryGetValue(ApacheParameters.ProxyPort, out var port)
+            ? int.Parse(port)
+            : (int?)null,
+
+ BatchSize = DatabricksStatement.DatabricksBatchSizeDefault,
+ PollTimeMs = GetIntProperty(
+ ApacheParameters.PollTimeMilliseconds,
+ DatabricksConstants.DefaultAsyncExecPollIntervalMs),
+
+ DirectResultMaxBytes = _directResultMaxBytes,
+ DirectResultMaxRows = _directResultMaxRows
+ };
+}
+```
+
+#### 6.1.2 Connection Close
+
+**Location**: `DatabricksConnection.Dispose()`
+
+**What to Do**:
+- Flush all pending telemetry
+- Dispose telemetry collector
+
+**Implementation**:
+```csharp
+public override void Dispose()
+{
+ try
+ {
+ // Flush telemetry before closing connection
+ _telemetryCollector?.FlushAllPendingAsync().GetAwaiter().GetResult();
+ }
+ catch (Exception ex)
+ {
+        Debug.WriteLine($"Error flushing telemetry on connection close: {ex.Message}");
+ }
+ finally
+ {
+ _telemetryCollector?.Dispose();
+ _telemetryCollector = null;
+
+ base.Dispose();
+ }
+}
+```
+
+### 6.2 Statement Execution Events
+
+#### 6.2.1 Statement Execute
+
+**Location**: `DatabricksStatement.ExecuteQueryAsync()`
+
+**What to Collect**:
+- Statement execution latency
+- Result format (inline vs CloudFetch)
+- Statement ID
+
+**Implementation**:
+```csharp
+protected override async Task<QueryResult> ExecuteQueryAsync(
+ string? sqlQuery,
+ CancellationToken cancellationToken = default)
+{
+ var sw = Stopwatch.StartNew();
+ string? statementId = null;
+
+ try
+ {
+ var result = await base.ExecuteQueryAsync(sqlQuery, cancellationToken);
+
+ sw.Stop();
+ statementId = result.StatementHandle?.ToSQLExecStatementId();
+
+ // Determine result format
+ var resultFormat = DetermineResultFormat(result);
+
+ // Record statement execution
+ Connection.TelemetryCollector?.RecordStatementExecute(
+ statementId ?? Guid.NewGuid().ToString(),
+ sw.Elapsed,
+ resultFormat);
+
+ return result;
+ }
+ catch (Exception ex)
+ {
+ sw.Stop();
+
+ // Record error
+ Connection.TelemetryCollector?.RecordError(
+ DetermineErrorCode(ex),
+ ex.Message,
+ statementId);
+
+ throw;
+ }
+}
+
+private ExecutionResultFormat DetermineResultFormat(QueryResult result)
+{
+ if (result.DirectResult != null)
+ {
+ return ExecutionResultFormat.InlineArrow;
+ }
+ else if (result.ResultLinks != null && result.ResultLinks.Count > 0)
+ {
+ return ExecutionResultFormat.ExternalLinks;
+ }
+ else
+ {
+ return ExecutionResultFormat.Unknown;
+ }
+}
+```
+
+#### 6.2.2 Statement Close
+
+**Location**: `DatabricksStatement.Dispose()`
+
+**What to Do**:
+- Mark statement as complete in telemetry
+
+**Implementation**:
+```csharp
+public override void Dispose()
+{
+ try
+ {
+ // Mark statement complete (triggers export of aggregated metrics)
+ if (!string.IsNullOrEmpty(_statementId))
+ {
+            Connection.TelemetryCollector?.RecordStatementComplete(_statementId);
+ }
+ }
+ finally
+ {
+ base.Dispose();
+ }
+}
+```
+
+### 6.3 CloudFetch Events
+
+#### 6.3.1 Chunk Download
+
+**Location**: `CloudFetchDownloader.DownloadFileAsync()`
+
+**What to Collect**:
+- Download latency per chunk
+- Bytes downloaded
+- Compression status
+- Retry attempts
+
+**Implementation**:
+```csharp
+private async Task DownloadFileAsync(
+ IDownloadResult downloadResult,
+ CancellationToken cancellationToken)
+{
+ var sw = Stopwatch.StartNew();
+ var retryCount = 0;
+
+ while (retryCount <= _maxRetries)
+ {
+ try
+ {
+ using var response = await _httpClient.GetAsync(
+ downloadResult.Url,
+ HttpCompletionOption.ResponseHeadersRead,
+ cancellationToken);
+
+ response.EnsureSuccessStatusCode();
+
+ var contentLength = response.Content.Headers.ContentLength ?? 0;
+            var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
+
+ // Decompress if needed
+ if (_isLz4Compressed)
+ {
+ stream = LZ4Stream.Decode(stream);
+ }
+
+ // Copy to memory buffer
+            await _memoryManager.ReserveAsync(contentLength, cancellationToken);
+ var memoryStream = new MemoryStream();
+ await stream.CopyToAsync(memoryStream, cancellationToken);
+
+ sw.Stop();
+
+ // Record successful download
+ _statement.Connection.TelemetryCollector?.RecordChunkDownload(
+ _statement.StatementId,
+ downloadResult.ChunkIndex,
+ sw.Elapsed,
+ contentLength,
+ _isLz4Compressed);
+
+ downloadResult.SetData(memoryStream);
+ return;
+ }
+ catch (Exception ex)
+ {
+ retryCount++;
+
+ if (retryCount > _maxRetries)
+ {
+ sw.Stop();
+
+ // Record download error
+ _statement.Connection.TelemetryCollector?.RecordError(
+ "CHUNK_DOWNLOAD_ERROR",
+ ex.Message,
+ _statement.StatementId,
+ downloadResult.ChunkIndex);
+
+ downloadResult.SetError(ex);
+ throw;
+ }
+
+ await Task.Delay(_retryDelayMs * retryCount, cancellationToken);
+ }
+ }
+}
+```
+
+#### 6.3.2 Operation Status Polling
+
+**Location**: `DatabricksOperationStatusPoller.PollForCompletionAsync()`
+
+**What to Collect**:
+- Number of polls
+- Total polling latency
+
+**Implementation**:
+```csharp
+public async Task<TGetOperationStatusResp> PollForCompletionAsync(
+ TOperationHandle operationHandle,
+ CancellationToken cancellationToken = default)
+{
+ var sw = Stopwatch.StartNew();
+ var pollCount = 0;
+
+ try
+ {
+ TGetOperationStatusResp? statusResp = null;
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+            statusResp = await GetOperationStatusAsync(operationHandle, cancellationToken);
+ pollCount++;
+
+ if (IsComplete(statusResp.OperationState))
+ {
+ break;
+ }
+
+ await Task.Delay(_pollIntervalMs, cancellationToken);
+ }
+
+ sw.Stop();
+
+ // Record polling metrics
+ _connection.TelemetryCollector?.RecordOperationStatus(
+ operationHandle.OperationId?.Guid.ToString() ?? string.Empty,
+ pollCount,
+ sw.Elapsed);
+
+ return statusResp!;
+ }
+ catch (Exception)
+ {
+ sw.Stop();
+ throw;
+ }
+}
+```
+
+### 6.4 Error Events
+
+#### 6.4.1 Exception Handler Integration
+
+**Location**: Throughout driver code
+
+**What to Collect**:
+- Error code/type
+- Error message (sanitized)
+- Statement ID (if available)
+- Chunk index (for download errors)
+
+**Implementation Pattern**:
+```csharp
+try
+{
+ // Driver operation
+}
+catch (DatabricksException ex)
+{
+ Connection.TelemetryCollector?.RecordError(
+ ex.ErrorCode,
+ SanitizeErrorMessage(ex.Message),
+ statementId,
+ chunkIndex);
+
+ throw;
+}
+catch (AdbcException ex)
+{
+ Connection.TelemetryCollector?.RecordError(
+ ex.Status.ToString(),
+ SanitizeErrorMessage(ex.Message),
+ statementId);
+
+ throw;
+}
+catch (Exception ex)
+{
+ Connection.TelemetryCollector?.RecordError(
+ "UNKNOWN_ERROR",
+ SanitizeErrorMessage(ex.Message),
+ statementId);
+
+ throw;
+}
+
+private static string SanitizeErrorMessage(string message)
+{
+ // Remove potential PII from error messages
+ // - Remove connection strings
+ // - Remove auth tokens
+ // - Remove file paths containing usernames
+ // - Keep only first 500 characters
+
+ var sanitized = message;
+
+ // Remove anything that looks like a connection string
+ sanitized = Regex.Replace(
+ sanitized,
+ @"token=[^;]+",
+ "token=***",
+ RegexOptions.IgnoreCase);
+
+ // Remove Bearer tokens
+ sanitized = Regex.Replace(
+ sanitized,
+ @"Bearer\s+[A-Za-z0-9\-._~+/]+=*",
+ "Bearer ***",
+ RegexOptions.IgnoreCase);
+
+ // Truncate to 500 characters
+ if (sanitized.Length > 500)
+ {
+ sanitized = sanitized.Substring(0, 500) + "...";
+ }
+
+ return sanitized;
+}
+```
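+
+For example, an exception message containing `Bearer eyJhbGciOiJIUzI1NiJ9.payload` would be exported as `Bearer ***`.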
+
+---
+
+## 7. Export Mechanism
+
+### 7.1 Export Flow
+
+```
+┌─────────────────────────────────────────────────────────────────┐
+│ Driver Operations │
+│ (Emit events to TelemetryCollector) │
+└─────────────────────────────────────────────────────────────────┘
+ │
+ ▼
+┌─────────────────────────────────────────────────────────────────┐
+│ TelemetryCollector │
+│ - Buffer events in ConcurrentQueue │
+│ - Aggregate statement metrics in ConcurrentDictionary │
+│ - Track batch size and time since last flush │
+└─────────────────────────────────────────────────────────────────┘
+ │
+ ┌─────────────┼─────────────┐
+ │ │ │
+ ▼ ▼ ▼
+ Batch Size Time Based Connection Close
+ Threshold Periodic Flush
+ Reached Flush
+ │ │ │
+ └─────────────┼─────────────┘
+ ▼
+┌─────────────────────────────────────────────────────────────────┐
+│ TelemetryExporter │
+│ 1. Check circuit breaker state │
+│ 2. Serialize events to JSON │
+│ 3. Create HTTP POST request │
+│ 4. Add authentication headers (if authenticated) │
+│ 5. Send with retry logic │
+│ 6. Update circuit breaker on success/failure │
+└─────────────────────────────────────────────────────────────────┘
+ │
+ ▼ HTTP POST
+┌─────────────────────────────────────────────────────────────────┐
+│ Databricks Telemetry Service │
+│ Endpoints: │
+│ - POST /telemetry-ext (authenticated) │
+│ Auth: OAuth token from connection │
+│ - POST /telemetry-unauth (unauthenticated) │
+│ For pre-authentication errors only │
+└─────────────────────────────────────────────────────────────────┘
+ │
+ ▼
+┌─────────────────────────────────────────────────────────────────┐
+│ Lumberjack Pipeline │
+│ - Regional Logfood │
+│ - Central Logfood │
+│ - Table: main.eng_lumberjack.prod_frontend_log_sql_driver_log │
+└─────────────────────────────────────────────────────────────────┘
+```
+
+### 7.2 Export Triggers
+
+#### 7.2.1 Batch Size Threshold
+
+```csharp
+private void EnqueueEvent(TelemetryEvent telemetryEvent)
+{
+ _eventQueue.Enqueue(telemetryEvent);
+ var count = Interlocked.Increment(ref _eventCount);
+
+ // Trigger flush if batch size reached
+ if (count >= _config.BatchSize)
+ {
+ _ = Task.Run(() => FlushAsync(CancellationToken.None));
+ }
+}
+```
+
+**Default**: 50 events per batch
+**Rationale**: Balance between export frequency and network overhead
+
+#### 7.2.2 Time-Based Periodic Flush
+
+```csharp
+private void OnTimerFlush(object? state)
+{
+ var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+    if (now - _lastFlushTime >= _config.FlushIntervalMilliseconds && _eventCount > 0)
+ {
+ _ = Task.Run(() => FlushAsync(CancellationToken.None));
+ }
+}
+```
+
+**Default**: 30 seconds
+**Rationale**: Ensure events are exported even with low event rate
+
+#### 7.2.3 Connection Close Flush
+
+```csharp
+public void Dispose()
+{
+ if (_disposed) return;
+ _disposed = true;
+
+ _flushTimer?.Dispose();
+
+ // Flush all pending data synchronously on dispose
+ FlushAllPendingAsync().GetAwaiter().GetResult();
+
+ _flushLock?.Dispose();
+}
+```
+
+**Behavior**: Synchronous flush to ensure no data loss on connection close
+
+### 7.3 Retry Strategy
+
+**Exponential Backoff with Jitter**:
+
+```csharp
+private async Task<HttpResponseMessage> SendWithRetryAsync(
+ HttpRequestMessage request,
+ CancellationToken cancellationToken)
+{
+ var retryCount = 0;
+ var maxRetries = _config.MaxRetries;
+ var random = new Random();
+
+ while (true)
+ {
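+        // NOTE: an HttpRequestMessage instance cannot be reused across SendAsync calls,
+        // so a production implementation would re-create (or clone) the request per retry attempt.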
+ try
+ {
+ var response = await _httpClient.SendAsync(
+ request,
+ HttpCompletionOption.ResponseHeadersRead,
+ cancellationToken);
+
+ // Don't retry on client errors (4xx)
+ if ((int)response.StatusCode < 500)
+ {
+ return response;
+ }
+
+ // Retry on server errors (5xx) if retries remaining
+ if (retryCount >= maxRetries)
+ {
+ return response;
+ }
+ }
+ catch (HttpRequestException) when (retryCount < maxRetries)
+ {
+ // Retry on network errors
+ }
+        catch (TaskCanceledException) when (!cancellationToken.IsCancellationRequested && retryCount < maxRetries)
+ {
+ // Retry on timeout (not user cancellation)
+ }
+
+ retryCount++;
+
+ // Exponential backoff with jitter
+ var baseDelay = _config.RetryDelayMs * Math.Pow(2, retryCount - 1);
+ var jitter = random.Next(0, (int)(baseDelay * 0.1)); // 10% jitter
+ var delay = TimeSpan.FromMilliseconds(baseDelay + jitter);
+
+ await Task.Delay(delay, cancellationToken);
+ }
+}
+```
+
+**Parameters**:
+- Base delay: 500ms
+- Max retries: 3
+- Exponential multiplier: 2
+- Jitter: 10% of base delay
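+
+With these defaults, successive retry delays are roughly 500 ms, 1 s, and 2 s, each padded with up to 10% random jitter.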
+
+**Retry Conditions**:
+- ✅ 5xx server errors
+- ✅ Network errors (HttpRequestException)
+- ✅ Timeouts (TaskCanceledException, not user cancellation)
+- ❌ 4xx client errors (don't retry)
+- ❌ User cancellation
+
+### 7.4 Circuit Breaker
+
+**Purpose**: Prevent telemetry storms when service is degraded
+
+**State Transitions**:
+
+```
+ Closed ──────────────────┐
+ │ │
+ │ Failure threshold │ Success
+ │ reached │
+ ▼ │
+ Open ◄────┐ │
+ │ │ │
+ │ │ Failure │
+ │ │ during │
+ │ │ half-open │
+ │ │ │
+ │ Timeout │
+ │ expired │
+ ▼ │ │
+ HalfOpen ──┴──────────────┘
+```
+
+**Configuration**:
+- Failure threshold: 5 consecutive failures
+- Timeout: 60 seconds
+- State check: On every export attempt
+
+**Behavior**:
+- **Closed**: Normal operation, all exports attempted
+- **Open**: Drop all events, no export attempts
+- **HalfOpen**: Allow one export to test if service recovered
+
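+A small walkthrough of the state machine using the `CircuitBreaker` class from section 4.3 (default thresholds; `Debug.Assert` is only for illustration):
+
+```csharp
+using System;
+using System.Diagnostics;
+
+var breaker = new CircuitBreaker(failureThreshold: 5, timeout: TimeSpan.FromSeconds(60));
+
+// Closed: exports proceed; consecutive failures are counted
+for (int i = 0; i < 5; i++) breaker.RecordFailure();
+
+// Open: the exporter checks IsOpen and drops events instead of calling the service
+Debug.Assert(breaker.IsOpen);
+
+// Once the 60 s timeout elapses, the next IsOpen check transitions to HalfOpen and
+// returns false, letting exactly one export probe the service. RecordSuccess() then
+// closes the circuit; RecordFailure() re-opens it.
+```
+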
+---
+
+## 8. Configuration
+
+### 8.1 Connection Parameters
+
+Add new ADBC connection parameters in `DatabricksParameters.cs`:
+
+```csharp
+namespace Apache.Arrow.Adbc.Drivers.Databricks
+{
+ public static partial class DatabricksParameters
+ {
+        // Telemetry enable/disable
+        public const string TelemetryEnabled = "adbc.databricks.telemetry.enabled";
+
+        // Force enable (bypass feature flag)
+        public const string TelemetryForceEnable = "adbc.databricks.telemetry.force_enable";
+
+        // Batch configuration
+        public const string TelemetryBatchSize = "adbc.databricks.telemetry.batch_size";
+        public const string TelemetryFlushIntervalMs = "adbc.databricks.telemetry.flush_interval_ms";
+
+        // Retry configuration
+        public const string TelemetryMaxRetries = "adbc.databricks.telemetry.max_retries";
+        public const string TelemetryRetryDelayMs = "adbc.databricks.telemetry.retry_delay_ms";
+
+        // Circuit breaker configuration
+        public const string TelemetryCircuitBreakerEnabled = "adbc.databricks.telemetry.circuit_breaker.enabled";
+        public const string TelemetryCircuitBreakerThreshold = "adbc.databricks.telemetry.circuit_breaker.threshold";
+        public const string TelemetryCircuitBreakerTimeoutSec = "adbc.databricks.telemetry.circuit_breaker.timeout_sec";
+
+        // Log level filtering
+        public const string TelemetryLogLevel = "adbc.databricks.telemetry.log_level";
+ }
+}
+```
+
+### 8.2 Default Values
+
+| Parameter | Default | Description |
+|:---|:---|:---|
+| `adbc.databricks.telemetry.enabled` | `true` | Enable/disable telemetry collection |
+| `adbc.databricks.telemetry.force_enable` | `false` | Bypass server-side feature flag |
+| `adbc.databricks.telemetry.batch_size` | `50` | Number of events per batch |
+| `adbc.databricks.telemetry.flush_interval_ms` | `30000` | Flush interval in milliseconds |
+| `adbc.databricks.telemetry.max_retries` | `3` | Maximum retry attempts |
+| `adbc.databricks.telemetry.retry_delay_ms` | `500` | Base retry delay in milliseconds |
+| `adbc.databricks.telemetry.circuit_breaker.enabled` | `true` | Enable circuit breaker |
+| `adbc.databricks.telemetry.circuit_breaker.threshold` | `5` | Failure threshold |
+| `adbc.databricks.telemetry.circuit_breaker.timeout_sec` | `60` | Open state timeout in seconds |
+| `adbc.databricks.telemetry.log_level` | `Info` | Minimum log level (Off/Error/Warn/Info/Debug/Trace) |
+
+### 8.3 Example Configuration
+
+#### JSON Configuration File
+
+```json
+{
+ "adbc.connection.host": "https://my-workspace.databricks.com",
+ "adbc.connection.auth_type": "oauth",
+ "adbc.databricks.oauth.client_id": "my-client-id",
+ "adbc.databricks.oauth.client_secret": "my-secret",
+
+ "adbc.databricks.telemetry.enabled": "true",
+ "adbc.databricks.telemetry.batch_size": "100",
+ "adbc.databricks.telemetry.flush_interval_ms": "60000",
+ "adbc.databricks.telemetry.log_level": "Info"
+}
+```
+
+#### Programmatic Configuration
+
+```csharp
+var properties = new Dictionary<string, string>
+{
+ [DatabricksParameters.HostName] = "https://my-workspace.databricks.com",
+ [DatabricksParameters.AuthType] = "oauth",
+ [DatabricksParameters.OAuthClientId] = "my-client-id",
+ [DatabricksParameters.OAuthClientSecret] = "my-secret",
+
+ [DatabricksParameters.TelemetryEnabled] = "true",
+ [DatabricksParameters.TelemetryBatchSize] = "100",
+ [DatabricksParameters.TelemetryFlushIntervalMs] = "60000",
+ [DatabricksParameters.TelemetryLogLevel] = "Info"
+};
+
+using var driver = new DatabricksDriver();
+using var database = driver.Open(properties);
+using var connection = database.Connect();
+```
+
+#### Disable Telemetry
+
+```csharp
+var properties = new Dictionary<string, string>
+{
+ // ... other properties ...
+ [DatabricksParameters.TelemetryEnabled] = "false"
+};
+```
+
+### 8.4 Server-Side Feature Flag
+
+**Feature Flag Name**: `databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForAdbc`
+
+**Checking Logic**:
+
+```csharp
+private async Task<bool> IsTelemetryEnabledByServerAsync(CancellationToken cancellationToken)
+{
+ // Check client-side force enable first
+ if (_config.ForceEnable)
+ {
+ return true;
+ }
+
+ try
+ {
+ // Query server for feature flag
+ // This happens during ApplyServerSidePropertiesAsync()
+        var query = $"SELECT * FROM databricks_client_config WHERE key = '{TelemetryConfiguration.FeatureFlagName}'";
+
+ using var statement = Connection.CreateStatement();
+        using var reader = await statement.ExecuteQueryAsync(query, cancellationToken);
+
+ if (await reader.ReadAsync(cancellationToken))
+ {
+ var value = reader.GetString(1); // value column
+ return bool.TryParse(value, out var enabled) && enabled;
+ }
+ }
+ catch (Exception ex)
+ {
+        Debug.WriteLine($"Failed to check telemetry feature flag: {ex.Message}");
+ // Default to enabled if check fails
+ return true;
Review Comment:
Defaulting to enabled? Is this expected? Are there any side effects (perf) from this exporter?
##########
csharp/src/Drivers/Databricks/Telemetry/prompts.txt:
##########
@@ -0,0 +1,11 @@
+1. "can you understand the content present in this google doc:
{telemetry-design-doc-url}"
Review Comment:
How do you plug in the actual URL? Is this a Databricks-internal Google Doc link?
##########
csharp/src/Drivers/Databricks/Telemetry/prompts.txt:
##########
@@ -0,0 +1,11 @@
+1. "can you understand the content present in this google doc:
{telemetry-design-doc-url}"
+
+2. "can you use google mcp"
+
+4. "can you check the databricks jdbc repo and understand how it is currently
implemented"
+
+5. "now, lets go through the arrow adbc driver for databricks present at
{project-location}/arrow-adbc/csharp/src/Drivers/Databricks, and understand its
flow"
Review Comment:
Can we ask the LLM to also go through this PR, which shows how the existing tracing/exporter framework is implemented: https://github.com/apache/arrow-adbc/pull/3315. I believe this could reduce the complexity of our design and let us reuse some of the existing components.