CurtHagenlocher commented on code in PR #2312:
URL: https://github.com/apache/arrow-adbc/pull/2312#discussion_r1868110983


##########
csharp/src/Client/AdbcCommand.cs:
##########
@@ -114,10 +115,31 @@ public override CommandType CommandType
             }
         }
 
+        /// <summary>
+        /// Gets or setts the name of the command timeout property for the underlying ADBC driver.

Review Comment:
   nit: `sets`
   
   That said, I feel like this is a dubious design choice. If there were a 
standard for command timeout properties in ADBC then we could hook into that 
from the wrapper. But if the consumer needs to have custom code anyway in order 
to set the command timeout, then I think we're better off just exposing the 
ability to set statement options through DbCommand -- it's equally usable for 
this case, and more generally useful for other unanticipated cases.



##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs:
##########
@@ -88,11 +114,11 @@ internal async Task OpenAsync()
 
         protected internal HiveServer2TlsOption TlsOptions { get; set; } = HiveServer2TlsOption.Empty;
 
-        protected internal int HttpRequestTimeout { get; set; } = 30000;
+        protected internal int ConnectTimeoutMilliseconds { get; set; } = ConnectTimeoutMillisecondDefault;
 
-        protected abstract Task<TTransport> CreateTransportAsync();
+        protected abstract Task<TTransport> CreateTransportAsync(CancellationToken cancellationToken = default);

Review Comment:
   It doesn't look like any of the implementations of `CreateTransportAsync` 
make use of the `CancellationToken`. Is this aspirational? `TTransport.Open` 
isn't called until `CreateProtocolAsync`.
   
   Hmm... it looks like `TSocketTransport` itself isn't very async-friendly :(.
It does DNS lookups and opens sockets in its constructors.



