This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new a7c1cc6b2 feat(csharp/src/Drivers/Apache): add implementation for
AdbcStatement.SetOption on Spark driver (#1849)
a7c1cc6b2 is described below
commit a7c1cc6b2c70ed085af00693cbb110ab7f43c7b5
Author: Bruce Irschick <[email protected]>
AuthorDate: Mon May 13 13:10:16 2024 -0700
feat(csharp/src/Drivers/Apache): add implementation for
AdbcStatement.SetOption on Spark driver (#1849)
Implement AdbcStatement.SetOption on Spark driver
* `"adbc.statement.polltime_milliseconds"` -> sets the poll time to
check for results to execute a statement.
* `"adbc.statement.batch_size"` -> sets the maximum size of a single
batch to receive.
---
.../Drivers/Apache/Hive2/HiveServer2Statement.cs | 37 ++++++-
.../src/Drivers/Apache/Impala/ImpalaStatement.cs | 8 ++
csharp/src/Drivers/Apache/Spark/SparkStatement.cs | 8 ++
csharp/test/Drivers/Apache/Spark/StatementTests.cs | 106 +++++++++++++++++++++
4 files changed, 157 insertions(+), 2 deletions(-)
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
index cc4abb964..4879bdecb 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
@@ -91,6 +91,21 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
return new UpdateResult(affectedRows ?? -1);
}
+ /// <summary>
+ /// Applies a single statement-level option.
+ /// </summary>
+ /// <param name="key">The option name. Only <see cref="Options.PollTimeMilliseconds" /> and <see cref="Options.BatchSize" /> are recognized.</param>
+ /// <param name="value">The option value in string form; it is validated by the update helper that matches <paramref name="key" />.</param>
+ public override void SetOption(string key, string value)
+ {
+ if (key == Options.PollTimeMilliseconds)
+ {
+ UpdatePollTimeIfValid(key, value);
+ }
+ else if (key == Options.BatchSize)
+ {
+ UpdateBatchSizeIfValid(key, value);
+ }
+ else
+ {
+ // Every key that is not handled above (including null) is reported as not implemented.
+ throw AdbcException.NotImplemented($"Option '{key}' is not implemented.");
+ }
+ }
+
protected async Task ExecuteStatementAsync()
{
TExecuteStatementReq executeRequest = new
TExecuteStatementReq(this.connection.sessionHandle, this.SqlQuery);
@@ -123,9 +138,27 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
return SchemaParser.GetArrowSchema(response.Schema);
}
- protected internal int PollTimeMilliseconds { get; } =
PollTimeMillisecondsDefault;
+ protected internal int PollTimeMilliseconds { get; private set; } =
PollTimeMillisecondsDefault;
+
+ protected internal int BatchSize { get; private set; } =
BatchSizeDefault;
+
+ /// <summary>
+ /// Provides the constant string key values to the <see
cref="AdbcStatement.SetOption(string, string)" /> method.
+ /// </summary>
+ // Left unsealed on purpose: ImpalaStatement.Options and SparkStatement.Options derive from this class.
+ public class Options
+ {
+ // Options common to all HiveServer2Statement-derived drivers go
here
+ /// <summary>Key for the poll time, in milliseconds, used to check for the results of an executing statement.</summary>
+ public const string PollTimeMilliseconds =
"adbc.statement.polltime_milliseconds";
+ /// <summary>Key for the maximum size of a single batch to receive.</summary>
+ public const string BatchSize = "adbc.statement.batch_size";
+ }
+
+ /// <summary>
+ /// Parses <paramref name="value" /> and stores it in <see cref="PollTimeMilliseconds" /> when it is an integer greater than or equal to zero.
+ /// </summary>
+ /// <param name="key">The option key; used only to build the error message.</param>
+ /// <param name="value">The candidate poll time, in milliseconds, in string form.</param>
+ /// <exception cref="ArgumentException">Thrown when <paramref name="value" /> is null, empty, not parsable as a 32-bit integer, or negative.</exception>
+ // The null/empty guard must test the value (not the key), matching UpdateBatchSizeIfValid.
+ private void UpdatePollTimeIfValid(string key, string value) => PollTimeMilliseconds = !string.IsNullOrEmpty(value) && int.TryParse(value, result: out int pollTimeMilliseconds) && pollTimeMilliseconds >= 0
+ ? pollTimeMilliseconds
+ : throw new ArgumentException($"The value '{value}' for option '{key}' is invalid. Must be a numeric value greater than or equal to zero.", nameof(value));
- protected internal int BatchSize { get; } = BatchSizeDefault;
+ /// <summary>
+ /// Parses <paramref name="value" /> and stores it in <see cref="BatchSize" /> when it is an integer greater than zero.
+ /// </summary>
+ /// <param name="key">The option key; used only to build the error message.</param>
+ /// <param name="value">The candidate batch size in string form.</param>
+ /// <exception cref="ArgumentException">Thrown when <paramref name="value" /> is null, empty, not parsable as a 32-bit integer, or not positive.</exception>
+ private void UpdateBatchSizeIfValid(string key, string value)
+ {
+ // Reject anything that is missing, non-numeric, or not strictly positive before touching the property.
+ if (string.IsNullOrEmpty(value) || !int.TryParse(value, out int parsedBatchSize) || parsedBatchSize <= 0)
+ {
+ throw new ArgumentException($"The value '{value}' for option '{key}' is invalid. Must be a numeric value greater than zero.", nameof(value));
+ }
+
+ BatchSize = parsedBatchSize;
+ }
public override void Dispose()
{
diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaStatement.cs
b/csharp/src/Drivers/Apache/Impala/ImpalaStatement.cs
index 5fe408c38..17b15b952 100644
--- a/csharp/src/Drivers/Apache/Impala/ImpalaStatement.cs
+++ b/csharp/src/Drivers/Apache/Impala/ImpalaStatement.cs
@@ -42,6 +42,14 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
protected override IArrowArrayStream NewReader<T>(T statement, Schema
schema) => new HiveServer2Reader(statement, schema);
+ /// <summary>
+ /// Provides the constant string key values to the <see
cref="AdbcStatement.SetOption(string, string)" /> method.
+ /// </summary>
+ public new sealed class Options : HiveServer2Statement.Options
+ {
+ // Option keys specific to Impala go here; the common keys are inherited from HiveServer2Statement.Options.
+ }
+
class HiveServer2Reader : IArrowArrayStream
{
HiveServer2Statement? statement;
diff --git a/csharp/src/Drivers/Apache/Spark/SparkStatement.cs
b/csharp/src/Drivers/Apache/Spark/SparkStatement.cs
index d36c355c8..4ab67bdca 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkStatement.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkStatement.cs
@@ -57,6 +57,14 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
protected override IArrowArrayStream NewReader<T>(T statement, Schema
schema) => new SparkReader(statement, schema);
+ /// <summary>
+ /// Provides the constant string key values to the <see
cref="AdbcStatement.SetOption(string, string)" /> method.
+ /// </summary>
+ public new sealed class Options : HiveServer2Statement.Options
+ {
+ // Option keys specific to Spark go here; the common keys are inherited from HiveServer2Statement.Options.
+ }
+
sealed class SparkReader : IArrowArrayStream
{
HiveServer2Statement? statement;
diff --git a/csharp/test/Drivers/Apache/Spark/StatementTests.cs
b/csharp/test/Drivers/Apache/Spark/StatementTests.cs
new file mode 100644
index 000000000..6a1e4c00a
--- /dev/null
+++ b/csharp/test/Drivers/Apache/Spark/StatementTests.cs
@@ -0,0 +1,106 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Spark;
+using Apache.Arrow.Adbc.Tests.Xunit;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
+{
+ /// <summary>
+ /// Tests for setting statement options on the Spark ADBC driver.
+ /// </summary>
+ /// <remarks>
+ /// Tests are ordered to ensure data is created for the other
+ /// queries to run.
+ /// </remarks>
+ [TestCaseOrderer("Apache.Arrow.Adbc.Tests.Xunit.TestOrderer", "Apache.Arrow.Adbc.Tests")]
+ public class StatementTests : SparkTestBase
+ {
+ public StatementTests(ITestOutputHelper? outputHelper) : base(outputHelper)
+ {
+ Skip.IfNot(Utils.CanExecuteTestConfig(TestConfigVariable));
+ }
+
+ /// <summary>
+ /// Validates that SetOption handles valid and invalid values correctly for the PollTime option.
+ /// </summary>
+ /// <param name="value">The option value to set.</param>
+ /// <param name="throws">True when setting <paramref name="value" /> is expected to throw an <see cref="ArgumentException" />.</param>
+ [SkippableTheory]
+ [InlineData("-1", true)]
+ [InlineData("zero", true)]
+ [InlineData("-2147483648", true)]
+ [InlineData("2147483648", true)]
+ [InlineData("0")]
+ [InlineData("1")]
+ [InlineData("2147483647")]
+ public void CanSetOptionPollTime(string value, bool throws = false)
+ {
+ // Dispose the connection and statement created for this test case so they are not leaked.
+ using AdbcConnection connection = NewConnection();
+ using AdbcStatement statement = connection.CreateStatement();
+ if (throws)
+ {
+ Assert.Throws<ArgumentException>(() => statement.SetOption(SparkStatement.Options.PollTimeMilliseconds, value));
+ }
+ else
+ {
+ statement.SetOption(SparkStatement.Options.PollTimeMilliseconds, value);
+ }
+ }
+
+ /// <summary>
+ /// Validates that SetOption handles valid and invalid values correctly for the BatchSize option.
+ /// </summary>
+ /// <param name="value">The option value to set.</param>
+ /// <param name="throws">True when setting <paramref name="value" /> is expected to throw an <see cref="ArgumentException" />.</param>
+ [SkippableTheory]
+ [InlineData("-1", true)]
+ [InlineData("one", true)]
+ [InlineData("-2147483648", true)]
+ [InlineData("2147483648", true)]
+ [InlineData("0", true)]
+ [InlineData("1")]
+ [InlineData("2147483647")]
+ public void CanSetOptionBatchSize(string value, bool throws = false)
+ {
+ // Dispose the connection and statement created for this test case so they are not leaked.
+ using AdbcConnection connection = NewConnection();
+ using AdbcStatement statement = connection.CreateStatement();
+ if (throws)
+ {
+ Assert.Throws<ArgumentException>(() => statement.SetOption(SparkStatement.Options.BatchSize, value));
+ }
+ else
+ {
+ statement.SetOption(SparkStatement.Options.BatchSize, value);
+ }
+ }
+
+ /// <summary>
+ /// Validates that the driver can still run statements against a temporary table
+ /// after the poll time and batch size options have been set.
+ /// </summary>
+ [SkippableFact, Order(1)]
+ public async Task CanInteractUsingSetOptions()
+ {
+ const string columnName = "INDEX";
+ Statement.SetOption(SparkStatement.Options.PollTimeMilliseconds, "100");
+ Statement.SetOption(SparkStatement.Options.BatchSize, "10");
+ using TemporaryTable temporaryTable = await NewTemporaryTableAsync(Statement, $"{columnName} INT");
+ await ValidateInsertSelectDeleteSingleValueAsync(temporaryTable.TableName, columnName, 1);
+ }
+ }
+}