This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new a48c3b050 feat(csharp): imported objects should have "release" called
when no longer in use (#1802)
a48c3b050 is described below
commit a48c3b0500e6d80c383f57fbf6a9c907ca584a80
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Wed May 1 09:04:03 2024 -0700
feat(csharp): imported objects should have "release" called when no longer in
use (#1802)
Closes #1780
---
.../src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs | 105 ++-----
.../src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs | 345 ++++++++++++++++-----
csharp/src/Apache.Arrow.Adbc/C/Delegates.cs | 47 +++
csharp/src/Apache.Arrow.Adbc/C/NativeDelegate.cs | 14 +-
4 files changed, 361 insertions(+), 150 deletions(-)
diff --git a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
index 9034c37e3..7352e3480 100644
--- a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
+++ b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
@@ -27,10 +27,7 @@ namespace Apache.Arrow.Adbc.C
{
public class CAdbcDriverExporter
{
- internal unsafe delegate void ErrorRelease(CAdbcError* error);
private static unsafe readonly NativeDelegate<ErrorRelease>
s_releaseError = new NativeDelegate<ErrorRelease>(ReleaseError);
- internal unsafe delegate AdbcStatusCode DatabaseFn(CAdbcDatabase*
database, CAdbcError* error);
- internal unsafe delegate AdbcStatusCode ConnectionFn(CAdbcConnection*
connection, CAdbcError* error);
#if NET5_0_OR_GREATER
private static unsafe delegate* unmanaged<CAdbcError*, void>
ReleaseErrorPtr => (delegate* unmanaged<CAdbcError*,
void>)s_releaseError.Pointer;
@@ -63,79 +60,35 @@ namespace Apache.Arrow.Adbc.C
private static unsafe delegate* unmanaged<CAdbcStatement*, byte*, int,
CAdbcError*, AdbcStatusCode> StatementSetSubstraitPlanPtr =>
&SetStatementSubstraitPlan;
private static unsafe delegate* unmanaged<CAdbcStatement*,
CArrowSchema*, CAdbcError*, AdbcStatusCode> StatementGetParameterSchemaPtr =>
&GetStatementParameterSchema;
#else
- private static IntPtr ReleaseErrorPtr => s_releaseError.Pointer;
- internal unsafe delegate AdbcStatusCode DriverRelease(CAdbcDriver*
driver, CAdbcError* error);
- private static unsafe readonly NativeDelegate<DriverRelease>
s_releaseDriver = new NativeDelegate<DriverRelease>(ReleaseDriver);
- private static IntPtr ReleaseDriverPtr => s_releaseDriver.Pointer;
- internal unsafe delegate void PartitionsRelease(CAdbcPartitions*
partitions);
- private static unsafe readonly NativeDelegate<PartitionsRelease>
s_releasePartitions = new NativeDelegate<PartitionsRelease>(ReleasePartitions);
- private static IntPtr ReleasePartitionsPtr =>
s_releasePartitions.Pointer;
-
- private static unsafe readonly NativeDelegate<DatabaseFn>
s_databaseInit = new NativeDelegate<DatabaseFn>(InitDatabase);
- private static IntPtr DatabaseInitPtr => s_databaseInit.Pointer;
- private static unsafe readonly NativeDelegate<DatabaseFn>
s_databaseRelease = new NativeDelegate<DatabaseFn>(ReleaseDatabase);
- private static IntPtr DatabaseReleasePtr => s_databaseRelease.Pointer;
- internal unsafe delegate AdbcStatusCode
DatabaseSetOption(CAdbcDatabase* database, byte* name, byte* value, CAdbcError*
error);
- private static unsafe readonly NativeDelegate<DatabaseSetOption>
s_databaseSetOption = new NativeDelegate<DatabaseSetOption>(SetDatabaseOption);
- private static IntPtr DatabaseSetOptionPtr =>
s_databaseSetOption.Pointer;
-
- internal unsafe delegate AdbcStatusCode
ConnectionGetObjects(CAdbcConnection* connection, int depth, byte* catalog,
byte* db_schema, byte* table_name, byte** table_type, byte* column_name,
CArrowArrayStream* stream, CAdbcError* error);
- private static unsafe readonly NativeDelegate<ConnectionGetObjects>
s_connectionGetObjects = new
NativeDelegate<ConnectionGetObjects>(GetConnectionObjects);
- private static IntPtr ConnectionGetObjectsPtr =>
s_connectionGetObjects.Pointer;
- internal unsafe delegate AdbcStatusCode
ConnectionGetTableSchema(CAdbcConnection* connection, byte* catalog, byte*
db_schema, byte* table_name, CArrowSchema* schema, CAdbcError* error);
- private static unsafe readonly
NativeDelegate<ConnectionGetTableSchema> s_connectionGetTableSchema = new
NativeDelegate<ConnectionGetTableSchema>(GetConnectionTableSchema);
- private static IntPtr ConnectionGetTableSchemaPtr =>
s_connectionGetTableSchema.Pointer;
- internal unsafe delegate AdbcStatusCode
ConnectionGetTableTypes(CAdbcConnection* connection, CArrowArrayStream* stream,
CAdbcError* error);
- private static unsafe readonly NativeDelegate<ConnectionGetTableTypes>
s_connectionGetTableTypes = new
NativeDelegate<ConnectionGetTableTypes>(GetConnectionTableTypes);
- private static IntPtr ConnectionGetTableTypesPtr =>
s_connectionGetTableTypes.Pointer;
- internal unsafe delegate AdbcStatusCode
ConnectionInit(CAdbcConnection* connection, CAdbcDatabase* database,
CAdbcError* error);
- private static unsafe readonly NativeDelegate<ConnectionInit>
s_connectionInit = new NativeDelegate<ConnectionInit>(InitConnection);
- private static IntPtr ConnectionInitPtr => s_connectionInit.Pointer;
- private static unsafe readonly NativeDelegate<ConnectionFn>
s_connectionRollback = new NativeDelegate<ConnectionFn>(RollbackConnection);
- private static IntPtr ConnectionRollbackPtr =>
s_connectionRollback.Pointer;
- private static unsafe readonly NativeDelegate<ConnectionFn>
s_connectionCommit = new NativeDelegate<ConnectionFn>(CommitConnection);
- private static IntPtr ConnectionCommitPtr =>
s_connectionCommit.Pointer;
- private static unsafe readonly NativeDelegate<ConnectionFn>
s_connectionRelease = new NativeDelegate<ConnectionFn>(ReleaseConnection);
- private static IntPtr ConnectionReleasePtr =>
s_connectionRelease.Pointer;
- internal unsafe delegate AdbcStatusCode
ConnectionGetInfo(CAdbcConnection* connection, int* info_codes, int
info_codes_length, CArrowArrayStream* stream, CAdbcError* error);
- private static unsafe readonly NativeDelegate<ConnectionGetInfo>
s_connectionGetInfo = new NativeDelegate<ConnectionGetInfo>(GetConnectionInfo);
- private static IntPtr ConnectionGetInfoPtr =>
s_connectionGetInfo.Pointer;
- private unsafe delegate AdbcStatusCode
ConnectionReadPartition(CAdbcConnection* connection, byte*
serialized_partition, int serialized_length, CArrowArrayStream* stream,
CAdbcError* error);
- private static unsafe readonly NativeDelegate<ConnectionReadPartition>
s_connectionReadPartition = new
NativeDelegate<ConnectionReadPartition>(ReadConnectionPartition);
- private static IntPtr ConnectionReadPartitionPtr =>
s_connectionReadPartition.Pointer;
- internal unsafe delegate AdbcStatusCode
ConnectionSetOption(CAdbcConnection* connection, byte* name, byte* value,
CAdbcError* error);
- private static unsafe readonly NativeDelegate<ConnectionSetOption>
s_connectionSetOption = new
NativeDelegate<ConnectionSetOption>(SetConnectionOption);
- private static IntPtr ConnectionSetOptionPtr =>
s_connectionSetOption.Pointer;
-
- internal unsafe delegate AdbcStatusCode StatementBind(CAdbcStatement*
statement, CArrowArray* array, CArrowSchema* schema, CAdbcError* error);
- private static unsafe readonly NativeDelegate<StatementBind>
s_statementBind = new NativeDelegate<StatementBind>(BindStatement);
- private static IntPtr StatementBindPtr => s_statementBind.Pointer;
- internal unsafe delegate AdbcStatusCode
StatementBindStream(CAdbcStatement* statement, CArrowArrayStream* stream,
CAdbcError* error);
- private static unsafe readonly NativeDelegate<StatementBindStream>
s_statementBindStream = new
NativeDelegate<StatementBindStream>(BindStreamStatement);
- private static IntPtr StatementBindStreamPtr =>
s_statementBindStream.Pointer;
- internal unsafe delegate AdbcStatusCode
StatementExecuteQuery(CAdbcStatement* statement, CArrowArrayStream* stream,
long* rows, CAdbcError* error);
- private static unsafe readonly NativeDelegate<StatementExecuteQuery>
s_statementExecuteQuery = new
NativeDelegate<StatementExecuteQuery>(ExecuteStatementQuery);
- private static IntPtr StatementExecuteQueryPtr =
s_statementExecuteQuery.Pointer;
- internal unsafe delegate AdbcStatusCode
StatementExecutePartitions(CAdbcStatement* statement, CArrowSchema* schema,
CAdbcPartitions* partitions, long* rows, CAdbcError* error);
- private static unsafe readonly
NativeDelegate<StatementExecutePartitions> s_statementExecutePartitions = new
NativeDelegate<StatementExecutePartitions>(ExecuteStatementPartitions);
- private static IntPtr StatementExecutePartitionsPtr =
s_statementExecutePartitions.Pointer;
- internal unsafe delegate AdbcStatusCode StatementNew(CAdbcConnection*
connection, CAdbcStatement* statement, CAdbcError* error);
- private static unsafe readonly NativeDelegate<StatementNew>
s_statementNew = new NativeDelegate<StatementNew>(NewStatement);
- private static IntPtr StatementNewPtr => s_statementNew.Pointer;
- internal unsafe delegate AdbcStatusCode StatementFn(CAdbcStatement*
statement, CAdbcError* error);
- private static unsafe readonly NativeDelegate<StatementFn>
s_statementRelease = new NativeDelegate<StatementFn>(ReleaseStatement);
- private static IntPtr StatementReleasePtr =>
s_statementRelease.Pointer;
- private static unsafe readonly NativeDelegate<StatementFn>
s_statementPrepare = new NativeDelegate<StatementFn>(PrepareStatement);
- private static IntPtr StatementPreparePtr =>
s_statementPrepare.Pointer;
- internal unsafe delegate AdbcStatusCode
StatementSetSqlQuery(CAdbcStatement* statement, byte* text, CAdbcError* error);
- private static unsafe readonly NativeDelegate<StatementSetSqlQuery>
s_statementSetSqlQuery = new
NativeDelegate<StatementSetSqlQuery>(SetStatementSqlQuery);
- private static IntPtr StatementSetSqlQueryPtr =
s_statementSetSqlQuery.Pointer;
- internal unsafe delegate AdbcStatusCode
StatementSetSubstraitPlan(CAdbcStatement* statement, byte* plan, int length,
CAdbcError* error);
- private static unsafe readonly
NativeDelegate<StatementSetSubstraitPlan> s_statementSetSubstraitPlan = new
NativeDelegate<StatementSetSubstraitPlan>(SetStatementSubstraitPlan);
- private static IntPtr StatementSetSubstraitPlanPtr =
s_statementSetSubstraitPlan.Pointer;
- internal unsafe delegate AdbcStatusCode
StatementGetParameterSchema(CAdbcStatement* statement, CArrowSchema* schema,
CAdbcError* error);
- private static unsafe readonly
NativeDelegate<StatementGetParameterSchema> s_statementGetParameterSchema = new
NativeDelegate<StatementGetParameterSchema>(GetStatementParameterSchema);
- private static IntPtr StatementGetParameterSchemaPtr =
s_statementGetParameterSchema.Pointer;
+ private static unsafe IntPtr ReleaseErrorPtr => s_releaseError.Pointer;
+ private static unsafe IntPtr ReleaseDriverPtr =
NativeDelegate<DriverRelease>.AsNativePointer(ReleaseDriver);
+ private static unsafe IntPtr ReleasePartitionsPtr =
NativeDelegate<PartitionsRelease>.AsNativePointer(ReleasePartitions);
+
+ private static unsafe IntPtr DatabaseInitPtr =
NativeDelegate<DatabaseFn>.AsNativePointer(InitDatabase);
+ private static unsafe IntPtr DatabaseReleasePtr =
NativeDelegate<DatabaseFn>.AsNativePointer(ReleaseDatabase);
+ private static unsafe IntPtr DatabaseSetOptionPtr =
NativeDelegate<DatabaseSetOption>.AsNativePointer(SetDatabaseOption);
+
+ private static unsafe IntPtr ConnectionGetObjectsPtr =
NativeDelegate<ConnectionGetObjects>.AsNativePointer(GetConnectionObjects);
+ private static unsafe IntPtr ConnectionGetTableSchemaPtr =
NativeDelegate<ConnectionGetTableSchema>.AsNativePointer(GetConnectionTableSchema);
+ private static unsafe IntPtr ConnectionGetTableTypesPtr =
NativeDelegate<ConnectionGetTableTypes>.AsNativePointer(GetConnectionTableTypes);
+ private static unsafe IntPtr ConnectionInitPtr =
NativeDelegate<ConnectionInit>.AsNativePointer(InitConnection);
+ private static unsafe IntPtr ConnectionRollbackPtr =
NativeDelegate<ConnectionFn>.AsNativePointer(RollbackConnection);
+ private static unsafe IntPtr ConnectionCommitPtr =
NativeDelegate<ConnectionFn>.AsNativePointer(CommitConnection);
+ private static unsafe IntPtr ConnectionReleasePtr =
NativeDelegate<ConnectionFn>.AsNativePointer(ReleaseConnection);
+ private static unsafe IntPtr ConnectionGetInfoPtr =
NativeDelegate<ConnectionGetInfo>.AsNativePointer(GetConnectionInfo);
+ private static unsafe IntPtr ConnectionReadPartitionPtr =
NativeDelegate<ConnectionReadPartition>.AsNativePointer(ReadConnectionPartition);
+ private static unsafe IntPtr ConnectionSetOptionPtr =
NativeDelegate<ConnectionSetOption>.AsNativePointer(SetConnectionOption);
+
+ private static unsafe IntPtr StatementBindPtr =
NativeDelegate<StatementBind>.AsNativePointer(BindStatement);
+ private static unsafe IntPtr StatementBindStreamPtr =
NativeDelegate<StatementBindStream>.AsNativePointer(BindStreamStatement);
+ private static unsafe IntPtr StatementExecuteQueryPtr =
NativeDelegate<StatementExecuteQuery>.AsNativePointer(ExecuteStatementQuery);
+ private static unsafe IntPtr StatementExecutePartitionsPtr =
NativeDelegate<StatementExecutePartitions>.AsNativePointer(ExecuteStatementPartitions);
+ private static unsafe IntPtr StatementNewPtr =
NativeDelegate<StatementNew>.AsNativePointer(NewStatement);
+ private static unsafe IntPtr StatementReleasePtr =
NativeDelegate<StatementFn>.AsNativePointer(ReleaseStatement);
+ private static unsafe IntPtr StatementPreparePtr =
NativeDelegate<StatementFn>.AsNativePointer(PrepareStatement);
+ private static unsafe IntPtr StatementSetSqlQueryPtr =
NativeDelegate<StatementSetSqlQuery>.AsNativePointer(SetStatementSqlQuery);
+ private static unsafe IntPtr StatementSetSubstraitPlanPtr =
NativeDelegate<StatementSetSubstraitPlan>.AsNativePointer(SetStatementSubstraitPlan);
+ private static unsafe IntPtr StatementGetParameterSchemaPtr =
NativeDelegate<StatementGetParameterSchema>.AsNativePointer(GetStatementParameterSchema);
#endif
public unsafe static AdbcStatusCode AdbcDriverInit(int version,
CAdbcDriver* nativeDriver, CAdbcError* error, AdbcDriver driver)
diff --git a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
index a76d150ec..c8a61cf44 100644
--- a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
+++ b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
@@ -20,6 +20,7 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
+using System.Threading;
using Apache.Arrow.Adbc.Extensions;
using Apache.Arrow.C;
using Apache.Arrow.Ipc;
@@ -88,15 +89,55 @@ namespace Apache.Arrow.Adbc.C
/// <summary>
/// Native implementation of <see cref="AdbcDriver"/>
/// </summary>
- sealed class ImportedAdbcDriver : AdbcDriver
+ internal sealed class ImportedAdbcDriver : AdbcDriver
{
private IntPtr _library;
private CAdbcDriver _nativeDriver;
+ private int _references;
+ private bool _disposed;
public ImportedAdbcDriver(IntPtr library, CAdbcDriver nativeDriver)
{
_library = library;
_nativeDriver = nativeDriver;
+ _references = 1;
+ }
+
+ ~ImportedAdbcDriver()
+ {
+ Dispose(false);
+ }
+
+ internal unsafe ref CAdbcDriver Driver
+ {
+ get
+ {
+ if (_disposed) { throw new
ObjectDisposedException(nameof(ImportedAdbcDriver)); }
+ return ref _nativeDriver;
+ }
+ }
+
+ internal ImportedAdbcDriver AddReference()
+ {
+ Interlocked.Increment(ref _references);
+ return this;
+ }
+
+ internal unsafe void RemoveReference()
+ {
+ if (Interlocked.Decrement(ref _references) == 0)
+ {
+ if (_nativeDriver.release != default)
+ {
+ using (CallHelper caller = new CallHelper())
+ {
+ caller.Call(_nativeDriver.release, ref
_nativeDriver);
+ }
+
+ NativeLibrary.Free(_library);
+ _library = IntPtr.Zero;
+ }
+ }
}
/// <summary>
@@ -113,39 +154,35 @@ namespace Apache.Arrow.Adbc.C
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.DatabaseNew, ref nativeDatabase);
+ caller.Call(Driver.DatabaseNew, ref nativeDatabase);
foreach (KeyValuePair<string, string> pair in parameters)
{
- caller.Call(_nativeDriver.DatabaseSetOption, ref
nativeDatabase, pair.Key, pair.Value);
+ caller.Call(Driver.DatabaseSetOption, ref
nativeDatabase, pair.Key, pair.Value);
}
- caller.Call(_nativeDriver.DatabaseInit, ref
nativeDatabase);
+ caller.Call(Driver.DatabaseInit, ref nativeDatabase);
}
- return new AdbcDatabaseNative(_nativeDriver, nativeDatabase);
+ return new ImportedAdbcDatabase(this, nativeDatabase);
}
public unsafe override void Dispose()
{
- if (_nativeDriver.release != default)
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ private void Dispose(bool disposing)
+ {
+ if (!_disposed)
{
- using (CallHelper caller = new CallHelper())
+ _disposed = true;
+ RemoveReference();
+ if (disposing)
{
- try
- {
- caller.Call(_nativeDriver.release, ref
_nativeDriver);
- }
- finally
- {
- _nativeDriver.release = default;
- }
+ base.Dispose();
}
-
- NativeLibrary.Free(_library);
- _library = IntPtr.Zero;
-
- base.Dispose();
}
}
}
@@ -153,62 +190,121 @@ namespace Apache.Arrow.Adbc.C
/// <summary>
/// A native implementation of <see cref="AdbcDatabase"/>
/// </summary>
- internal sealed class AdbcDatabaseNative : AdbcDatabase
+ internal sealed class ImportedAdbcDatabase : AdbcDatabase
{
- private CAdbcDriver _nativeDriver;
+ private readonly ImportedAdbcDriver _driver;
private CAdbcDatabase _nativeDatabase;
+ private bool _disposed;
- public AdbcDatabaseNative(CAdbcDriver nativeDriver, CAdbcDatabase
nativeDatabase)
+ internal ImportedAdbcDatabase(ImportedAdbcDriver driver,
CAdbcDatabase nativeDatabase)
{
- _nativeDriver = nativeDriver;
+ _driver = driver.AddReference();
_nativeDatabase = nativeDatabase;
}
+ ~ImportedAdbcDatabase()
+ {
+ Dispose(false);
+ }
+
+ private unsafe ref CAdbcDriver Driver
+ {
+ get
+ {
+ if (_disposed) { throw new
ObjectDisposedException(nameof(ImportedAdbcDatabase)); }
+ return ref _driver.Driver;
+ }
+ }
+
public unsafe override AdbcConnection
Connect(IReadOnlyDictionary<string, string>? options)
{
CAdbcConnection nativeConnection = new CAdbcConnection();
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.ConnectionNew, ref
nativeConnection);
+ caller.Call(Driver.ConnectionNew, ref nativeConnection);
if (options != null)
{
foreach (KeyValuePair<string, string> pair in options)
{
- caller.Call(_nativeDriver.ConnectionSetOption, ref
nativeConnection, pair.Key, pair.Value);
+ caller.Call(Driver.ConnectionSetOption, ref
nativeConnection, pair.Key, pair.Value);
}
}
- caller.Call(_nativeDriver.ConnectionInit, ref
nativeConnection, ref _nativeDatabase);
+ caller.Call(Driver.ConnectionInit, ref nativeConnection,
ref _nativeDatabase);
}
- return new AdbcConnectionNative(_nativeDriver,
nativeConnection);
+ return new ImportedAdbcConnection(_driver, nativeConnection);
}
public override void Dispose()
{
- base.Dispose();
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ private unsafe void Dispose(bool disposing)
+ {
+ if (!_disposed)
+ {
+ _disposed = true;
+
+ try
+ {
+ if (_nativeDatabase.private_data != default)
+ {
+ using (CallHelper caller = new CallHelper())
+ {
+ caller.Call(_driver.Driver.DatabaseRelease,
ref _nativeDatabase);
+ }
+ }
+ }
+ finally
+ {
+ _nativeDatabase.private_data = default;
+ _driver.RemoveReference();
+ }
+ if (disposing)
+ {
+ base.Dispose();
+ }
+ }
}
}
/// <summary>
/// A native implementation of <see cref="AdbcConnection"/>
/// </summary>
- internal sealed class AdbcConnectionNative : AdbcConnection
+ internal sealed class ImportedAdbcConnection : AdbcConnection
{
- private CAdbcDriver _nativeDriver;
+ private readonly ImportedAdbcDriver _driver;
private CAdbcConnection _nativeConnection;
+ private bool _disposed;
private bool? _autoCommit;
private IsolationLevel? _isolationLevel;
private bool? _readOnly;
- public AdbcConnectionNative(CAdbcDriver nativeDriver,
CAdbcConnection nativeConnection)
+ internal ImportedAdbcConnection(ImportedAdbcDriver driver,
CAdbcConnection nativeConnection)
{
- _nativeDriver = nativeDriver;
+ _driver = driver.AddReference();
_nativeConnection = nativeConnection;
}
+ ~ImportedAdbcConnection()
+ {
+ Dispose(false);
+ }
+
+ private unsafe ref CAdbcDriver Driver
+ {
+ get
+ {
+ if (_disposed) { throw new
ObjectDisposedException(nameof(ImportedAdbcConnection)); }
+ return ref _driver.Driver;
+ }
+ }
+
public override bool AutoCommit
{
get => _autoCommit ?? throw AdbcException.NotImplemented("no
value has been set for AutoCommit");
@@ -249,15 +345,15 @@ namespace Apache.Arrow.Adbc.C
{
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.StatementNew
+ Driver.StatementNew
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementNew>(_nativeDriver.StatementNew)
+
Marshal.GetDelegateForFunctionPointer<StatementNew>(Driver.StatementNew)
#endif
(connection, &nativeStatement, &caller._error));
}
}
- return new AdbcStatementNative(_nativeDriver, nativeStatement);
+ return new ImportedAdbcStatement(_driver, nativeStatement);
}
public unsafe override IArrowArrayStream
GetInfo(IReadOnlyList<AdbcInfoCode> codes)
@@ -270,9 +366,9 @@ namespace Apache.Arrow.Adbc.C
{
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.ConnectionGetInfo
+ Driver.ConnectionGetInfo
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetInfo>(_nativeDriver.ConnectionGetInfo)
+
Marshal.GetDelegateForFunctionPointer<ConnectionGetInfo>(Driver.ConnectionGetInfo)
#endif
(connection, (int*)spanPtr, codes.Count,
caller.CreateStream(), &caller._error));
return caller.ImportStream();
@@ -306,9 +402,9 @@ namespace Apache.Arrow.Adbc.C
{
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.ConnectionGetObjects
+ Driver.ConnectionGetObjects
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetObjects>(_nativeDriver.ConnectionGetObjects)
+
Marshal.GetDelegateForFunctionPointer<ConnectionGetObjects>(Driver.ConnectionGetObjects)
#endif
(connection, (int)depth, utf8Catalog,
utf8Schema, utf8Table, utf8TableTypes, utf8Column, caller.CreateStream(),
&caller._error));
return caller.ImportStream();
@@ -333,9 +429,9 @@ namespace Apache.Arrow.Adbc.C
{
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.ConnectionGetTableTypes
+ Driver.ConnectionGetTableTypes
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetTableTypes>(_nativeDriver.ConnectionGetTableTypes)
+
Marshal.GetDelegateForFunctionPointer<ConnectionGetTableTypes>(Driver.ConnectionGetTableTypes)
#endif
(connection, caller.CreateStream(),
&caller._error));
return caller.ImportStream();
@@ -354,9 +450,9 @@ namespace Apache.Arrow.Adbc.C
{
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.ConnectionGetTableSchema
+ Driver.ConnectionGetTableSchema
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetTableSchema>(_nativeDriver.ConnectionGetTableSchema)
+
Marshal.GetDelegateForFunctionPointer<ConnectionGetTableSchema>(Driver.ConnectionGetTableSchema)
#endif
(connection, utf8Catalog, utf8Schema, utf8Table,
caller.CreateSchema(), &caller._error));
return caller.ImportSchema();
@@ -368,7 +464,7 @@ namespace Apache.Arrow.Adbc.C
{
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.ConnectionCommit, ref
_nativeConnection);
+ caller.Call(Driver.ConnectionCommit, ref
_nativeConnection);
}
}
@@ -376,7 +472,7 @@ namespace Apache.Arrow.Adbc.C
{
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.ConnectionRollback, ref
_nativeConnection);
+ caller.Call(Driver.ConnectionRollback, ref
_nativeConnection);
}
}
@@ -384,7 +480,41 @@ namespace Apache.Arrow.Adbc.C
{
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.ConnectionSetOption, ref
_nativeConnection, key, value);
+ caller.Call(Driver.ConnectionSetOption, ref
_nativeConnection, key, value);
+ }
+ }
+
+ public override void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ private unsafe void Dispose(bool disposing)
+ {
+ if (!_disposed)
+ {
+ _disposed = true;
+
+ try
+ {
+ if (_nativeConnection.private_data != default)
+ {
+ using (CallHelper caller = new CallHelper())
+ {
+ caller.Call(_driver.Driver.ConnectionRelease,
ref _nativeConnection);
+ }
+ }
+ }
+ finally
+ {
+ _nativeConnection.private_data = default;
+ _driver.RemoveReference();
+ }
+ if (disposing)
+ {
+ base.Dispose();
+ }
}
}
}
@@ -392,18 +522,33 @@ namespace Apache.Arrow.Adbc.C
/// <summary>
/// A native implementation of <see cref="AdbcStatement"/>
/// </summary>
- sealed class AdbcStatementNative : AdbcStatement
+ internal sealed class ImportedAdbcStatement : AdbcStatement
{
- private CAdbcDriver _nativeDriver;
+ private ImportedAdbcDriver _driver;
private CAdbcStatement _nativeStatement;
private byte[]? _substraitPlan;
+ private bool _disposed;
- public AdbcStatementNative(CAdbcDriver nativeDriver,
CAdbcStatement nativeStatement)
+ internal ImportedAdbcStatement(ImportedAdbcDriver driver,
CAdbcStatement nativeStatement)
{
- _nativeDriver = nativeDriver;
+ _driver = driver.AddReference();
_nativeStatement = nativeStatement;
}
+ ~ImportedAdbcStatement()
+ {
+ Dispose(false);
+ }
+
+ private unsafe ref CAdbcDriver Driver
+ {
+ get
+ {
+ if (_disposed) { throw new
ObjectDisposedException(nameof(ImportedAdbcStatement)); }
+ return ref _driver.Driver;
+ }
+ }
+
public unsafe override byte[]? SubstraitPlan
{
get => _substraitPlan;
@@ -416,9 +561,9 @@ namespace Apache.Arrow.Adbc.C
{
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.StatementSetSubstraitPlan
+ Driver.StatementSetSubstraitPlan
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementSetSubstraitPlan>(_nativeDriver.StatementSetSubstraitPlan)
+
Marshal.GetDelegateForFunctionPointer<StatementSetSubstraitPlan>(Driver.StatementSetSubstraitPlan)
#endif
(statement, substraitPlan, value?.Length ?? 0,
&caller._error));
}
@@ -438,9 +583,9 @@ namespace Apache.Arrow.Adbc.C
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.StatementBind
+ Driver.StatementBind
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementBind>(_nativeDriver.StatementBind)
+
Marshal.GetDelegateForFunctionPointer<StatementBind>(Driver.StatementBind)
#endif
(statement, caller.Array, caller.Schema,
&caller._error));
@@ -460,9 +605,9 @@ namespace Apache.Arrow.Adbc.C
CArrowArrayStreamExporter.ExportArrayStream(stream,
caller.CreateStream());
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.StatementBindStream
+ Driver.StatementBindStream
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementBindStream>(_nativeDriver.StatementBindStream)
+
Marshal.GetDelegateForFunctionPointer<StatementBindStream>(Driver.StatementBindStream)
#endif
(statement, caller.ArrayStream, &caller._error));
@@ -487,9 +632,9 @@ namespace Apache.Arrow.Adbc.C
long rows = 0;
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.StatementExecuteQuery
+ Driver.StatementExecuteQuery
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementExecuteQuery>(_nativeDriver.StatementExecuteQuery)
+
Marshal.GetDelegateForFunctionPointer<StatementExecuteQuery>(Driver.StatementExecuteQuery)
#endif
(statement, caller.CreateStream(), &rows,
&caller._error));
@@ -513,9 +658,9 @@ namespace Apache.Arrow.Adbc.C
long rows = 0;
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.StatementExecuteQuery
+ Driver.StatementExecuteQuery
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementExecuteQuery>(_nativeDriver.StatementExecuteQuery)
+
Marshal.GetDelegateForFunctionPointer<StatementExecuteQuery>(Driver.StatementExecuteQuery)
#endif
(statement, caller.CreateStream(), &rows,
&caller._error));
@@ -543,9 +688,9 @@ namespace Apache.Arrow.Adbc.C
nativePartitions =
(CAdbcPartitions*)Marshal.AllocHGlobal(sizeof(CAdbcPartitions));
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.StatementExecutePartitions
+ Driver.StatementExecutePartitions
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementExecutePartitions>(_nativeDriver.StatementExecutePartitions)
+
Marshal.GetDelegateForFunctionPointer<StatementExecutePartitions>(Driver.StatementExecutePartitions)
#endif
(statement, caller.CreateSchema(),
nativePartitions, &rowsAffected, &caller._error));
@@ -564,7 +709,7 @@ namespace Apache.Arrow.Adbc.C
#if NET5_0_OR_GREATER
nativePartitions->release(nativePartitions);
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.PartitionsRelease>(nativePartitions->release)(nativePartitions);
+
Marshal.GetDelegateForFunctionPointer<PartitionsRelease>(nativePartitions->release)(nativePartitions);
#endif
}
@@ -577,6 +722,40 @@ namespace Apache.Arrow.Adbc.C
}
}
+ public override void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ private unsafe void Dispose(bool disposing)
+ {
+ if (!_disposed)
+ {
+ _disposed = true;
+
+ try
+ {
+ if (_nativeStatement.private_data != default)
+ {
+ using (CallHelper caller = new CallHelper())
+ {
+ caller.Call(_driver.Driver.StatementRelease,
ref _nativeStatement);
+ }
+ }
+ }
+ finally
+ {
+ _nativeStatement.private_data = default;
+ _driver.RemoveReference();
+ }
+ if (disposing)
+ {
+ base.Dispose();
+ }
+ }
+ }
+
private unsafe void SetSqlQuery(string sqlQuery)
{
fixed (CAdbcStatement* statement = &_nativeStatement)
@@ -586,9 +765,9 @@ namespace Apache.Arrow.Adbc.C
{
caller.TranslateCode(
#if NET5_0_OR_GREATER
- _nativeDriver.StatementSetSqlQuery
+ _driver.Driver.StatementSetSqlQuery
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementSetSqlQuery>(_nativeDriver.StatementSetSqlQuery)
+
Marshal.GetDelegateForFunctionPointer<StatementSetSqlQuery>(_driver.Driver.StatementSetSqlQuery)
#endif
(statement, query, &caller._error));
}
@@ -710,7 +889,7 @@ namespace Apache.Arrow.Adbc.C
fixed (CAdbcDriver* driver = &nativeDriver)
fixed (CAdbcError* e = &_error)
{
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.DriverRelease>(fn)(driver,
e));
+
TranslateCode(Marshal.GetDelegateForFunctionPointer<DriverRelease>(fn)(driver,
e));
}
}
#endif
@@ -730,7 +909,7 @@ namespace Apache.Arrow.Adbc.C
fixed (CAdbcDatabase* db = &nativeDatabase)
fixed (CAdbcError* e = &_error)
{
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.DatabaseFn>(fn)(db,
e));
+
TranslateCode(Marshal.GetDelegateForFunctionPointer<DatabaseFn>(fn)(db, e));
}
}
#endif
@@ -757,7 +936,7 @@ namespace Apache.Arrow.Adbc.C
using (Utf8Helper utf8Key = new Utf8Helper(key))
using (Utf8Helper utf8Value = new Utf8Helper(value))
{
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.DatabaseSetOption>(fn)(db,
utf8Key, utf8Value, e));
+
TranslateCode(Marshal.GetDelegateForFunctionPointer<DatabaseSetOption>(fn)(db,
utf8Key, utf8Value, e));
}
}
}
@@ -778,7 +957,27 @@ namespace Apache.Arrow.Adbc.C
fixed (CAdbcConnection* cn = &nativeConnection)
fixed (CAdbcError* e = &_error)
{
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionFn>(fn)(cn,
e));
+
TranslateCode(Marshal.GetDelegateForFunctionPointer<ConnectionFn>(fn)(cn, e));
+ }
+ }
+#endif
+
+#if NET5_0_OR_GREATER
+ public unsafe void Call(delegate* unmanaged<CAdbcStatement*,
CAdbcError*, AdbcStatusCode> fn, ref CAdbcStatement nativeStatement)
+ {
+ fixed (CAdbcStatement* stmt = &nativeStatement)
+ fixed (CAdbcError* e = &_error)
+ {
+ TranslateCode(fn(stmt, e));
+ }
+ }
+#else
+ public unsafe void Call(IntPtr fn, ref CAdbcStatement
nativeStatement)
+ {
+ fixed (CAdbcStatement* stmt = &nativeStatement)
+ fixed (CAdbcError* e = &_error)
+ {
+
TranslateCode(Marshal.GetDelegateForFunctionPointer<StatementFn>(fn)(stmt, e));
}
}
#endif
@@ -805,7 +1004,7 @@ namespace Apache.Arrow.Adbc.C
using (Utf8Helper utf8Key = new Utf8Helper(key))
using (Utf8Helper utf8Value = new Utf8Helper(value))
{
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionSetOption>(fn)(cn,
utf8Key, utf8Value, e));
+
TranslateCode(Marshal.GetDelegateForFunctionPointer<ConnectionSetOption>(fn)(cn,
utf8Key, utf8Value, e));
}
}
}
@@ -828,7 +1027,7 @@ namespace Apache.Arrow.Adbc.C
fixed (CAdbcDatabase* db = &database)
fixed (CAdbcError* e = &_error)
{
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionInit>(fn)(cn,
db, e));
+
TranslateCode(Marshal.GetDelegateForFunctionPointer<ConnectionInit>(fn)(cn, db,
e));
}
}
#endif
@@ -842,7 +1041,7 @@ namespace Apache.Arrow.Adbc.C
#if NET5_0_OR_GREATER
_error.release(err);
#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ErrorRelease>(err->release)(err);
+
Marshal.GetDelegateForFunctionPointer<ErrorRelease>(err->release)(err);
#endif
_error.release = default;
}
diff --git a/csharp/src/Apache.Arrow.Adbc/C/Delegates.cs
b/csharp/src/Apache.Arrow.Adbc/C/Delegates.cs
new file mode 100644
index 000000000..482580159
--- /dev/null
+++ b/csharp/src/Apache.Arrow.Adbc/C/Delegates.cs
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.Arrow.C;
+
+namespace Apache.Arrow.Adbc.C // Delegate types formerly nested in CAdbcDriverExporter, now declared at namespace level.
+{
+ internal unsafe delegate void ErrorRelease(CAdbcError* error); // Shape of CAdbcError.release: takes the error, returns nothing.
+ internal unsafe delegate AdbcStatusCode DatabaseFn(CAdbcDatabase*
database, CAdbcError* error); // Any database function of shape (database, error) -> status.
+ internal unsafe delegate AdbcStatusCode ConnectionFn(CAdbcConnection*
connection, CAdbcError* error); // Any connection function of shape (connection, error) -> status.
+ // Delegates below exist only before .NET 5, for use with Marshal.GetDelegateForFunctionPointer.
+#if !NET5_0_OR_GREATER
+ internal unsafe delegate AdbcStatusCode DriverRelease(CAdbcDriver* driver,
CAdbcError* error); // Shape (driver, error) -> status.
+ internal unsafe delegate void PartitionsRelease(CAdbcPartitions*
partitions); // Takes no error argument and returns no status.
+ internal unsafe delegate AdbcStatusCode DatabaseSetOption(CAdbcDatabase*
database, byte* name, byte* value, CAdbcError* error); // name/value are raw byte pointers; the importer passes Utf8Helper values.
+ internal unsafe delegate AdbcStatusCode
ConnectionGetObjects(CAdbcConnection* connection, int depth, byte* catalog,
byte* db_schema, byte* table_name, byte** table_type, byte* column_name,
CArrowArrayStream* stream, CAdbcError* error); // table_type is byte**, unlike the other byte* arguments.
+ internal unsafe delegate AdbcStatusCode
ConnectionGetTableSchema(CAdbcConnection* connection, byte* catalog, byte*
db_schema, byte* table_name, CArrowSchema* schema, CAdbcError* error);
+ internal unsafe delegate AdbcStatusCode
ConnectionGetTableTypes(CAdbcConnection* connection, CArrowArrayStream* stream,
CAdbcError* error);
+ internal unsafe delegate AdbcStatusCode ConnectionInit(CAdbcConnection*
connection, CAdbcDatabase* database, CAdbcError* error); // Takes both the connection and the database.
+ internal unsafe delegate AdbcStatusCode ConnectionGetInfo(CAdbcConnection*
connection, int* info_codes, int info_codes_length, CArrowArrayStream* stream,
CAdbcError* error); // info_codes is passed as pointer plus explicit length.
+ internal unsafe delegate AdbcStatusCode
ConnectionReadPartition(CAdbcConnection* connection, byte*
serialized_partition, int serialized_length, CArrowArrayStream* stream,
CAdbcError* error); // serialized_partition is passed as pointer plus explicit length.
+ internal unsafe delegate AdbcStatusCode
ConnectionSetOption(CAdbcConnection* connection, byte* name, byte* value,
CAdbcError* error); // Connection counterpart of DatabaseSetOption.
+ internal unsafe delegate AdbcStatusCode StatementBind(CAdbcStatement*
statement, CArrowArray* array, CArrowSchema* schema, CAdbcError* error);
+ internal unsafe delegate AdbcStatusCode
StatementBindStream(CAdbcStatement* statement, CArrowArrayStream* stream,
CAdbcError* error);
+ internal unsafe delegate AdbcStatusCode
StatementExecuteQuery(CAdbcStatement* statement, CArrowArrayStream* stream,
long* rows, CAdbcError* error); // rows is a long* argument; NOTE(review): presumably an affected-row count output - confirm.
+ internal unsafe delegate AdbcStatusCode
StatementExecutePartitions(CAdbcStatement* statement, CArrowSchema* schema,
CAdbcPartitions* partitions, long* rows, CAdbcError* error);
+ internal unsafe delegate AdbcStatusCode StatementNew(CAdbcConnection*
connection, CAdbcStatement* statement, CAdbcError* error); // Takes both the connection and the statement.
+ internal unsafe delegate AdbcStatusCode StatementFn(CAdbcStatement*
statement, CAdbcError* error); // Any statement function of shape (statement, error) -> status; used by the IntPtr Call overload.
+ internal unsafe delegate AdbcStatusCode
StatementSetSqlQuery(CAdbcStatement* statement, byte* text, CAdbcError* error);
+ internal unsafe delegate AdbcStatusCode
StatementSetSubstraitPlan(CAdbcStatement* statement, byte* plan, int length,
CAdbcError* error); // plan is passed as pointer plus explicit length.
+ internal unsafe delegate AdbcStatusCode
StatementGetParameterSchema(CAdbcStatement* statement, CArrowSchema* schema,
CAdbcError* error);
+#endif // !NET5_0_OR_GREATER
+}
diff --git a/csharp/src/Apache.Arrow.Adbc/C/NativeDelegate.cs
b/csharp/src/Apache.Arrow.Adbc/C/NativeDelegate.cs
index 4dcb063d6..5453c21ee 100644
--- a/csharp/src/Apache.Arrow.Adbc/C/NativeDelegate.cs
+++ b/csharp/src/Apache.Arrow.Adbc/C/NativeDelegate.cs
@@ -16,13 +16,16 @@
*/
using System;
+using System.Collections.Generic;
using System.Runtime.InteropServices;
namespace Apache.Arrow.Adbc.C
{
internal readonly struct NativeDelegate<T> where T : Delegate
{
- private readonly T _managedDelegate; // For lifetime management
+ // Lifetime management
+ private static readonly List<Delegate> _managedDelegates = new
List<Delegate>();
+ private readonly T _managedDelegate;
public NativeDelegate(T managedDelegate)
{
@@ -30,6 +33,15 @@ namespace Apache.Arrow.Adbc.C
Pointer = Marshal.GetFunctionPointerForDelegate(managedDelegate);
}
+ public static IntPtr AsNativePointer(T managedDelegate) // Returns a native function pointer for managedDelegate without building a NativeDelegate<T>.
+ {
+ lock (_managedDelegates) // The list is static, so additions are serialized on it.
+ {
+ _managedDelegates.Add(managedDelegate); // Recorded in the static "lifetime management" list; nothing visible here removes entries.
+ }
+ return Marshal.GetFunctionPointerForDelegate(managedDelegate); // The pointer is created after the delegate has been recorded.
+ }
+
public IntPtr Pointer { get; }
}
}