This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 9afc04c24 feat(csharp): Implement support for transactions, isolation
level and read-only flag (#1784)
9afc04c24 is described below
commit 9afc04c2405dced49ceafc502faa6df7a9852854
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Mon Apr 29 05:17:33 2024 -0700
feat(csharp): Implement support for transactions, isolation level and
read-only flag (#1784)
Implements support for transactions, isolation level and read-only flags
on imported and exported drivers.
Closes #1782
---
csharp/src/Apache.Arrow.Adbc/AdbcOptions.cs | 77 ++++++++++++
.../src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs | 28 ++++-
.../src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs | 57 +++++++++
csharp/src/Client/AdbcCommand.cs | 5 +-
csharp/src/Client/AdbcConnection.cs | 90 +++++++++++++-
csharp/src/Client/AdbcDataReader.cs | 13 ++-
csharp/src/Client/Properties/AssemblyInfo.cs | 20 ++++
.../Client/DuckDbClientTests.cs | 130 +++++++++++++++++++++
.../test/Apache.Arrow.Adbc.Tests/DuckDbFixture.cs | 17 +++
.../Apache.Arrow.Adbc.Tests/ImportedDuckDbTests.cs | 110 +++++++++++++++++
10 files changed, 536 insertions(+), 11 deletions(-)
diff --git a/csharp/src/Apache.Arrow.Adbc/AdbcOptions.cs
b/csharp/src/Apache.Arrow.Adbc/AdbcOptions.cs
new file mode 100644
index 000000000..8fc261aa2
--- /dev/null
+++ b/csharp/src/Apache.Arrow.Adbc/AdbcOptions.cs
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.Arrow.Adbc
+{
+    /// <summary>
+    /// String constants for ADBC connection options, together with helpers that convert
+    /// between those strings and the corresponding .NET values.
+    /// </summary>
+    public static class AdbcOptions
+    {
+        /// <summary>String form of an enabled boolean option.</summary>
+        public const string Enabled = "true";
+
+        /// <summary>String form of a disabled boolean option.</summary>
+        public const string Disabled = "false";
+
+        /// <summary>Name of the connection option controlling autocommit.</summary>
+        public const string Autocommit = "adbc.connection.autocommit";
+
+        /// <summary>Name of the connection option controlling the read-only flag.</summary>
+        public const string ReadOnly = "adbc.connection.readonly";
+
+        /// <summary>Name of the connection option controlling the transaction isolation level.</summary>
+        public const string IsolationLevel = "adbc.connection.transaction.isolation_level";
+
+        /// <summary>String values accepted by the <see cref="IsolationLevel"/> option.</summary>
+        public static class IsolationLevels
+        {
+            public const string Default = "adbc.connection.transaction.isolation.default";
+            public const string ReadUncommitted = "adbc.connection.transaction.isolation.read_uncommitted";
+            public const string ReadCommitted = "adbc.connection.transaction.isolation.read_committed";
+            public const string RepeatableRead = "adbc.connection.transaction.isolation.repeatable_read";
+            public const string Snapshot = "adbc.connection.transaction.isolation.snapshot";
+            public const string Serializable = "adbc.connection.transaction.isolation.serializable";
+            public const string Linearizable = "adbc.connection.transaction.isolation.linearizable";
+        }
+
+        /// <summary>Converts a boolean into <see cref="Enabled"/> or <see cref="Disabled"/>.</summary>
+        public static string GetEnabled(bool value)
+        {
+            if (value)
+            {
+                return Enabled;
+            }
+
+            return Disabled;
+        }
+
+        /// <summary>
+        /// Parses <see cref="Enabled"/> or <see cref="Disabled"/> (ordinal, case-insensitive) into a boolean.
+        /// </summary>
+        /// <exception cref="NotSupportedException">The value is neither of the two flags.</exception>
+        public static bool GetEnabled(string value)
+        {
+            if (string.Equals(value, Enabled, StringComparison.OrdinalIgnoreCase))
+            {
+                return true;
+            }
+
+            if (string.Equals(value, Disabled, StringComparison.OrdinalIgnoreCase))
+            {
+                return false;
+            }
+
+            throw new NotSupportedException("unknown enabled flag");
+        }
+
+        /// <summary>Maps an isolation level enum member to its option string.</summary>
+        /// <exception cref="NotSupportedException">The enum value has no known option string.</exception>
+        public static string GetIsolationLevel(IsolationLevel value)
+        {
+            switch (value)
+            {
+                case Adbc.IsolationLevel.Default:
+                    return IsolationLevels.Default;
+                case Adbc.IsolationLevel.ReadUncommitted:
+                    return IsolationLevels.ReadUncommitted;
+                case Adbc.IsolationLevel.ReadCommitted:
+                    return IsolationLevels.ReadCommitted;
+                case Adbc.IsolationLevel.RepeatableRead:
+                    return IsolationLevels.RepeatableRead;
+                case Adbc.IsolationLevel.Snapshot:
+                    return IsolationLevels.Snapshot;
+                case Adbc.IsolationLevel.Serializable:
+                    return IsolationLevels.Serializable;
+                case Adbc.IsolationLevel.Linearizable:
+                    return IsolationLevels.Linearizable;
+                default:
+                    throw new NotSupportedException("unknown isolation level");
+            }
+        }
+
+        /// <summary>
+        /// Parses an isolation level option string (ordinal, case-insensitive) into the enum member.
+        /// </summary>
+        /// <exception cref="NotSupportedException">The string is not one of <see cref="IsolationLevels"/>.</exception>
+        public static IsolationLevel GetIsolationLevel(string value)
+        {
+            bool Matches(string candidate) => string.Equals(value, candidate, StringComparison.OrdinalIgnoreCase);
+
+            // Candidates are checked in the same order as the original if-chain.
+            if (Matches(IsolationLevels.Default)) { return Adbc.IsolationLevel.Default; }
+            if (Matches(IsolationLevels.ReadUncommitted)) { return Adbc.IsolationLevel.ReadUncommitted; }
+            if (Matches(IsolationLevels.ReadCommitted)) { return Adbc.IsolationLevel.ReadCommitted; }
+            if (Matches(IsolationLevels.RepeatableRead)) { return Adbc.IsolationLevel.RepeatableRead; }
+            if (Matches(IsolationLevels.Snapshot)) { return Adbc.IsolationLevel.Snapshot; }
+            if (Matches(IsolationLevels.Serializable)) { return Adbc.IsolationLevel.Serializable; }
+            if (Matches(IsolationLevels.Linearizable)) { return Adbc.IsolationLevel.Linearizable; }
+
+            throw new NotSupportedException("unknown isolation level");
+        }
+    }
+}
diff --git a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
index 0a119910f..d36b15f70 100644
--- a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
+++ b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
@@ -856,7 +856,33 @@ namespace Apache.Arrow.Adbc.C
public unsafe void SetOption(byte* name, byte* value) // Applies one option given as two UTF-8 string pointers (name, value).
{
- options[MarshalExtensions.PtrToStringUTF8(name)] =
MarshalExtensions.PtrToStringUTF8(value);
+ string stringName = MarshalExtensions.PtrToStringUTF8(name);
+ string stringValue = MarshalExtensions.PtrToStringUTF8(value);
+
+ if (connection == null) // no connection object yet: just remember the option in the options map
+ {
+ options[stringName] = stringValue;
+ }
+ else // a connection exists: apply the option to it directly
+ {
+ // TODO: how best to normalize this?
+ // The three well-known option names below are matched case-insensitively and routed to typed properties.
+ if (StringComparer.OrdinalIgnoreCase.Equals(stringName,
AdbcOptions.Autocommit))
+ {
+ connection.AutoCommit =
AdbcOptions.GetEnabled(stringValue);
+ }
+ else if
(StringComparer.OrdinalIgnoreCase.Equals(stringName,
AdbcOptions.IsolationLevel))
+ {
+ connection.IsolationLevel =
AdbcOptions.GetIsolationLevel(stringValue);
+ }
+ else if
(StringComparer.OrdinalIgnoreCase.Equals(stringName, AdbcOptions.ReadOnly))
+ {
+ connection.ReadOnly =
AdbcOptions.GetEnabled(stringValue);
+ }
+ else // any other option name is forwarded to the connection unchanged
+ {
+ connection.SetOption(stringName, stringValue);
+ }
+ }
}
public void Rollback() { this.connection.Rollback(); }
diff --git a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
index 07398b8f9..158417812 100644
--- a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
+++ b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
@@ -200,6 +200,9 @@ namespace Apache.Arrow.Adbc.C
{
private CAdbcDriver _nativeDriver;
private CAdbcConnection _nativeConnection;
+ private bool? _autoCommit;
+ private IsolationLevel? _isolationLevel;
+ private bool? _readOnly;
public AdbcConnectionNative(CAdbcDriver nativeDriver,
CAdbcConnection nativeConnection)
{
@@ -207,6 +210,36 @@ namespace Apache.Arrow.Adbc.C
_nativeConnection = nativeConnection;
}
+            /// <summary>
+            /// Gets or sets the autocommit flag of the native connection.
+            /// </summary>
+            /// <remarks>
+            /// The setter forwards the value to the driver through <see cref="SetOption"/> and caches it
+            /// only after that call returns. The getter reports the cached value; nothing is cached
+            /// until the property has been set through this wrapper.
+            /// </remarks>
+            /// <exception cref="System.InvalidOperationException">The value is read before it has been set.</exception>
+            public override bool AutoCommit
+            {
+                // Same exception type that Nullable<T>.Value raised before, but with a message that names the property.
+                get => _autoCommit ?? throw new System.InvalidOperationException("no value has been set for AutoCommit");
+                set
+                {
+                    SetOption(AdbcOptions.Autocommit, AdbcOptions.GetEnabled(value));
+                    _autoCommit = value;
+                }
+            }
+
+            /// <summary>
+            /// Gets or sets the transaction isolation level of the native connection.
+            /// </summary>
+            /// <remarks>
+            /// The setter forwards the value to the driver through <see cref="SetOption"/> and caches it
+            /// only after that call returns. The getter reports the cached value; nothing is cached
+            /// until the property has been set through this wrapper.
+            /// </remarks>
+            /// <exception cref="System.InvalidOperationException">The value is read before it has been set.</exception>
+            public override IsolationLevel IsolationLevel
+            {
+                // Same exception type that Nullable<T>.Value raised before, but with a message that names the property.
+                get => _isolationLevel ?? throw new System.InvalidOperationException("no value has been set for IsolationLevel");
+                set
+                {
+                    SetOption(AdbcOptions.IsolationLevel, AdbcOptions.GetIsolationLevel(value));
+                    _isolationLevel = value;
+                }
+            }
+
+            /// <summary>
+            /// Gets or sets the read-only flag of the native connection.
+            /// </summary>
+            /// <remarks>
+            /// The setter forwards the value to the driver through <see cref="SetOption"/> and caches it
+            /// only after that call returns. The getter reports the cached value; nothing is cached
+            /// until the property has been set through this wrapper.
+            /// </remarks>
+            /// <exception cref="System.InvalidOperationException">The value is read before it has been set.</exception>
+            public override bool ReadOnly
+            {
+                // Same exception type that Nullable<T>.Value raised before, but with a message that names the property.
+                get => _readOnly ?? throw new System.InvalidOperationException("no value has been set for ReadOnly");
+                set
+                {
+                    SetOption(AdbcOptions.ReadOnly, AdbcOptions.GetEnabled(value));
+                    _readOnly = value;
+                }
+            }
+
public unsafe override AdbcStatement CreateStatement()
{
CAdbcStatement nativeStatement = new CAdbcStatement();
@@ -330,6 +363,30 @@ namespace Apache.Arrow.Adbc.C
}
}
}
+
+            /// <summary>
+            /// Invokes the native driver's ConnectionCommit entry point for this connection.
+            /// </summary>
+            public unsafe override void Commit()
+            {
+                // The helper is disposed when the method scope ends, as with the original using block.
+                using CallHelper helper = new CallHelper();
+                helper.Call(_nativeDriver.ConnectionCommit, ref _nativeConnection);
+            }
+
+            /// <summary>
+            /// Invokes the native driver's ConnectionRollback entry point for this connection.
+            /// </summary>
+            public unsafe override void Rollback()
+            {
+                // The helper is disposed when the method scope ends, as with the original using block.
+                using CallHelper helper = new CallHelper();
+                helper.Call(_nativeDriver.ConnectionRollback, ref _nativeConnection);
+            }
+
+            /// <summary>
+            /// Passes a key/value option to the native driver's ConnectionSetOption entry point.
+            /// </summary>
+            /// <param name="key">Option name.</param>
+            /// <param name="value">Option value.</param>
+            public unsafe override void SetOption(string key, string value)
+            {
+                // The helper is disposed when the method scope ends, as with the original using block.
+                using CallHelper helper = new CallHelper();
+                helper.Call(_nativeDriver.ConnectionSetOption, ref _nativeConnection, key, value);
+            }
}
/// <summary>
diff --git a/csharp/src/Client/AdbcCommand.cs b/csharp/src/Client/AdbcCommand.cs
index e9906813d..e25465b6c 100644
--- a/csharp/src/Client/AdbcCommand.cs
+++ b/csharp/src/Client/AdbcCommand.cs
@@ -169,12 +169,13 @@ namespace Apache.Arrow.Adbc.Client
/// <returns><see cref="AdbcDataReader"/></returns>
public new AdbcDataReader ExecuteReader(CommandBehavior behavior)
{
- switch (behavior)
+ bool closeConnection = (behavior &
CommandBehavior.CloseConnection) != 0;
+ switch (behavior & ~CommandBehavior.CloseConnection)
{
case CommandBehavior.SchemaOnly: // The schema is not known
until a read happens
case CommandBehavior.Default:
QueryResult result = this.ExecuteQuery();
- return new AdbcDataReader(this, result,
this.DecimalBehavior);
+ return new AdbcDataReader(this, result,
this.DecimalBehavior, closeConnection);
default:
throw new InvalidOperationException($"{behavior} is not
supported with this provider");
diff --git a/csharp/src/Client/AdbcConnection.cs
b/csharp/src/Client/AdbcConnection.cs
index 0ac735cca..4e22c3968 100644
--- a/csharp/src/Client/AdbcConnection.cs
+++ b/csharp/src/Client/AdbcConnection.cs
@@ -38,6 +38,7 @@ namespace Apache.Arrow.Adbc.Client
private readonly Dictionary<string, string> adbcConnectionOptions;
private AdbcStatement adbcStatement;
+ private AdbcTransaction currentTransaction;
/// <summary>
/// Overloaded. Intializes an <see cref="AdbcConnection"/>.
@@ -92,6 +93,14 @@ namespace Apache.Arrow.Adbc.Client
this.adbcConnectionOptions = options;
}
+        /// <summary>
+        /// Test-only constructor that wraps an existing driver, database and ADBC connection.
+        /// </summary>
+        /// <param name="driver">Driver stored in <see cref="AdbcDriver"/>.</param>
+        /// <param name="database">Database the connection belongs to.</param>
+        /// <param name="connection">Underlying ADBC connection to wrap.</param>
+        internal AdbcConnection(AdbcDriver driver, AdbcDatabase database, Adbc.AdbcConnection connection)
+        {
+            AdbcDriver = driver;
+            adbcDatabase = database;
+            adbcConnectionInternal = connection;
+        }
+
/// <summary>
/// Creates a new <see cref="AdbcCommand"/>.
/// </summary>
@@ -258,6 +267,65 @@ namespace Apache.Arrow.Adbc.Client
return collection.GetSchema(this.Connection, restrictionValues);
}
+        /// <summary>
+        /// Starts a transaction by disabling autocommit on the underlying ADBC connection and,
+        /// when a specific level is requested, applying that isolation level.
+        /// </summary>
+        /// <param name="isolationLevel">
+        /// Requested isolation level. <see cref="System.Data.IsolationLevel.Unspecified"/> leaves the
+        /// connection's isolation level untouched.
+        /// </param>
+        /// <returns>The transaction object now tracked as the connection's current transaction.</returns>
+        /// <exception cref="InvalidOperationException">A transaction is already active on this connection.</exception>
+        /// <exception cref="NotSupportedException">The requested level has no ADBC equivalent.</exception>
+        protected override DbTransaction BeginDbTransaction(System.Data.IsolationLevel isolationLevel)
+        {
+            if (this.currentTransaction != null)
+            {
+                throw new InvalidOperationException("connection is already enlisted in a transaction");
+            }
+
+            this.Connection.AutoCommit = false;
+
+            if (isolationLevel != System.Data.IsolationLevel.Unspecified)
+            {
+                try
+                {
+                    this.Connection.IsolationLevel = GetIsolationLevel(isolationLevel);
+                }
+                catch
+                {
+                    // No transaction object is handed out on this path, so nobody could ever commit or
+                    // roll back. Put the connection back into autocommit mode before reporting the failure.
+                    try
+                    {
+                        this.Connection.AutoCommit = true;
+                    }
+                    catch (Exception)
+                    {
+                        // Best effort only: the original failure is the one the caller needs to see.
+                    }
+
+                    throw;
+                }
+            }
+
+            this.currentTransaction = new AdbcTransaction(this, isolationLevel);
+            return this.currentTransaction;
+        }
+
+        /// <summary>
+        /// Translates an ADO.NET isolation level into the ADBC isolation level enum.
+        /// </summary>
+        /// <param name="isolationLevel">The ADO.NET level to translate.</param>
+        /// <returns>The matching ADBC level; Unspecified maps to Default.</returns>
+        /// <exception cref="NotSupportedException">The level is not one of the mapped values.</exception>
+        private static Adbc.IsolationLevel GetIsolationLevel(System.Data.IsolationLevel isolationLevel)
+        {
+            switch (isolationLevel)
+            {
+                case System.Data.IsolationLevel.Unspecified:
+                    return Adbc.IsolationLevel.Default;
+                case System.Data.IsolationLevel.ReadUncommitted:
+                    return Adbc.IsolationLevel.ReadUncommitted;
+                case System.Data.IsolationLevel.ReadCommitted:
+                    return Adbc.IsolationLevel.ReadCommitted;
+                case System.Data.IsolationLevel.RepeatableRead:
+                    return Adbc.IsolationLevel.RepeatableRead;
+                case System.Data.IsolationLevel.Snapshot:
+                    return Adbc.IsolationLevel.Snapshot;
+                case System.Data.IsolationLevel.Serializable:
+                    return Adbc.IsolationLevel.Serializable;
+                default:
+                    throw new NotSupportedException("unknown isolation level");
+            }
+        }
+
+        /// <summary>
+        /// Commits the current transaction, clears it, re-enables autocommit and, if the transaction
+        /// was started with an explicit isolation level, resets the isolation level to Default.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">No transaction is active on this connection.</exception>
+        private void Commit()
+        {
+            AdbcTransaction active = this.currentTransaction;
+            if (active == null)
+            {
+                throw new InvalidOperationException("connection is not enlisted in a transaction");
+            }
+
+            // Read the level before committing; the transaction reference is dropped afterwards.
+            bool resetIsolationLevel = active.IsolationLevel != System.Data.IsolationLevel.Unspecified;
+
+            this.Connection.Commit();
+
+            this.currentTransaction = null;
+            this.Connection.AutoCommit = true;
+            if (resetIsolationLevel)
+            {
+                this.adbcConnectionInternal.IsolationLevel = IsolationLevel.Default;
+            }
+        }
+
+        /// <summary>
+        /// Rolls back the current transaction, clears it, re-enables autocommit and, if the transaction
+        /// was started with an explicit isolation level, resets the isolation level to Default.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">No transaction is active on this connection.</exception>
+        private void Rollback()
+        {
+            AdbcTransaction active = this.currentTransaction;
+            if (active == null)
+            {
+                throw new InvalidOperationException("connection is not enlisted in a transaction");
+            }
+
+            // Read the level before rolling back; the transaction reference is dropped afterwards.
+            bool resetIsolationLevel = active.IsolationLevel != System.Data.IsolationLevel.Unspecified;
+
+            this.Connection.Rollback();
+
+            this.currentTransaction = null;
+            this.Connection.AutoCommit = true;
+            if (resetIsolationLevel)
+            {
+                this.adbcConnectionInternal.IsolationLevel = IsolationLevel.Default;
+            }
+        }
+
#region NOT_IMPLEMENTED
public override string Database => throw new NotImplementedException();
@@ -271,12 +339,26 @@ namespace Apache.Arrow.Adbc.Client
throw new NotImplementedException();
}
- protected override DbTransaction
BeginDbTransaction(System.Data.IsolationLevel isolationLevel)
+ #endregion
+
+ sealed class AdbcTransaction : DbTransaction // Transaction handle whose Commit/Rollback delegate to the owning AdbcConnection.
{
- throw new NotImplementedException();
- }
+ readonly AdbcConnection connection; // connection that created this transaction
+ readonly System.Data.IsolationLevel isolationLevel; // level requested when the transaction was begun
- #endregion
+ public AdbcTransaction(AdbcConnection connection,
System.Data.IsolationLevel isolationLevel)
+ {
+ this.connection = connection;
+ this.isolationLevel = isolationLevel;
+ }
+
+ public override System.Data.IsolationLevel IsolationLevel =>
this.isolationLevel;
+
+ protected override DbConnection DbConnection => this.connection;
+
+ // NOTE(review): Dispose is not overridden in this class, so it is not visible here whether
+ // disposing an uncommitted transaction rolls it back - confirm against DbTransaction behavior.
+ public override void Commit() => this.connection.Commit(); // forwards to the connection's private Commit
+ public override void Rollback() => this.connection.Rollback(); // forwards to the connection's private Rollback
+ }
abstract class SchemaCollection
{
diff --git a/csharp/src/Client/AdbcDataReader.cs
b/csharp/src/Client/AdbcDataReader.cs
index 552ae8128..fe12dd858 100644
--- a/csharp/src/Client/AdbcDataReader.cs
+++ b/csharp/src/Client/AdbcDataReader.cs
@@ -36,14 +36,15 @@ namespace Apache.Arrow.Adbc.Client
public sealed class AdbcDataReader : DbDataReader, IDbColumnSchemaGenerator
{
private readonly AdbcCommand adbcCommand;
+ private readonly bool closeConnection;
private QueryResult adbcQueryResult;
private RecordBatch recordBatch;
private int currentRowInRecordBatch;
private Schema schema = null;
private bool isClosed;
- private int recordsEffected = -1;
+ private int recordsAffected = -1;
- internal AdbcDataReader(AdbcCommand adbcCommand, QueryResult
adbcQueryResult, DecimalBehavior decimalBehavior)
+ internal AdbcDataReader(AdbcCommand adbcCommand, QueryResult
adbcQueryResult, DecimalBehavior decimalBehavior, bool closeConnection)
{
if (adbcCommand == null)
throw new ArgumentNullException(nameof(adbcCommand));
@@ -58,6 +59,7 @@ namespace Apache.Arrow.Adbc.Client
if (this.schema == null)
throw new ArgumentException("A Schema must be set for the
AdbcQueryResult.Stream property");
+ this.closeConnection = closeConnection;
this.isClosed = false;
this.DecimalBehavior = decimalBehavior;
}
@@ -81,7 +83,7 @@ namespace Apache.Arrow.Adbc.Client
public DecimalBehavior DecimalBehavior { get; set; }
- public override int RecordsAffected => this.recordsEffected;
+ public override int RecordsAffected => this.recordsAffected;
/// <summary>
/// The total number of record batches in the result.
@@ -90,7 +92,10 @@ namespace Apache.Arrow.Adbc.Client
public override void Close() // Marks the reader closed and drops its reference to the query result.
{
- this.adbcCommand?.Connection?.Close();
+ if (this.closeConnection) // flag supplied to the constructor; the connection is closed only when it is set
+ {
+ this.adbcCommand?.Connection?.Close();
+ }
this.adbcQueryResult = null;
this.isClosed = true;
}
diff --git a/csharp/src/Client/Properties/AssemblyInfo.cs
b/csharp/src/Client/Properties/AssemblyInfo.cs
new file mode 100644
index 000000000..ccb1fe5c7
--- /dev/null
+++ b/csharp/src/Client/Properties/AssemblyInfo.cs
@@ -0,0 +1,20 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests,
PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
diff --git a/csharp/test/Apache.Arrow.Adbc.Tests/Client/DuckDbClientTests.cs
b/csharp/test/Apache.Arrow.Adbc.Tests/Client/DuckDbClientTests.cs
new file mode 100644
index 000000000..915fef37d
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Adbc.Tests/Client/DuckDbClientTests.cs
@@ -0,0 +1,130 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using Apache.Arrow.Adbc.Client;
+using Xunit;
+
+namespace Apache.Arrow.Adbc.Tests.Client
+{
+    /// <summary>
+    /// Tests for the ADO.NET client wrapper, run against connections created by <see cref="DuckDbFixture"/>.
+    /// </summary>
+    public class DuckDbClientTests : IClassFixture<DuckDbFixture>
+    {
+        readonly DuckDbFixture _duckDb;
+
+        public DuckDbClientTests(DuckDbFixture duckDb)
+        {
+            _duckDb = duckDb;
+        }
+
+        /// <summary>
+        /// Creates a table, inserts three rows and reads them back through a data reader,
+        /// checking the reported schema and the first column of every row.
+        /// </summary>
+        [Fact]
+        public void SimpleEndToEndTest()
+        {
+            using var connection = _duckDb.CreateConnection("clienttest.db", null);
+            connection.Open();
+            var command = connection.CreateCommand();
+
+            command.CommandText = "CREATE TABLE integers(foo INTEGER, bar INTEGER);";
+            command.ExecuteNonQuery();
+
+            command.CommandText = "INSERT INTO integers VALUES (3, 4), (5, 6), (7, 8);";
+            command.ExecuteNonQuery();
+
+            command.CommandText = "SELECT * from integers";
+            using var reader = command.ExecuteReader();
+
+            var schema = reader.GetSchemaTable();
+            Assert.Equal(2, schema.Rows.Count);
+            Assert.Equal(typeof(int), schema.Rows[0].ItemArray[2]);
+            Assert.Equal(typeof(int), schema.Rows[1].ItemArray[2]);
+
+            Assert.True(reader.Read());
+            Assert.Equal(3, reader.GetInt32(0));
+            Assert.True(reader.Read());
+            Assert.Equal(5, reader.GetInt32(0));
+            Assert.True(reader.Read());
+            Assert.Equal(7, reader.GetInt32(0));
+            Assert.False(reader.Read());
+        }
+
+        /// <summary>
+        /// Checks that rows inserted inside a transaction are invisible to a second connection
+        /// until commit, and that a rollback discards them.
+        /// </summary>
+        [Fact]
+        public void TransactionsTest()
+        {
+            using var connection = _duckDb.CreateConnection("clienttransactions.db", null);
+            connection.Open();
+            var command = connection.CreateCommand();
+
+            command.CommandText = "CREATE TABLE test(column1 INTEGER);";
+            command.ExecuteNonQuery();
+
+            var transaction = connection.BeginTransaction();
+
+            // Insert into connection1
+            command.CommandText = "INSERT INTO test VALUES (3), (5), (7);";
+            command.ExecuteUpdate();
+            Assert.Equal(3, GetResultCount(command, "SELECT * from test"));
+
+            // Validate that we don't see the data on connection2
+            using var connection2 = _duckDb.CreateConnection("clienttransactions.db", null);
+            connection2.Open();
+            var command2 = connection2.CreateCommand();
+            Assert.Equal(0, GetResultCount(command2, "SELECT * from test"));
+
+            // ... until after a commit on connection1
+            transaction.Commit();
+            Assert.Equal(3, GetResultCount(command2, "SELECT * from test"));
+
+            // When not in a transaction, we immediately see the results on another connection
+            command.CommandText = "INSERT INTO test VALUES (2), (4);";
+            command.ExecuteUpdate();
+            Assert.Equal(5, GetResultCount(command2, "SELECT * from test"));
+
+            transaction = connection.BeginTransaction();
+
+            // Now you see it...
+            command.CommandText = "INSERT INTO test VALUES (6);";
+            command.ExecuteUpdate();
+            Assert.Equal(6, GetResultCount(command, "SELECT * from test"));
+
+            // Now you don't
+            transaction.Rollback();
+            Assert.Equal(5, GetResultCount(command, "SELECT * from test"));
+        }
+
+        /// <summary>
+        /// Beginning a transaction with an explicit isolation level is expected to surface an
+        /// <see cref="AdbcException"/> with this driver.
+        /// </summary>
+        [Fact]
+        public void SetIsolationLevelFails()
+        {
+            using var connection = _duckDb.CreateConnection("clientisolation.db", null);
+            connection.Open();
+
+            Assert.Throws<AdbcException>(() =>
+            {
+                connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
+            });
+        }
+
+        /// <summary>
+        /// Runs <paramref name="query"/> on <paramref name="command"/> and counts the rows returned.
+        /// </summary>
+        /// <param name="command">Command to execute; its CommandText is overwritten.</param>
+        /// <param name="query">SQL text to run.</param>
+        /// <returns>Number of rows the reader produced.</returns>
+        private static long GetResultCount(AdbcCommand command, string query)
+        {
+            // Use the caller's query; the previous version ignored the parameter and hard-coded the SQL.
+            command.CommandText = query;
+            using var reader = command.ExecuteReader();
+            long count = 0;
+            while (reader.Read())
+            {
+                count++;
+            }
+            return count;
+        }
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Adbc.Tests/DuckDbFixture.cs
b/csharp/test/Apache.Arrow.Adbc.Tests/DuckDbFixture.cs
index 99925325d..07e525ca7 100644
--- a/csharp/test/Apache.Arrow.Adbc.Tests/DuckDbFixture.cs
+++ b/csharp/test/Apache.Arrow.Adbc.Tests/DuckDbFixture.cs
@@ -20,12 +20,14 @@ using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using Apache.Arrow.Adbc.C;
+using DbClient = Apache.Arrow.Adbc.Client;
namespace Apache.Arrow.Adbc.Tests
{
public class DuckDbFixture : IDisposable
{
readonly string _dataDirectory;
+ readonly Dictionary<string, AdbcDatabase> _databases;
AdbcDriver _driver;
public DuckDbFixture()
@@ -33,6 +35,8 @@ namespace Apache.Arrow.Adbc.Tests
_dataDirectory = Path.Combine(Path.GetTempPath(),
"AdbcTest_DuckDb", Guid.NewGuid().ToString("D"));
Directory.CreateDirectory(_dataDirectory);
+ _databases = new Dictionary<string,
AdbcDatabase>(StringComparer.OrdinalIgnoreCase);
+
string root = Directory.GetCurrentDirectory();
string file;
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
@@ -52,6 +56,19 @@ namespace Apache.Arrow.Adbc.Tests
return _driver.Open(new Dictionary<string, string> { { "path",
Path.Combine(_dataDirectory, name) } });
}
+        /// <summary>
+        /// Creates a client connection for the named database, opening the database on first use
+        /// and reusing the cached instance afterwards.
+        /// </summary>
+        /// <param name="name">Database name; also the cache key (compared case-insensitively).</param>
+        /// <param name="connectionOptions">Options passed to the database's Connect call.</param>
+        /// <returns>A client connection wrapping the new ADBC connection.</returns>
+        public DbClient.AdbcConnection CreateConnection(string name, IReadOnlyDictionary<string, string> connectionOptions)
+        {
+            if (!_databases.TryGetValue(name, out AdbcDatabase cached))
+            {
+                cached = OpenDatabase(name);
+                _databases[name] = cached;
+            }
+
+            var nativeConnection = cached.Connect(connectionOptions);
+            return new DbClient.AdbcConnection(_driver, cached, nativeConnection);
+        }
+
public void Dispose()
{
if (_driver != null)
diff --git a/csharp/test/Apache.Arrow.Adbc.Tests/ImportedDuckDbTests.cs
b/csharp/test/Apache.Arrow.Adbc.Tests/ImportedDuckDbTests.cs
index 2bb67777f..084addde3 100644
--- a/csharp/test/Apache.Arrow.Adbc.Tests/ImportedDuckDbTests.cs
+++ b/csharp/test/Apache.Arrow.Adbc.Tests/ImportedDuckDbTests.cs
@@ -61,5 +61,115 @@ namespace Apache.Arrow.Adbc.Tests
var secondBatch = stream.ReadNextRecordBatchAsync().Result;
Assert.Null(secondBatch);
}
+
+        /// <summary>
+        /// Checks that rows written with autocommit disabled are invisible to a second connection
+        /// until commit, and that a rollback discards uncommitted rows.
+        /// </summary>
+        [Fact]
+        public void TransactionsTest()
+        {
+            using var database = _duckDb.OpenDatabase("transactions.db");
+            using var connection1 = database.Connect(null);
+            using var statement1 = connection1.CreateStatement();
+
+            statement1.SqlQuery = "CREATE TABLE test(column1 INTEGER);";
+            statement1.ExecuteUpdate();
+
+            connection1.AutoCommit = false;
+
+            // Insert into connection1
+            statement1.SqlQuery = "INSERT INTO test VALUES (3), (5), (7);";
+            statement1.ExecuteUpdate();
+            Assert.Equal(3, GetResultCount(statement1, "SELECT * from test"));
+
+            // Validate that we don't see the data on connection2
+            using var connection2 = database.Connect(null);
+            using var statement2 = connection2.CreateStatement();
+            Assert.Equal(0, GetResultCount(statement2, "SELECT * from test"));
+
+            // ... until after a commit on connection1
+            connection1.Commit();
+            Assert.Equal(3, GetResultCount(statement2, "SELECT * from test"));
+
+            connection1.AutoCommit = true;
+
+            // With AutoCommit on, we immediately see the results on another connection
+            statement1.SqlQuery = "INSERT INTO test VALUES (2), (4);";
+            statement1.ExecuteUpdate();
+            Assert.Equal(5, GetResultCount(statement2, "SELECT * from test"));
+
+            connection1.AutoCommit = false;
+
+            // Now you see it...
+            statement1.SqlQuery = "INSERT INTO test VALUES (6);";
+            statement1.ExecuteUpdate();
+            Assert.Equal(6, GetResultCount(statement1, "SELECT * from test"));
+
+            // Now you don't
+            connection1.Rollback();
+            Assert.Equal(5, GetResultCount(statement1, "SELECT * from test"));
+        }
+
+        /// <summary>
+        /// With the read-only flag set an insert is expected to throw; after clearing the flag
+        /// the insert succeeds. Currently skipped.
+        /// </summary>
+        [Fact(Skip = "DuckDb doesn't support ADBC readonly option yet")]
+        public void ReadOnlyTest()
+        {
+            using var db = _duckDb.OpenDatabase("readonly.db");
+            using var conn = db.Connect(null);
+            using var stmt = conn.CreateStatement();
+
+            stmt.SqlQuery = "CREATE TABLE test(column1 INTEGER);";
+            stmt.ExecuteUpdate();
+
+            conn.ReadOnly = true;
+
+            // Writing while read-only must fail.
+            Assert.Throws<AdbcException>(() =>
+            {
+                stmt.SqlQuery = "INSERT INTO test VALUES (3), (5), (7);";
+                stmt.ExecuteUpdate();
+            });
+
+            conn.ReadOnly = false;
+
+            stmt.SqlQuery = "INSERT INTO test VALUES (2), (4);";
+            stmt.ExecuteUpdate();
+            Assert.Equal(2, GetResultCount(stmt, "SELECT * from test"));
+        }
+
+        /// <summary>
+        /// Setting the read-only flag on a connection is expected to throw an <see cref="AdbcException"/>.
+        /// </summary>
+        [Fact]
+        public void ReadOnlyFails()
+        {
+            using var db = _duckDb.OpenDatabase("readonly.db");
+            using var conn = db.Connect(null);
+
+            Assert.Throws<AdbcException>(() => { conn.ReadOnly = true; });
+        }
+
+        /// <summary>
+        /// Setting the isolation level on a connection is expected to throw an <see cref="AdbcException"/>.
+        /// </summary>
+        [Fact]
+        public void SetIsolationLevelFails()
+        {
+            using var db = _duckDb.OpenDatabase("isolation.db");
+            using var conn = db.Connect(null);
+
+            Assert.Throws<AdbcException>(() => { conn.IsolationLevel = IsolationLevel.Default; });
+        }
+
+        /// <summary>
+        /// Runs <paramref name="query"/> on <paramref name="statement"/> and sums the lengths of all
+        /// record batches in the result stream.
+        /// </summary>
+        /// <param name="statement">Statement to execute; its SqlQuery is overwritten.</param>
+        /// <param name="query">SQL text to run.</param>
+        /// <returns>Total number of rows across all returned batches.</returns>
+        private static long GetResultCount(AdbcStatement statement, string query)
+        {
+            // Use the caller's query; the previous version ignored the parameter and hard-coded the SQL.
+            statement.SqlQuery = query;
+            var results = statement.ExecuteQuery();
+            long count = 0;
+            using (var stream = results.Stream)
+            {
+                RecordBatch batch;
+                while ((batch = stream.ReadNextRecordBatchAsync().Result) != null)
+                {
+                    count += batch.Length;
+                }
+            }
+            return count;
+        }
}
}