birschick-bq commented on code in PR #3191:
URL: https://github.com/apache/arrow-adbc/pull/3191#discussion_r2258263885


##########
csharp/src/Drivers/Databricks/Telemetry/TelemetryHelper.cs:
##########
@@ -0,0 +1,174 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Reflection;
+using System.Diagnostics;
+using System.Collections.Concurrent;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Model;
+using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Enums;
+using Apache.Arrow.Adbc.Drivers.Apache.Spark;
+using System.Text.Json;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
+{
+    internal class TelemetryHelper
+    {
+        private static readonly ConcurrentQueue<TelemetryFrontendLog> 
EventsBatch = new ConcurrentQueue<TelemetryFrontendLog>();
+        private long _lastFlushTimeMillis;
+        private readonly Timer _flushTimer;
+
+        private TelemetryClient? _telemetryClient;
+
+        private ClientContext? _clientContext;
+        private string? _accessToken;
+        private string? _hostUrl;
+        private DriverConnectionParameters? _connectionParameters;
+        private readonly DriverSystemConfiguration _systemConfiguration;
+
+        public TelemetryHelper(string? hostUrl, string? accessToken)
+        {
+            _hostUrl = hostUrl;

Review Comment:
   This `_hostUrl` field is assigned but never read. The host is already 
carried in `DriverConnectionParameters` (via `HostInfo`), so it is duplicated.



##########
csharp/src/Drivers/Databricks/DatabricksConnection.cs:
##########
@@ -76,15 +86,16 @@ internal class DatabricksConnection : SparkHttpConnection
 
         public DatabricksConnection(IReadOnlyDictionary<string, string> 
properties) : base(properties)
         {
+            _guid = Guid.NewGuid();
             ValidateProperties();
+            _databricksActivityListener = new 
DatabricksActivityListener(_telemetryHelper, this.AssemblyName, _guid);
         }
 
         public override IEnumerable<KeyValuePair<string, object?>>? 
GetActivitySourceTags(IReadOnlyDictionary<string, string> properties)
         {
             IEnumerable<KeyValuePair<string, object?>>? tags = 
base.GetActivitySourceTags(properties);
-            // TODO: Add any additional tags specific to Databricks connection
-            //tags ??= [];
-            //tags.Concat([new("key", "value")]);
+            tags ??= [];
+            tags.Concat([new("guid",_guid)]);

Review Comment:
   `Concat` returns a new sequence and does not modify `tags`, so you need to 
use (assign) the returned collection.
   ```suggestion
               tags = tags.Concat([new("guid",_guid)]);
   ```



##########
csharp/src/Drivers/Databricks/DatabricksConnection.cs:
##########
@@ -281,6 +292,43 @@ private void ValidateProperties()
             {
                 _identityFederationClientId = identityFederationClientId;
             }
+
+            //Telemetry
+            var connectionParams = new DriverConnectionParameters();
+            var hostDetails = new HostDetails();
+            var clientContext = new ClientContext();
+            string? token = null;
+
+            if (Properties.TryGetValue(SparkParameters.AuthType, out string? 
authType))
+            {
+                connectionParams.AuthMech = Util.StringToAuthMech(authType);
+            }
+
+            if (Properties.TryGetValue(SparkParameters.HostName, out string? 
host))
+            {
+                hostDetails.HostUrl = host;
+            }
+            if (Properties.TryGetValue(SparkParameters.Port, out string? port))
+            {
+                hostDetails.Port = Int32.Parse(port);
+            }
+            connectionParams.HostInfo = hostDetails;
+
+            if (Properties.TryGetValue(SparkParameters.UserAgentEntry, out 
string? userAgent))
+            {
+                clientContext.UserAgent = userAgent;
+            }
+            
+            if(Properties.TryGetValue(SparkParameters.AccessToken, out string? 
accessToken))
+            {
+                token = accessToken;
+            }   
+            else if(Properties.TryGetValue(SparkParameters.Token, out string? 
accesstoken))
+            {
+                token = accesstoken;
+            }
+            _telemetryHelper = new TelemetryHelper(hostDetails.HostUrl, token);

Review Comment:
   You don't need to pass `hostDetails.HostUrl`: `TelemetryHelper` never uses 
it, and the host is already included in `connectionParams.HostInfo`.



##########
csharp/src/Drivers/Databricks/DatabricksConnection.cs:
##########
@@ -281,6 +292,43 @@ private void ValidateProperties()
             {
                 _identityFederationClientId = identityFederationClientId;
             }
+
+            //Telemetry
+            var connectionParams = new DriverConnectionParameters();
+            var hostDetails = new HostDetails();
+            var clientContext = new ClientContext();
+            string? token = null;
+
+            if (Properties.TryGetValue(SparkParameters.AuthType, out string? 
authType))
+            {
+                connectionParams.AuthMech = Util.StringToAuthMech(authType);
+            }
+
+            if (Properties.TryGetValue(SparkParameters.HostName, out string? 
host))
+            {
+                hostDetails.HostUrl = host;
+            }
+            if (Properties.TryGetValue(SparkParameters.Port, out string? port))
+            {
+                hostDetails.Port = Int32.Parse(port);
+            }
+            connectionParams.HostInfo = hostDetails;
+
+            if (Properties.TryGetValue(SparkParameters.UserAgentEntry, out 
string? userAgent))
+            {
+                clientContext.UserAgent = userAgent;
+            }
+            
+            if(Properties.TryGetValue(SparkParameters.AccessToken, out string? 
accessToken))
+            {
+                token = accessToken;
+            }   
+            else if(Properties.TryGetValue(SparkParameters.Token, out string? 
accesstoken))
+            {
+                token = accesstoken;
+            }
+            _telemetryHelper = new TelemetryHelper(hostDetails.HostUrl, token);
+            _telemetryHelper.SetParameters(connectionParams, clientContext);

Review Comment:
   You should just pass `connectionParams` and `clientContext` into the 
constructor above, instead of calling `SetParameters` separately.



##########
csharp/src/Drivers/Databricks/DatabricksConnection.cs:
##########
@@ -757,6 +806,16 @@ protected override void Dispose(bool disposing)
             if (disposing)
             {
                 _authHttpClient?.Dispose();
+                _databricksActivityListener?.Dispose();
+                
+                try
+                {
+                    
_telemetryHelper?.ForceFlushAsync().Wait(TimeSpan.FromSeconds(2));

Review Comment:
   I would have thought that you want to flush before disposing 
`_authHttpClient`, so that any pending telemetry can still be sent?



##########
csharp/src/Drivers/Databricks/Telemetry/DatabricksActivityListener.cs:
##########
@@ -0,0 +1,69 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using Apache.Arrow.Adbc.Tracing;
+using Apache.Arrow.Adbc.Drivers.Apache;
+using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Model;
+using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Enums;
+
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
+{
+    internal class DatabricksActivityListener : IDisposable
+    {
+        private readonly ActivityListener _activityListener;
+        private TelemetryHelper? _telemetryHelper;
+
+        public DatabricksActivityListener(TelemetryHelper? telemetryHelper, 
string sourceName, Guid guid)
+        {
+            this._telemetryHelper = telemetryHelper;
+            this._activityListener = new ActivityListener
+            {
+                ShouldListenTo = (activitySource) => 
activitySource.Tags?.Any(kvp => kvp.Key == "guid" && kvp.Value?.Equals(guid) == 
true) == true,
+                Sample = (ref ActivityCreationOptions<ActivityContext> _) => 
ActivitySamplingResult.AllData,
+                ActivityStopped = OnActivityStopped,
+            };
+            ActivitySource.AddActivityListener(_activityListener);
+        }
+
+        private void OnActivityStopped(Activity activity)
+        {       
+            if(_telemetryHelper == null)
+            {
+                return;
+            }
+
+            if(activity.OperationName?.EndsWith("ExecuteStatementAsync") == 
true)
+            {
+                var sqlExecutionEvent = new SqlExecutionEvent();
+                var operationDetail = new OperationDetail();
+                operationDetail.OperationType = 
Util.StringToOperationType("EXECUTE_STATEMENT_ASYNC");

Review Comment:
   Why not use the `enum` value directly (`OperationType.EXECUTE_STATEMENT_ASYNC`) 
instead of parsing it from a string?



##########
csharp/src/Drivers/Databricks/DatabricksConnection.cs:
##########
@@ -76,15 +86,16 @@ internal class DatabricksConnection : SparkHttpConnection
 
         public DatabricksConnection(IReadOnlyDictionary<string, string> 
properties) : base(properties)
         {
+            _guid = Guid.NewGuid();
             ValidateProperties();
+            _databricksActivityListener = new 
DatabricksActivityListener(_telemetryHelper, this.AssemblyName, _guid);
         }
 
         public override IEnumerable<KeyValuePair<string, object?>>? 
GetActivitySourceTags(IReadOnlyDictionary<string, string> properties)
         {
             IEnumerable<KeyValuePair<string, object?>>? tags = 
base.GetActivitySourceTags(properties);
-            // TODO: Add any additional tags specific to Databricks connection
-            //tags ??= [];
-            //tags.Concat([new("key", "value")]);
+            tags ??= [];
+            tags.Concat([new("guid",_guid)]);

Review Comment:
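   I would recommend namespacing the key with a "databricks." prefix so that it 
isn't accidentally confused with a tag from another trace source. Also, define 
the key as a constant (shared with the `kvp.Key == "guid"` check in 
`DatabricksActivityListener`) to reduce the chance of a spelling mistake.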



##########
csharp/src/Drivers/Databricks/Telemetry/TelemetryClient.cs:
##########
@@ -0,0 +1,124 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Text.Json;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Model;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
+{
+    internal class TelemetryClient
+    {
+        private readonly HttpClient _httpClient;
+        private readonly string? _telemetryUrl;
+        private readonly string? _accessToken;
+
+        public TelemetryClient(HttpClient httpClient, string? hostUrl, string? 
accessToken)
+        {
+            _httpClient = httpClient;
+            _accessToken = accessToken;
+            _telemetryUrl = !string.IsNullOrEmpty(hostUrl) ? accessToken != 
null ? $"https://{hostUrl}/telemetry-ext"; : 
$"https://{hostUrl}/telemetry-unauth"; : null;
+        }
+
+        /// <summary>
+        /// Sends a batch of telemetry events asynchronously
+        /// </summary>
+        /// <param name="telemetryBatch">List of telemetry events to 
send</param>
+        /// <returns>Task representing the async operation</returns>
+        public async Task<bool> 
SendTelemetryBatchAsync(List<TelemetryFrontendLog> telemetryBatch)
+        {
+            if (string.IsNullOrEmpty(_telemetryUrl) || telemetryBatch.Count == 
0)
+            {
+                return false;
+            }
+
+            try
+            {
+                var request = new HttpRequestMessage(HttpMethod.Post, 
_telemetryUrl);
+                
+                // Serialize the batch to JSON
+                var telemetryRequest = new TelemetryRequest();
+                telemetryRequest.UploadTime = 
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+                telemetryRequest.ProtoLogs = telemetryBatch.Select(x => 
JsonSerializer.Serialize(x)).ToList();
+                request.Content = new 
StringContent(JsonSerializer.Serialize(telemetryRequest));
+                
+                // Set headers
+                request.Headers.Accept.Add(new 
MediaTypeWithQualityHeaderValue("application/json"));
+                request.Content.Headers.ContentType = new 
MediaTypeHeaderValue("application/json");
+                if(_accessToken != null)
+                {
+                    request.Headers.Authorization = new 
AuthenticationHeaderValue("Bearer", _accessToken);
+                }
+
+                var response = await _httpClient.SendAsync(request);
+                return response.IsSuccessStatusCode;
+            }
+            catch (Exception ex)
+            {
+                // Log the exception but don't throw to prevent telemetry 
failures from affecting main functionality
+                System.Diagnostics.Debug.WriteLine($"Failed to send telemetry: 
{ex.Message}");
+                return false;
+            }
+        }
+
+        /// <summary>
+        /// Sends a single telemetry event asynchronously
+        /// </summary>
+        /// <param name="telemetryEvent">Single telemetry event to send</param>
+        /// <returns>Task representing the async operation</returns>
+        public async Task<bool> SendTelemetryAsync(TelemetryFrontendLog 
telemetryEvent)

Review Comment:
   This seems to be the same as `SendTelemetryBatchAsync`, but it is not actually 
used anywhere. I don't think you need it.



##########
csharp/src/Drivers/Databricks/Telemetry/Util.cs:
##########
@@ -0,0 +1,107 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Diagnostics;
+using Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.Enums;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
+{
+    internal class Util
+    {
+        private static string DRIVER_VERSION = 
GetAssemblyVersion(typeof(DatabricksConnection));
+
+        private static string DRIVER_NAME = 
GetAssemblyName(typeof(DatabricksConnection));
+
+        private static string GetAssemblyName(Type type) => 
type.Assembly.GetName().Name!;
+
+        private static string GetAssemblyVersion(Type type) => 
FileVersionInfo.GetVersionInfo(type.Assembly.Location).ProductVersion ?? 
string.Empty;
+

Review Comment:
   Nit: extra blank line.
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to