This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 92bc81269 fix(csharp): imported drivers have the potential for a lot
of memory leaks (#1776)
92bc81269 is described below
commit 92bc81269b8c2d23ee7eb5b8e3d0a9539f756876
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Fri Apr 26 10:16:16 2024 -0700
fix(csharp): imported drivers have the potential for a lot of memory leaks
(#1776)
Fairly large restructuring of the importer/exporter code with an eye
towards ensuring cleanup -- but also tightening the code.
---
.../src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs | 64 +-
.../src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs | 760 ++++++++-------------
.../Extensions/MarshalExtensions.cs | 10 +
3 files changed, 319 insertions(+), 515 deletions(-)
diff --git a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
index 9241e9701..0a119910f 100644
--- a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
+++ b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
@@ -531,11 +531,7 @@ namespace Apache.Arrow.Adbc.C
GCHandle gch =
GCHandle.FromIntPtr((IntPtr)nativeStatement->private_data);
AdbcStatement stub = (AdbcStatement)gch.Target;
-#if NETSTANDARD
- stub.SqlQuery =
MarshalExtensions.PtrToStringUTF8((IntPtr)text);
-#else
- stub.SqlQuery = Marshal.PtrToStringUTF8((IntPtr)text);
-#endif
+ stub.SqlQuery = MarshalExtensions.PtrToStringUTF8(text);
return AdbcStatusCode.Success;
}
@@ -831,14 +827,7 @@ namespace Apache.Arrow.Adbc.C
public unsafe void SetOption(byte* name, byte* value)
{
- IntPtr namePtr = (IntPtr)name;
- IntPtr valuePtr = (IntPtr)value;
-
-#if NETSTANDARD
- options[MarshalExtensions.PtrToStringUTF8(namePtr)] =
MarshalExtensions.PtrToStringUTF8(valuePtr);
-#else
- options[Marshal.PtrToStringUTF8(namePtr)] =
Marshal.PtrToStringUTF8(valuePtr);
-#endif
+ options[MarshalExtensions.PtrToStringUTF8(name)] =
MarshalExtensions.PtrToStringUTF8(value);
}
public void OpenConnection(IReadOnlyDictionary<string, string>
options, out AdbcConnection connection)
@@ -867,14 +856,7 @@ namespace Apache.Arrow.Adbc.C
public unsafe void SetOption(byte* name, byte* value)
{
- IntPtr namePtr = (IntPtr)name;
- IntPtr valuePtr = (IntPtr)value;
-
-#if NETSTANDARD
- options[MarshalExtensions.PtrToStringUTF8(namePtr)] =
MarshalExtensions.PtrToStringUTF8(valuePtr);
-#else
- options[Marshal.PtrToStringUTF8(namePtr)] =
Marshal.PtrToStringUTF8(valuePtr);
-#endif
+ options[MarshalExtensions.PtrToStringUTF8(name)] =
MarshalExtensions.PtrToStringUTF8(value);
}
public void Rollback() { this.connection.Rollback(); }
@@ -888,22 +870,10 @@ namespace Apache.Arrow.Adbc.C
public unsafe void GetObjects(ref CAdbcConnection
nativeConnection, int depth, byte* catalog, byte* db_schema, byte* table_name,
byte** table_type, byte* column_name, CArrowArrayStream* cstream)
{
- string catalogPattern = string.Empty;
- string dbSchemaPattern = string.Empty;
- string tableNamePattern = string.Empty;
- string columnNamePattern = string.Empty;
-
-#if NETSTANDARD
- catalogPattern =
MarshalExtensions.PtrToStringUTF8((IntPtr)catalog);
- dbSchemaPattern =
MarshalExtensions.PtrToStringUTF8((IntPtr)db_schema);
- tableNamePattern =
MarshalExtensions.PtrToStringUTF8((IntPtr)table_name);
- columnNamePattern =
MarshalExtensions.PtrToStringUTF8((IntPtr)column_name);
-#else
- catalogPattern = Marshal.PtrToStringUTF8((IntPtr)catalog);
- dbSchemaPattern = Marshal.PtrToStringUTF8((IntPtr)db_schema);
- tableNamePattern = Marshal.PtrToStringUTF8((IntPtr)table_name);
- columnNamePattern =
Marshal.PtrToStringUTF8((IntPtr)column_name);
-#endif
+ string catalogPattern =
MarshalExtensions.PtrToStringUTF8(catalog);
+ string dbSchemaPattern =
MarshalExtensions.PtrToStringUTF8(db_schema);
+ string tableNamePattern =
MarshalExtensions.PtrToStringUTF8(table_name);
+ string columnNamePattern =
MarshalExtensions.PtrToStringUTF8(column_name);
string[] tableTypes = null;
const int maxTableTypeCount = 100;
@@ -923,11 +893,7 @@ namespace Apache.Arrow.Adbc.C
tableTypes = new string[count];
for (int i = 0; i < count; i++)
{
-#if NETSTANDARD
tableTypes[i] =
MarshalExtensions.PtrToStringUTF8((IntPtr)table_type[i]);
-#else
- tableTypes[i] =
Marshal.PtrToStringUTF8((IntPtr)table_type[i]);
-#endif
}
}
@@ -940,19 +906,9 @@ namespace Apache.Arrow.Adbc.C
public unsafe void GetTableSchema(byte* catalog, byte* db_schema,
byte* table_name, CArrowSchema* cschema)
{
- string sCatalog = string.Empty;
- string sDbSchema = string.Empty;
- string sTableName = string.Empty;
-
-#if NETSTANDARD
- sCatalog = MarshalExtensions.PtrToStringUTF8((IntPtr)catalog);
- sDbSchema =
MarshalExtensions.PtrToStringUTF8((IntPtr)db_schema);
- sTableName =
MarshalExtensions.PtrToStringUTF8((IntPtr)table_name);
-#else
- sCatalog = Marshal.PtrToStringUTF8((IntPtr)catalog);
- sDbSchema = Marshal.PtrToStringUTF8((IntPtr)db_schema);
- sTableName = Marshal.PtrToStringUTF8((IntPtr)table_name);
-#endif
+ string sCatalog = MarshalExtensions.PtrToStringUTF8(catalog);
+ string sDbSchema =
MarshalExtensions.PtrToStringUTF8(db_schema);
+ string sTableName =
MarshalExtensions.PtrToStringUTF8(table_name);
Schema schema = connection.GetTableSchema(sCatalog, sDbSchema,
sTableName);
diff --git a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
index 51b5c0d0a..07398b8f9 100644
--- a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
+++ b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
@@ -213,7 +213,16 @@ namespace Apache.Arrow.Adbc.C
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.StatementNew, ref
_nativeConnection, ref nativeStatement);
+ fixed (CAdbcConnection* connection = &_nativeConnection)
+ {
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.StatementNew
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementNew>(_nativeDriver.StatementNew)
+#endif
+ (connection, &nativeStatement, &caller._error));
+ }
}
return new AdbcStatementNative(_nativeDriver, nativeStatement);
@@ -221,58 +230,105 @@ namespace Apache.Arrow.Adbc.C
public unsafe override IArrowArrayStream
GetInfo(IReadOnlyList<AdbcInfoCode> codes)
{
- CArrowArrayStream* nativeArrayStream =
CArrowArrayStream.Create();
-
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.ConnectionGetInfo, ref
_nativeConnection, codes, nativeArrayStream);
+ Span<AdbcInfoCode> span = codes.AsSpan();
+ fixed (CAdbcConnection* connection = &_nativeConnection)
+ fixed (AdbcInfoCode* spanPtr = span)
+ {
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.ConnectionGetInfo
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetInfo>(_nativeDriver.ConnectionGetInfo)
+#endif
+ (connection, (int*)spanPtr, codes.Count,
caller.CreateStream(), &caller._error));
+ return caller.ImportStream();
+ }
}
-
- IArrowArrayStream arrowArrayStream =
CArrowArrayStreamImporter.ImportArrayStream(nativeArrayStream);
-
- return arrowArrayStream;
}
- public override unsafe IArrowArrayStream
GetObjects(GetObjectsDepth depth, string catalogPattern, string
dbSchemaPattern, string tableNamePattern, IReadOnlyList<string> tableTypes,
string columnNamePattern)
+ public unsafe override IArrowArrayStream
GetObjects(GetObjectsDepth depth, string catalogPattern, string
dbSchemaPattern, string tableNamePattern, IReadOnlyList<string> tableTypes,
string columnNamePattern)
{
- CArrowArrayStream* nativeArrayStream =
CArrowArrayStream.Create();
-
- using (CallHelper caller = new CallHelper())
+ byte** utf8TableTypes = null;
+ try
{
- caller.Call(_nativeDriver.ConnectionGetObjects, ref
_nativeConnection, (int)depth, catalogPattern, dbSchemaPattern,
tableNamePattern, tableTypes, columnNamePattern, nativeArrayStream);
- }
+ // need to terminate with a null entry per
https://github.com/apache/arrow-adbc/blob/b97e22c4d6524b60bf261e1970155500645be510/adbc.h#L909-L911
+ utf8TableTypes = (byte**)Marshal.AllocHGlobal(IntPtr.Size
* (tableTypes.Count + 1));
+ utf8TableTypes[tableTypes.Count] = null;
- IArrowArrayStream arrowArrayStream =
CArrowArrayStreamImporter.ImportArrayStream(nativeArrayStream);
+ for (int i = 0; i < tableTypes.Count; i++)
+ {
+ string tableType = tableTypes[i];
+ utf8TableTypes[i] =
(byte*)MarshalExtensions.StringToCoTaskMemUTF8(tableType);
+ }
- return arrowArrayStream;
+ using (Utf8Helper utf8Catalog = new
Utf8Helper(catalogPattern))
+ using (Utf8Helper utf8Schema = new
Utf8Helper(dbSchemaPattern))
+ using (Utf8Helper utf8Table = new
Utf8Helper(tableNamePattern))
+ using (Utf8Helper utf8Column = new
Utf8Helper(columnNamePattern))
+ using (CallHelper caller = new CallHelper())
+ {
+ fixed (CAdbcConnection* connection =
&_nativeConnection)
+ {
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.ConnectionGetObjects
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetObjects>(_nativeDriver.ConnectionGetObjects)
+#endif
+ (connection, (int)depth, utf8Catalog,
utf8Schema, utf8Table, utf8TableTypes, utf8Column, caller.CreateStream(),
&caller._error));
+ return caller.ImportStream();
+ }
+ }
+ }
+ finally
+ {
+ for (int i = 0; i < tableTypes.Count; i++)
+ {
+ Marshal.FreeCoTaskMem((IntPtr)utf8TableTypes[i]);
+ }
+ Marshal.FreeHGlobal((IntPtr)utf8TableTypes);
+ }
}
- public override unsafe IArrowArrayStream GetTableTypes()
+ public unsafe override IArrowArrayStream GetTableTypes()
{
- CArrowArrayStream* nativeArrayStream =
CArrowArrayStream.Create();
-
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.ConnectionGetTableTypes, ref
_nativeConnection, nativeArrayStream);
+ fixed (CAdbcConnection* connection = &_nativeConnection)
+ {
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.ConnectionGetTableTypes
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetTableTypes>(_nativeDriver.ConnectionGetTableTypes)
+#endif
+ (connection, caller.CreateStream(),
&caller._error));
+ return caller.ImportStream();
+ }
}
-
- IArrowArrayStream arrowArrayStream =
CArrowArrayStreamImporter.ImportArrayStream(nativeArrayStream);
-
- return arrowArrayStream;
}
- public override unsafe Schema GetTableSchema(string catalog,
string db_schema, string table_name)
+ public unsafe override Schema GetTableSchema(string catalog,
string db_schema, string table_name)
{
- CArrowSchema* nativeSchema = CArrowSchema.Create();
-
+ using (Utf8Helper utf8Catalog = new Utf8Helper(catalog))
+ using (Utf8Helper utf8Schema = new Utf8Helper(db_schema))
+ using (Utf8Helper utf8Table = new Utf8Helper(table_name))
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.ConnectionGetTableSchema, ref
_nativeConnection, catalog, db_schema, table_name, nativeSchema);
+ fixed (CAdbcConnection* connection = &_nativeConnection)
+ {
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.ConnectionGetTableSchema
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetTableSchema>(_nativeDriver.ConnectionGetTableSchema)
+#endif
+ (connection, utf8Catalog, utf8Schema, utf8Table,
caller.CreateSchema(), &caller._error));
+ return caller.ImportSchema();
+ }
}
-
- Schema schema =
CArrowSchemaImporter.ImportSchema(nativeSchema);
-
- return schema;
}
}
@@ -298,7 +354,17 @@ namespace Apache.Arrow.Adbc.C
{
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.StatementSetSubstraitPlan,
ref _nativeStatement, value);
+ fixed (CAdbcStatement* statement = &_nativeStatement)
+ fixed (byte* substraitPlan = value)
+ {
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.StatementSetSubstraitPlan
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementSetSubstraitPlan>(_nativeDriver.StatementSetSubstraitPlan)
+#endif
+ (statement, substraitPlan, value.Length,
&caller._error));
+ }
_substraitPlan = value;
}
}
@@ -308,7 +374,23 @@ namespace Apache.Arrow.Adbc.C
{
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.StatementBind, ref
_nativeStatement, batch, schema);
+ fixed (CAdbcStatement* statement = &_nativeStatement)
+ {
+ CArrowArrayExporter.ExportRecordBatch(batch,
caller.CreateArray());
+ CArrowSchemaExporter.ExportSchema(schema,
caller.CreateSchema());
+
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.StatementBind
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementBind>(_nativeDriver.StatementBind)
+#endif
+ (statement, caller.Array, caller.Schema,
&caller._error));
+
+ // On success, ownership passes to the driver
+ caller.ForgetArray();
+ caller.ForgetSchema();
+ }
}
}
@@ -316,60 +398,143 @@ namespace Apache.Arrow.Adbc.C
{
using (CallHelper caller = new CallHelper())
{
- caller.Call(_nativeDriver.StatementBindStream, ref
_nativeStatement, stream);
+ fixed (CAdbcStatement* statement = &_nativeStatement)
+ {
+ CArrowArrayStreamExporter.ExportArrayStream(stream,
caller.CreateStream());
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.StatementBindStream
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementBindStream>(_nativeDriver.StatementBindStream)
+#endif
+ (statement, caller.ArrayStream, &caller._error));
+
+ // On success, ownership passes to the driver
+ caller.ForgetStream();
+ }
}
}
public unsafe override QueryResult ExecuteQuery()
{
- CArrowArrayStream* nativeArrayStream =
CArrowArrayStream.Create();
+ if (SqlQuery != null)
+ {
+ // TODO: Consider moving this to the setter
+ SetSqlQuery(SqlQuery);
+ }
using (CallHelper caller = new CallHelper())
{
- if (SqlQuery != null)
+ fixed (CAdbcStatement* statement = &_nativeStatement)
{
- // TODO: Consider moving this to the setter
- caller.Call(_nativeDriver.StatementSetSqlQuery, ref
_nativeStatement, SqlQuery);
+ long rows = 0;
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.StatementExecuteQuery
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementExecuteQuery>(_nativeDriver.StatementExecuteQuery)
+#endif
+ (statement, caller.CreateStream(), &rows,
&caller._error));
+
+ return new QueryResult(rows, caller.ImportStream());
}
+ }
+ }
- long rows = 0;
+ public unsafe override UpdateResult ExecuteUpdate()
+ {
+ if (SqlQuery != null)
+ {
+ // TODO: Consider moving this to the setter
+ SetSqlQuery(SqlQuery);
+ }
- caller.Call(_nativeDriver.StatementExecuteQuery, ref
_nativeStatement, nativeArrayStream, ref rows);
+ using (CallHelper caller = new CallHelper())
+ {
+ fixed (CAdbcStatement* statement = &_nativeStatement)
+ {
+ long rows = 0;
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.StatementExecuteQuery
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementExecuteQuery>(_nativeDriver.StatementExecuteQuery)
+#endif
+ (statement, caller.CreateStream(), &rows,
&caller._error));
- return new QueryResult(rows,
CArrowArrayStreamImporter.ImportArrayStream(nativeArrayStream));
+ return new UpdateResult(rows);
+ }
}
}
- public override unsafe UpdateResult ExecuteUpdate()
+ public unsafe override PartitionedResult ExecutePartitioned()
{
+ if (SqlQuery != null)
+ {
+ // TODO: Consider moving this to the setter
+ SetSqlQuery(SqlQuery);
+ }
+
using (CallHelper caller = new CallHelper())
{
- if (SqlQuery != null)
+ fixed (CAdbcStatement* statement = &_nativeStatement)
{
- // TODO: Consider moving this to the setter
- caller.Call(_nativeDriver.StatementSetSqlQuery, ref
_nativeStatement, SqlQuery);
- }
+ CAdbcPartitions* nativePartitions = null;
+ long rowsAffected = 0;
+ try
+ {
+ nativePartitions =
(CAdbcPartitions*)Marshal.AllocHGlobal(sizeof(CAdbcPartitions));
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.StatementExecutePartitions
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementExecutePartitions>(_nativeDriver.StatementExecutePartitions)
+#endif
+ (statement, caller.CreateSchema(),
nativePartitions, &rowsAffected, &caller._error));
- long rows = 0;
+ PartitionDescriptor[] partitions = new
PartitionDescriptor[nativePartitions->num_partitions];
+ for (int i = 0; i < partitions.Length; i++)
+ {
+ partitions[i] = new
PartitionDescriptor(MarshalExtensions.MarshalBuffer(nativePartitions->partitions[i],
checked((int)nativePartitions->partition_lengths[i])));
+ }
- caller.Call(_nativeDriver.StatementExecuteQuery, ref
_nativeStatement, null, ref rows);
+ return new
PartitionedResult(caller.ImportSchema(), rowsAffected, partitions);
+ }
+ finally
+ {
+ if (nativePartitions->release != null)
+ {
+#if NET5_0_OR_GREATER
+ nativePartitions->release(nativePartitions);
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.PartitionsRelease>(nativePartitions->release)(nativePartitions);
+#endif
+ }
- return new UpdateResult(rows);
+ if (nativePartitions != null)
+ {
+ Marshal.FreeHGlobal((IntPtr)nativePartitions);
+ }
+ }
+ }
}
}
- public override unsafe PartitionedResult ExecutePartitioned()
+ private unsafe void SetSqlQuery(string sqlQuery)
{
- using (CallHelper caller = new CallHelper())
+ fixed (CAdbcStatement* statement = &_nativeStatement)
{
- if (SqlQuery != null)
+ using (Utf8Helper query = new Utf8Helper(sqlQuery))
+ using (CallHelper caller = new CallHelper())
{
- // TODO: Consider moving this to the setter
- caller.Call(_nativeDriver.StatementSetSqlQuery, ref
_nativeStatement, SqlQuery);
+ caller.TranslateCode(
+#if NET5_0_OR_GREATER
+ _nativeDriver.StatementSetSqlQuery
+#else
+
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementSetSqlQuery>(_nativeDriver.StatementSetSqlQuery)
+#endif
+ (statement, query, &caller._error));
}
-
- caller.Call(_nativeDriver.StatementExecutePartitions, ref
_nativeStatement, out PartitionedResult result);
- return result;
}
}
}
@@ -387,7 +552,16 @@ namespace Apache.Arrow.Adbc.C
}
public static implicit operator IntPtr(Utf8Helper s) { return
s._s; }
- public void Dispose() { Marshal.FreeCoTaskMem(_s); }
+ public static unsafe implicit operator byte*(Utf8Helper s) {
return (byte*)s._s; }
+
+ public void Dispose()
+ {
+ if (_s != IntPtr.Zero)
+ {
+ Marshal.FreeCoTaskMem(_s);
+ _s = IntPtr.Zero;
+ }
+ }
}
/// <summary>
@@ -395,11 +569,70 @@ namespace Apache.Arrow.Adbc.C
/// </summary>
private unsafe struct CallHelper : IDisposable
{
- private CAdbcError _error;
- private CArrowArray* _array;
+ public CAdbcError _error;
private CArrowSchema* _schema;
+ private CArrowArray* _array;
private CArrowArrayStream* _arrayStream;
+ public CArrowSchema* Schema => _schema;
+ public CArrowArray* Array => _array;
+ public CArrowArrayStream* ArrayStream => _arrayStream;
+
+ public CArrowSchema* CreateSchema()
+ {
+ Debug.Assert(_schema == null);
+ _schema = CArrowSchema.Create();
+ return _schema;
+ }
+
+ public void ForgetSchema()
+ {
+ Debug.Assert(_schema != null);
+ _schema = null;
+ }
+
+ public Schema ImportSchema()
+ {
+ Debug.Assert(_schema != null);
+ Schema schema = CArrowSchemaImporter.ImportSchema(_schema);
+ _schema = null;
+ return schema;
+ }
+
+ public CArrowArray* CreateArray()
+ {
+ Debug.Assert(_array == null);
+ _array = CArrowArray.Create();
+ return _array;
+ }
+
+ public void ForgetArray()
+ {
+ Debug.Assert(_array != null);
+ _array = null;
+ }
+
+ public CArrowArrayStream* CreateStream()
+ {
+ Debug.Assert(_arrayStream == null);
+ _arrayStream = CArrowArrayStream.Create();
+ return _arrayStream;
+ }
+
+ public void ForgetStream()
+ {
+ Debug.Assert(_arrayStream != null);
+ _arrayStream = null;
+ }
+
+ public IArrowArrayStream ImportStream()
+ {
+ Debug.Assert(_arrayStream != null);
+ IArrowArrayStream arrayStream =
CArrowArrayStreamImporter.ImportArrayStream(_arrayStream);
+ _arrayStream = null;
+ return arrayStream;
+ }
+
public unsafe void Call(AdbcDriverInit init, int version, ref
CAdbcDriver driver)
{
TranslateCode(init(version, ref driver, ref this._error));
@@ -454,13 +687,7 @@ namespace Apache.Arrow.Adbc.C
using (Utf8Helper utf8Key = new Utf8Helper(key))
using (Utf8Helper utf8Value = new Utf8Helper(value))
{
- unsafe
- {
- IntPtr keyPtr = utf8Key;
- IntPtr valuePtr = utf8Value;
-
- TranslateCode(fn(db, (byte*)keyPtr,
(byte*)valuePtr, e));
- }
+ TranslateCode(fn(db, utf8Key, utf8Value, e));
}
}
}
@@ -473,13 +700,7 @@ namespace Apache.Arrow.Adbc.C
using (Utf8Helper utf8Key = new Utf8Helper(key))
using (Utf8Helper utf8Value = new Utf8Helper(value))
{
- unsafe
- {
- IntPtr keyPtr = utf8Key;
- IntPtr valuePtr = utf8Value;
-
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.DatabaseSetOption>(fn)(db,
(byte*)keyPtr, (byte*)valuePtr, e));
- }
+
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.DatabaseSetOption>(fn)(db,
utf8Key, utf8Value, e));
}
}
}
@@ -514,13 +735,7 @@ namespace Apache.Arrow.Adbc.C
using (Utf8Helper utf8Key = new Utf8Helper(key))
using (Utf8Helper utf8Value = new Utf8Helper(value))
{
- unsafe
- {
- IntPtr keyPtr = utf8Key;
- IntPtr valuePtr = utf8Value;
-
- TranslateCode(fn(cn, (byte*)keyPtr,
(byte*)valuePtr, e));
- }
+ TranslateCode(fn(cn, utf8Key, utf8Value, e));
}
}
}
@@ -533,13 +748,7 @@ namespace Apache.Arrow.Adbc.C
using (Utf8Helper utf8Key = new Utf8Helper(key))
using (Utf8Helper utf8Value = new Utf8Helper(value))
{
- unsafe
- {
- IntPtr keyPtr = utf8Key;
- IntPtr valuePtr = utf8Value;
-
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionSetOption>(fn)(cn,
(byte*)keyPtr, (byte*)valuePtr, e));
- }
+
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionSetOption>(fn)(cn,
utf8Key, utf8Value, e));
}
}
}
@@ -567,293 +776,6 @@ namespace Apache.Arrow.Adbc.C
}
#endif
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcConnection*,
CAdbcStatement*, CAdbcError*, AdbcStatusCode> fn, ref CAdbcConnection
nativeConnection, ref CAdbcStatement nativeStatement)
- {
- fixed (CAdbcConnection* cn = &nativeConnection)
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (CAdbcError* e = &_error)
- {
- TranslateCode(fn(cn, stmt, e));
- }
- }
-#else
- public unsafe void Call(IntPtr fn, ref CAdbcConnection
nativeConnection, ref CAdbcStatement nativeStatement)
- {
- fixed (CAdbcConnection* cn = &nativeConnection)
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (CAdbcError* e = &_error)
- {
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementNew>(fn)(cn,
stmt, e));
- }
- }
-#endif
-
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcStatement*,
CArrowArray*, CArrowSchema*, CAdbcError*, AdbcStatusCode> fn, ref
CAdbcStatement nativeStatement, RecordBatch batch, Schema schema)
-#else
- public unsafe void Call(IntPtr fn, ref CAdbcStatement
nativeStatement, RecordBatch batch, Schema schema)
-#endif
- {
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (CAdbcError* e = &_error)
- {
- Debug.Assert(_array == null);
- _array = CArrowArray.Create();
- CArrowArrayExporter.ExportRecordBatch(batch, _array);
-
- Debug.Assert(_schema == null);
- _schema = CArrowSchema.Create();
- CArrowSchemaExporter.ExportSchema(schema, _schema);
-
-#if NET5_0_OR_GREATER
- TranslateCode(fn(stmt, _array, _schema, e));
-#else
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementBind>(fn)(stmt,
_array, _schema, e));
-#endif
-
- _array = null;
- _schema = null;
- }
- }
-
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcStatement*,
CArrowArrayStream*, CAdbcError*, AdbcStatusCode> fn, ref CAdbcStatement
nativeStatement, IArrowArrayStream stream)
-#else
- public unsafe void Call(IntPtr fn, ref CAdbcStatement
nativeStatement, IArrowArrayStream stream)
-#endif
- {
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (CAdbcError* e = &_error)
- {
- Debug.Assert(_arrayStream == null);
- _arrayStream = CArrowArrayStream.Create();
- CArrowArrayStreamExporter.ExportArrayStream(stream,
_arrayStream);
-
-#if NET5_0_OR_GREATER
- TranslateCode(fn(stmt, _arrayStream, e));
-#else
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementBindStream>(fn)(stmt,
_arrayStream, e));
-#endif
-
- _arrayStream = null;
- }
- }
-
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcStatement*,
CAdbcError*, AdbcStatusCode> fn, ref CAdbcStatement nativeStatement)
- {
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (CAdbcError* e = &_error)
- {
- TranslateCode(fn(stmt, e));
- }
- }
-#else
- public unsafe void Call(IntPtr fn, ref CAdbcStatement
nativeStatement)
- {
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (CAdbcError* e = &_error)
- {
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementFn>(fn)(stmt,
e));
- }
- }
-#endif
-
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcStatement*,
byte*, CAdbcError*, AdbcStatusCode> fn, ref CAdbcStatement nativeStatement,
string sqlQuery)
- {
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (CAdbcError* e = &_error)
- {
- using (Utf8Helper query = new Utf8Helper(sqlQuery))
- {
- IntPtr bQuery = (IntPtr)(query);
-
- TranslateCode(fn(stmt, (byte*)bQuery, e));
- }
- }
- }
-#else
- public unsafe void Call(IntPtr fn, ref CAdbcStatement
nativeStatement, string sqlQuery)
- {
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (CAdbcError* e = &_error)
- {
- using (Utf8Helper query = new Utf8Helper(sqlQuery))
- {
- IntPtr bQuery = (IntPtr)(query);
-
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementSetSqlQuery>(fn)(stmt,
(byte*)bQuery, e));
- }
- }
- }
-#endif
-
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcStatement*,
byte*, int, CAdbcError*, AdbcStatusCode> fn, ref CAdbcStatement
nativeStatement, byte[] substraitPlan)
- {
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (byte* plan = substraitPlan)
- fixed (CAdbcError* e = &_error)
- {
- TranslateCode(fn(stmt, plan, substraitPlan.Length, e));
- }
- }
-#else
- public unsafe void Call(IntPtr fn, ref CAdbcStatement
nativeStatement, byte[] substraitPlan)
- {
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (byte* plan = substraitPlan)
- fixed (CAdbcError* e = &_error)
- {
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementSetSubstraitPlan>(fn)(stmt,
plan, substraitPlan.Length, e));
- }
- }
-#endif
-
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcStatement*,
CArrowArrayStream*, long*, CAdbcError*, AdbcStatusCode> fn, ref CAdbcStatement
nativeStatement, CArrowArrayStream* arrowStream, ref long nRows)
- {
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (long* rows = &nRows)
- fixed (CAdbcError* e = &_error)
- {
- TranslateCode(fn(stmt, arrowStream, rows, e));
- }
- }
-#else
- public unsafe void Call(IntPtr fn, ref CAdbcStatement
nativeStatement, CArrowArrayStream* arrowStream, ref long nRows)
- {
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (long* rows = &nRows)
- fixed (CAdbcError* e = &_error)
- {
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementExecuteQuery>(fn)(stmt,
arrowStream, rows, e));
- }
- }
-#endif
-
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcStatement*,
CArrowSchema*, CAdbcPartitions*, long*, CAdbcError*, AdbcStatusCode> fn, ref
CAdbcStatement nativeStatement, out PartitionedResult result)
-#else
- public unsafe void Call(IntPtr fn, ref CAdbcStatement
nativeStatement, out PartitionedResult result)
-#endif
- {
- fixed (CAdbcStatement* stmt = &nativeStatement)
- fixed (CAdbcError* e = &_error)
- {
- CAdbcPartitions* nativePartitions = null;
- CArrowSchema* nativeSchema = null;
- long rowsAffected = 0;
- try
- {
- nativePartitions =
(CAdbcPartitions*)Marshal.AllocHGlobal(sizeof(CAdbcPartitions));
- nativeSchema = CArrowSchema.Create();
-
-#if NET5_0_OR_GREATER
- TranslateCode(fn(stmt, nativeSchema, nativePartitions,
&rowsAffected, e));
-#else
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementExecutePartitions>(fn)(stmt,
nativeSchema, nativePartitions, &rowsAffected, e));
-#endif
-
- PartitionDescriptor[] partitions = new
PartitionDescriptor[nativePartitions->num_partitions];
- for (int i = 0; i < partitions.Length; i++)
- {
- partitions[i] = new
PartitionDescriptor(MarshalExtensions.MarshalBuffer(nativePartitions->partitions[i],
checked((int)nativePartitions->partition_lengths[i])));
- }
-
- result = new PartitionedResult(
- CArrowSchemaImporter.ImportSchema(nativeSchema),
- rowsAffected,
- partitions);
- }
- finally
- {
- if (nativePartitions->release != null)
- {
-#if NET5_0_OR_GREATER
- nativePartitions->release(nativePartitions);
-#else
-
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.PartitionsRelease>(nativePartitions->release)(nativePartitions);
-#endif
- }
-
- if (nativePartitions != null)
- {
- Marshal.FreeHGlobal((IntPtr)nativePartitions);
- }
-
- if (nativeSchema != null)
- {
- CArrowSchema.Free(nativeSchema);
- }
- }
- }
- }
-
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcConnection*,
byte*, byte*, byte*, CArrowSchema*, CAdbcError*, AdbcStatusCode> fn, ref
CAdbcConnection nativeconnection, string catalog, string dbSchema, string
tableName, CArrowSchema* nativeSchema)
- {
- byte* bCatalog, bDb_schema, bTable_name;
-
- using (Utf8Helper catalogHelper = new Utf8Helper(catalog))
- using (Utf8Helper schemaHelper = new Utf8Helper(dbSchema))
- using (Utf8Helper tableNameHelper = new Utf8Helper(tableName))
- {
- bCatalog = (byte*)(IntPtr)(catalogHelper);
- bDb_schema = (byte*)(IntPtr)(schemaHelper);
- bTable_name = (byte*)(IntPtr)(tableNameHelper);
-
- fixed (CAdbcConnection* connection = &nativeconnection)
- fixed (CAdbcError* e = &_error)
- {
- TranslateCode(fn(connection, bCatalog, bDb_schema,
bTable_name, nativeSchema, e));
- }
- }
- }
-#else
- public unsafe void Call(IntPtr fn, ref CAdbcConnection
nativeconnection, string catalog, string dbSchema, string tableName,
CArrowSchema* nativeSchema)
- {
- byte* bCatalog, bDb_schema, bTable_name;
-
- using (Utf8Helper catalogHelper = new Utf8Helper(catalog))
- using (Utf8Helper schemaHelper = new Utf8Helper(dbSchema))
- using (Utf8Helper tableNameHelper = new Utf8Helper(tableName))
- {
- bCatalog = (byte*)(IntPtr)(catalogHelper);
- bDb_schema = (byte*)(IntPtr)(schemaHelper);
- bTable_name = (byte*)(IntPtr)(tableNameHelper);
-
- fixed (CAdbcConnection* connection = &nativeconnection)
- fixed (CAdbcError* e = &_error)
- {
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetTableSchema>(fn)(connection,
bCatalog, bDb_schema, bTable_name, nativeSchema, e));
- }
- }
- }
-#endif
-
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcConnection*,
CArrowArrayStream*, CAdbcError*, AdbcStatusCode> fn, ref CAdbcConnection
nativeconnection, CArrowArrayStream* arrowStream)
- {
- fixed (CAdbcConnection* connection = &nativeconnection)
- fixed (CAdbcError* e = &_error)
- {
- TranslateCode(fn(connection, arrowStream, e));
- }
- }
-#else
- public unsafe void Call(IntPtr fn, ref CAdbcConnection
nativeconnection, CArrowArrayStream* arrowStream)
- {
- fixed (CAdbcConnection* connection = &nativeconnection)
- fixed (CAdbcError* e = &_error)
- {
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetTableTypes>(fn)(connection,
arrowStream, e));
- }
- }
-#endif
-
public unsafe void Dispose()
{
if (_error.release != default)
@@ -886,91 +808,7 @@ namespace Apache.Arrow.Adbc.C
}
}
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcConnection*,
int*, int, CArrowArrayStream*, CAdbcError*, AdbcStatusCode> fn, ref
CAdbcConnection connection, IReadOnlyList<AdbcInfoCode> infoCodes,
CArrowArrayStream* stream)
- {
- fixed (CAdbcConnection* cn = &connection)
- fixed (CAdbcError* e = &_error)
- {
- Span<AdbcInfoCode> span = infoCodes.AsSpan();
- fixed (AdbcInfoCode* spanPtr = span)
- {
- TranslateCode(fn(cn, (int*)spanPtr, infoCodes.Count,
stream, e));
- }
- }
- }
-#else
- public unsafe void Call(IntPtr ptr, ref CAdbcConnection
connection, IReadOnlyList<AdbcInfoCode> infoCodes, CArrowArrayStream* stream)
- {
- fixed (CAdbcConnection* cn = &connection)
- fixed (CAdbcError* e = &_error)
- {
- Span<AdbcInfoCode> span = infoCodes.AsSpan();
- fixed (AdbcInfoCode* spanPtr = span)
- {
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetInfo>(ptr)(cn,
(int*)spanPtr, infoCodes.Count, stream, e));
- }
- }
- }
-#endif
-
-#if NET5_0_OR_GREATER
- public unsafe void Call(delegate* unmanaged<CAdbcConnection*, int,
byte*, byte*, byte*, byte**, byte*, CArrowArrayStream*, CAdbcError*,
AdbcStatusCode> fn, ref CAdbcConnection connection, int depth, string catalog,
string db_schema, string table_name, IReadOnlyList<string> table_types, string
column_name, CArrowArrayStream* stream)
-#else
- public unsafe void Call(IntPtr fn, ref CAdbcConnection connection,
int depth, string catalog, string db_schema, string table_name,
IReadOnlyList<string> table_types, string column_name, CArrowArrayStream*
stream)
-#endif
- {
- byte* bcatalog, bDb_schema, bTable_name, bColumn_Name;
-
- if (table_types == null)
- {
- table_types = new string[0];
- }
-
- // need to terminate with a null entry per
https://github.com/apache/arrow-adbc/blob/b97e22c4d6524b60bf261e1970155500645be510/adbc.h#L909-L911
- byte** bTable_type = (byte**)Marshal.AllocHGlobal(IntPtr.Size
* (table_types.Count + 1));
- bTable_type[table_types.Count] = null;
-
- for (int i = 0; i < table_types.Count; i++)
- {
- string tableType = table_types[i];
- bTable_type[i] =
(byte*)MarshalExtensions.StringToCoTaskMemUTF8(tableType);
- }
-
- try
- {
- using (Utf8Helper catalogHelper = new Utf8Helper(catalog))
- using (Utf8Helper schemaHelper = new Utf8Helper(db_schema))
- using (Utf8Helper tableNameHelper = new
Utf8Helper(table_name))
- using (Utf8Helper columnNameHelper = new
Utf8Helper(column_name))
- {
- bcatalog = (byte*)(IntPtr)(catalogHelper);
- bDb_schema = (byte*)(IntPtr)(schemaHelper);
- bTable_name = (byte*)(IntPtr)(tableNameHelper);
- bColumn_Name = (byte*)(IntPtr)(columnNameHelper);
-
- fixed (CAdbcConnection* cn = &connection)
- fixed (CAdbcError* e = &_error)
- {
-#if NET5_0_OR_GREATER
- TranslateCode(fn(cn, depth, bcatalog, bDb_schema,
bTable_name, bTable_type, bColumn_Name, stream, e));
-#else
-
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.ConnectionGetObjects>(fn)(cn,
depth, bcatalog, bDb_schema, bTable_name, bTable_type, bColumn_Name, stream,
e));
-#endif
- }
- }
- }
- finally
- {
- for (int i = 0; i < table_types.Count; i++)
- {
- Marshal.FreeCoTaskMem((IntPtr)bTable_type[i]);
- }
- Marshal.FreeHGlobal((IntPtr)bTable_type);
- }
- }
-
- private unsafe void TranslateCode(AdbcStatusCode statusCode)
+ public unsafe void TranslateCode(AdbcStatusCode statusCode)
{
if (statusCode != AdbcStatusCode.Success)
{
diff --git a/csharp/src/Apache.Arrow.Adbc/Extensions/MarshalExtensions.cs
b/csharp/src/Apache.Arrow.Adbc/Extensions/MarshalExtensions.cs
index ce0b50b25..c776f897a 100644
--- a/csharp/src/Apache.Arrow.Adbc/Extensions/MarshalExtensions.cs
+++ b/csharp/src/Apache.Arrow.Adbc/Extensions/MarshalExtensions.cs
@@ -74,12 +74,22 @@ namespace Apache.Arrow.Adbc.Extensions
return pMem;
}
#else
+ public static unsafe string PtrToStringUTF8(IntPtr intPtr)
+ {
+ return Marshal.PtrToStringUTF8(intPtr);
+ }
+
public static IntPtr StringToCoTaskMemUTF8(string s)
{
return Marshal.StringToCoTaskMemUTF8(s);
}
#endif
+ public static unsafe string PtrToStringUTF8(byte* ptr)
+ {
+ return PtrToStringUTF8((IntPtr)ptr);
+ }
+
public static unsafe byte[] MarshalBuffer(void* ptr, int size)
{
if (ptr == null)