##########
csharp/src/Drivers/Apache/ApacheParameters.cs:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Arrow.Adbc.Drivers.Apache
+{
+    /// <summary>
+    /// Options common to all Apache drivers.
+    /// </summary>
+    public class ApacheParameters
+        {

Review Comment:
   nit: indentation looks off



##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs:
##########
@@ -89,19 +90,32 @@ public HiveServer2Reader(
                 return null;
             }
 
-            // Await the fetch response
-            TFetchResultsResp response = await FetchNext(_statement, cancellationToken);
+            try
+            {
+                // Await the fetch response
+                TFetchResultsResp response = await FetchNext(_statement, cancellationToken);
+
+                int columnCount = GetColumnCount(response);
+                int rowCount = GetRowCount(response, columnCount);
+                if ((_statement.BatchSize > 0 && rowCount < _statement.BatchSize) || rowCount == 0)
+                {
+                    // This is the last batch
+                    _statement = null;
+                }
 
-            int columnCount = GetColumnCount(response);
-            int rowCount = GetRowCount(response, columnCount);
-            if ((_statement.BatchSize > 0 && rowCount < _statement.BatchSize) || rowCount == 0)
+                // Build the current batch, if any data exists
+                return rowCount > 0 ? CreateBatch(response, columnCount, rowCount) : null;
+            }
+            catch (Exception ex)
+                when (ApacheUtility.ContainsException(ex, out OperationCanceledException? _) ||
+                     (ApacheUtility.ContainsException(ex, out TTransportException? _) && cancellationToken.IsCancellationRequested))
             {
-                // This is the last batch
-                _statement = null;
+                throw new TimeoutException("The metadata query execution timed out. Consider increasing the query timeout value.", ex);

Review Comment:
   How do we know this is a metadata query execution?



##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -32,33 +33,89 @@ protected HiveServer2Statement(HiveServer2Connection connection)
 
         protected virtual void SetStatementProperties(TExecuteStatementReq statement)
         {
+            statement.QueryTimeout = QueryTimeoutSeconds;
         }
 
-        public override QueryResult ExecuteQuery() => ExecuteQueryAsync().AsTask().Result;
+        public override QueryResult ExecuteQuery()
+        {
+            CancellationToken cancellationToken = ApacheUtility.GetCancellationToken(QueryTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
+            try
+            {
+                return ExecuteQueryAsyncInternal(cancellationToken).Result;
+            }
+            catch (Exception ex)
+                when (ApacheUtility.ContainsException(ex, out OperationCanceledException? _) ||
+                     (ApacheUtility.ContainsException(ex, out TTransportException? _) && cancellationToken.IsCancellationRequested))
+            {
+                throw new TimeoutException("The metadata query execution timed out. Consider increasing the query timeout value.", ex);

Review Comment:
   (same comment about "metadata query")



##########
csharp/src/Drivers/Apache/Apache.Arrow.Adbc.Drivers.Apache.csproj:
##########
@@ -6,6 +6,7 @@
 
   <ItemGroup>
     <PackageReference Include="ApacheThrift" Version="0.21.0" />
+    <PackageReference Include="System.Net.Http" Version="4.3.4" />

Review Comment:
   I've already updated the repo separately to unbreak the build.



##########
csharp/test/Drivers/Apache/Spark/ClientTests.cs:
##########
@@ -216,6 +217,28 @@ public void VerifySchemaTables()
             }
         }
 
+        [SkippableFact]
+        public void VerifyTimeoutsSet()
+        {
+            using (Adbc.Client.AdbcConnection adbcConnection = GetAdbcConnection())
+            {
+                int timeout = 99;
+                AdbcCommand cmd = adbcConnection.CreateCommand();
+
+                // setting the timout before the property value
+                Assert.Throws<InvalidOperationException>(() =>
+                {
+                    cmd.CommandTimeout = 1;
+                });
+
+                cmd.CommandTimeoutProperty = "adbc.apache.statement.query_timeout_s";
+                cmd.CommandTimeout = timeout;
+
+                Assert.True(cmd.CommandTimeout == timeout, $"ConnectionTimeout is not set to {timeout}");
+

Review Comment:
   nit: extra blank line



##########
csharp/src/Drivers/Apache/ApacheUtility.cs:
##########
@@ -0,0 +1,141 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Threading;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache
+{
+    internal class ApacheUtility
+    {
+        internal const int QueryTimeoutSecondsDefault = 60;
+
+        public enum TimeUnit
+        {
+            Seconds,
+            Milliseconds
+        }
+
+        public static CancellationToken GetCancellationToken(int timeout, TimeUnit timeUnit)
+        {
+            TimeSpan span;
+
+            if (timeout == 0 || timeout == int.MaxValue)
+            {
+                // the max TimeSpan for CancellationTokenSource is int.MaxValue in milliseconds (not TimeSpan.MaxValue)
+                // no matter what the unit is
+                span = TimeSpan.FromMilliseconds(int.MaxValue);
+            }
+            else
+            {
+                if (timeUnit == TimeUnit.Seconds)
+                {
+                    span = TimeSpan.FromSeconds(timeout);
+                }
+                else
+                {
+                    span = TimeSpan.FromMilliseconds(timeout);
+                }
+            }
+
+            return GetCancellationToken(span);
+        }
+
+        private static CancellationToken GetCancellationToken(TimeSpan timeSpan)
+        {
+            var cts = new CancellationTokenSource(timeSpan);
+            return cts.Token;
+        }
+
+        public static bool QueryTimeoutIsValid(string key, string value, out int queryTimeoutSeconds)
+        {
+            if (!string.IsNullOrEmpty(value) && int.TryParse(value, out int queryTimeout) && (queryTimeout >= 0))
+            {
+                queryTimeoutSeconds = queryTimeout;
+                return true;
+            }
+            else
+            {
+                throw new ArgumentOutOfRangeException(key, value, $"The value '{value}' for option '{key}' is invalid. Must be a numeric value of 0 (infinite) or greater.");
+            }
+        }
+
+        public static bool ContainsException<T>(Exception? exception, out T? containedException) where T : Exception
+        {
+            if (exception is AggregateException aggregateException)
+            {
+                foreach (Exception? ex in aggregateException.InnerExceptions)
+                {
+                    if (ex is T ce)
+                    {
+                        containedException = ce;
+                        return true;
+                    }
+                }
+            }
+
+            Exception? e = exception;
+            while (e != null)
+            {
+                if (e is T ce)
+                {
+                    containedException = ce;
+                    return true;
+                }
+                e = e.InnerException;
+            }
+
+            containedException = null;
+            return false;
+        }
+
+        public static bool ContainsException(Exception? exception, Type? exceptionType, out Exception? containedException)

Review Comment:
   Under what circumstances can the input `exception` or the input 
`exceptionType` be null?



##########
csharp/src/Drivers/Apache/ApacheUtility.cs:
##########
@@ -0,0 +1,141 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Threading;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache
+{
+    internal class ApacheUtility
+    {
+        internal const int QueryTimeoutSecondsDefault = 60;
+
+        public enum TimeUnit
+        {
+            Seconds,
+            Milliseconds
+        }
+
+        public static CancellationToken GetCancellationToken(int timeout, TimeUnit timeUnit)
+        {
+            TimeSpan span;
+
+            if (timeout == 0 || timeout == int.MaxValue)
+            {
+                // the max TimeSpan for CancellationTokenSource is int.MaxValue in milliseconds (not TimeSpan.MaxValue)
+                // no matter what the unit is
+                span = TimeSpan.FromMilliseconds(int.MaxValue);
+            }
+            else
+            {
+                if (timeUnit == TimeUnit.Seconds)
+                {
+                    span = TimeSpan.FromSeconds(timeout);
+                }
+                else
+                {
+                    span = TimeSpan.FromMilliseconds(timeout);
+                }
+            }
+
+            return GetCancellationToken(span);
+        }
+
+        private static CancellationToken GetCancellationToken(TimeSpan timeSpan)
+        {
+            var cts = new CancellationTokenSource(timeSpan);
+            return cts.Token;
+        }
+
+        public static bool QueryTimeoutIsValid(string key, string value, out int queryTimeoutSeconds)
+        {
+            if (!string.IsNullOrEmpty(value) && int.TryParse(value, out int queryTimeout) && (queryTimeout >= 0))
+            {
+                queryTimeoutSeconds = queryTimeout;
+                return true;
+            }
+            else
+            {
+                throw new ArgumentOutOfRangeException(key, value, $"The value '{value}' for option '{key}' is invalid. Must be a numeric value of 0 (infinite) or greater.");
+            }
+        }
+
+        public static bool ContainsException<T>(Exception? exception, out T? containedException) where T : Exception
+        {
+            if (exception is AggregateException aggregateException)
+            {
+                foreach (Exception? ex in aggregateException.InnerExceptions)
+                {
+                    if (ex is T ce)
+                    {
+                        containedException = ce;
+                        return true;
+                    }
+                }
+            }
+
+            Exception? e = exception;
+            while (e != null)
+            {
+                if (e is T ce)
+                {
+                    containedException = ce;
+                    return true;
+                }
+                e = e.InnerException;
+            }
+
+            containedException = null;
+            return false;
+        }
+
+        public static bool ContainsException(Exception? exception, Type? exceptionType, out Exception? containedException)
+        {
+            if ( exception == null || exceptionType == null)

Review Comment:
   nit: extra space after parenthesis
   
   If you're using Visual Studio, you can run its auto-formatter (Format Document)
by pressing Ctrl+K, Ctrl+D.



##########
csharp/src/Drivers/Apache/Spark/SparkConnection.cs:
##########
@@ -420,26 +419,42 @@ public override IArrowArrayStream GetTableTypes()
                 SessionHandle = SessionHandle ?? throw new InvalidOperationException("session not created"),
                 GetDirectResults = sparkGetDirectResults
             };
-            TGetTableTypesResp resp = Client.GetTableTypes(req).Result;
-            if (resp.Status.StatusCode == TStatusCode.ERROR_STATUS)
+
+            CancellationToken cancellationToken = ApacheUtility.GetCancellationToken(QueryTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
+            try
             {
-                throw new HiveServer2Exception(resp.Status.ErrorMessage)
-                    .SetNativeError(resp.Status.ErrorCode)
-                    .SetSqlState(resp.Status.SqlState);
-            }
+                TGetTableTypesResp resp = Client.GetTableTypes(req, cancellationToken).Result;
 
-            TRowSet rowSet = GetRowSetAsync(resp).Result;
-            StringArray tableTypes = rowSet.Columns[0].StringVal.Values;
+                if (resp.Status.StatusCode == TStatusCode.ERROR_STATUS)
+                {
+                    throw new HiveServer2Exception(resp.Status.ErrorMessage)
+                        .SetNativeError(resp.Status.ErrorCode)
+                        .SetSqlState(resp.Status.SqlState);
+                }
 
-            StringArray.Builder tableTypesBuilder = new StringArray.Builder();
-            tableTypesBuilder.AppendRange(tableTypes);
+                TRowSet rowSet = GetRowSetAsync(resp, cancellationToken).Result;
+                StringArray tableTypes = rowSet.Columns[0].StringVal.Values;
 
-            IArrowArray[] dataArrays = new IArrowArray[]
-            {
+                StringArray.Builder tableTypesBuilder = new StringArray.Builder();
+                tableTypesBuilder.AppendRange(tableTypes);
+
+                IArrowArray[] dataArrays = new IArrowArray[]
+                {
                 tableTypesBuilder.Build()
-            };
+                };
 
-            return new SparkInfoArrowStream(StandardSchemas.TableTypesSchema, dataArrays);
+                return new SparkInfoArrowStream(StandardSchemas.TableTypesSchema, dataArrays);
+            }
+            catch (Exception ex)
+                when (ApacheUtility.ContainsException(ex, out OperationCanceledException? _) ||
+                     (ApacheUtility.ContainsException(ex, out TTransportException? _) && cancellationToken.IsCancellationRequested))
+            {
+                throw new TimeoutException("The metadata query execution timed out. Consider increasing the query timeout value.", ex);
+            }
+            catch (Exception ex) when (ex is not HiveServer2Exception)
+            {
+                throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ex.Message}'", ex);
+            }

Review Comment:
   I'll stop pointing these out; just search globally for `metadata query` and
fix the message wherever we don't actually know that a metadata query is being
run.



##########
csharp/src/Drivers/Apache/ApacheUtility.cs:
##########
@@ -0,0 +1,141 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Threading;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache
+{
+    internal class ApacheUtility
+    {
+        internal const int QueryTimeoutSecondsDefault = 60;
+
+        public enum TimeUnit
+        {
+            Seconds,
+            Milliseconds
+        }
+
+        public static CancellationToken GetCancellationToken(int timeout, TimeUnit timeUnit)
+        {
+            TimeSpan span;
+
+            if (timeout == 0 || timeout == int.MaxValue)
+            {
+                // the max TimeSpan for CancellationTokenSource is int.MaxValue in milliseconds (not TimeSpan.MaxValue)
+                // no matter what the unit is
+                span = TimeSpan.FromMilliseconds(int.MaxValue);
+            }
+            else
+            {
+                if (timeUnit == TimeUnit.Seconds)
+                {
+                    span = TimeSpan.FromSeconds(timeout);
+                }
+                else
+                {
+                    span = TimeSpan.FromMilliseconds(timeout);
+                }
+            }
+
+            return GetCancellationToken(span);
+        }
+
+        private static CancellationToken GetCancellationToken(TimeSpan timeSpan)
+        {
+            var cts = new CancellationTokenSource(timeSpan);
+            return cts.Token;
+        }
+
+        public static bool QueryTimeoutIsValid(string key, string value, out int queryTimeoutSeconds)
+        {
+            if (!string.IsNullOrEmpty(value) && int.TryParse(value, out int queryTimeout) && (queryTimeout >= 0))
+            {
+                queryTimeoutSeconds = queryTimeout;
+                return true;
+            }
+            else
+            {
+                throw new ArgumentOutOfRangeException(key, value, $"The value '{value}' for option '{key}' is invalid. Must be a numeric value of 0 (infinite) or greater.");
+            }
+        }
+
+        public static bool ContainsException<T>(Exception? exception, out T? containedException) where T : Exception

Review Comment:
   Under what circumstances can the input `exception` be null?



##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs:
##########
@@ -30,7 +30,7 @@ internal abstract class HiveServer2Connection : AdbcConnection
     {
         internal const long BatchSizeDefault = 50000;
         internal const int PollTimeMillisecondsDefault = 500;
-
+        private const int ConnectTimeoutMillisecondDefault = 30000;

Review Comment:
   nit: `ConnectTimeoutMillisecondsDefault` for consistency with 
`PollTimeMillisecondsDefault`



##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -32,33 +33,89 @@ protected HiveServer2Statement(HiveServer2Connection connection)
 
         protected virtual void SetStatementProperties(TExecuteStatementReq statement)
         {
+            statement.QueryTimeout = QueryTimeoutSeconds;
         }
 
-        public override QueryResult ExecuteQuery() => ExecuteQueryAsync().AsTask().Result;
+        public override QueryResult ExecuteQuery()
+        {
+            CancellationToken cancellationToken = ApacheUtility.GetCancellationToken(QueryTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
+            try
+            {
+                return ExecuteQueryAsyncInternal(cancellationToken).Result;
+            }
+            catch (Exception ex)
+                when (ApacheUtility.ContainsException(ex, out OperationCanceledException? _) ||
+                     (ApacheUtility.ContainsException(ex, out TTransportException? _) && cancellationToken.IsCancellationRequested))
+            {
+                throw new TimeoutException("The metadata query execution timed out. Consider increasing the query timeout value.", ex);
+            }
+            catch (Exception ex) when (ex is not HiveServer2Exception)
+            {
+                throw new HiveServer2Exception($"An unexpected error occurred while fetching results. '{ex.Message}'", ex);
+            }
+        }
 
-        public override UpdateResult ExecuteUpdate() => ExecuteUpdateAsync().Result;
+        public override UpdateResult ExecuteUpdate()
+        {
+            CancellationToken cancellationToken = ApacheUtility.GetCancellationToken(QueryTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
+            try
+            {
+                return ExecuteUpdateAsyncInternal(cancellationToken).Result;
+            }
+            catch (Exception ex)
+                when (ApacheUtility.ContainsException(ex, out OperationCanceledException? _) ||
+                     (ApacheUtility.ContainsException(ex, out TTransportException? _) && cancellationToken.IsCancellationRequested))
+            {
+                throw new TimeoutException("The metadata query execution timed out. Consider increasing the query timeout value.", ex);

Review Comment:
   (again)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to