CurtHagenlocher commented on code in PR #1710:
URL: https://github.com/apache/arrow-adbc/pull/1710#discussion_r1565898954


##########
csharp/test/Drivers/Apache/Apache - Backup.Arrow.Adbc.Tests.Drivers.Apache.csproj:
##########
@@ -0,0 +1,42 @@
+<Project Sdk="Microsoft.NET.Sdk">

Review Comment:
   Delete this file? It looks like an accidentally committed backup copy of the 
test project.



##########
csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBinaryColumn.cs:
##########
@@ -0,0 +1,237 @@
+/**

Review Comment:
   Consider adding a README.md to this directory indicating which files have 
been hand-edited and perhaps how the other files were generated. (If these were 
the files I originally generated, then I guess I'll have to remember... :/.)



##########
csharp/src/Drivers/Apache/Spark/SparkStatement.cs:
##########
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+    public class SparkStatement : HiveServer2Statement
+    {
+        internal SparkStatement(SparkConnection connection)
+            : base(connection)
+        {
+        }
+
+        protected override void SetStatementProperties(TExecuteStatementReq 
statement)
+        {
+            // TODO: Ensure this is set dynamically depending on server 
capabilities.
+            statement.EnforceResultPersistenceMode = false;
+            statement.ResultPersistenceMode = 2;
+
+            statement.CanReadArrowResult = true;
+            statement.CanDownloadResult = true;
+            statement.ConfOverlay = SparkConnection.timestampConfig;
+            statement.UseArrowNativeTypes = new TSparkArrowTypes
+            {
+                TimestampAsArrow = true,
+                DecimalAsArrow = true,
+                ComplexTypesAsArrow = false,
+                IntervalTypesAsArrow = false,

Review Comment:
   Why are `ComplexTypesAsArrow` and `IntervalTypesAsArrow` false? Just not 
implemented yet?



##########
csharp/src/Drivers/Apache/Spark/SparkStatement.cs:
##########
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+    public class SparkStatement : HiveServer2Statement
+    {
+        internal SparkStatement(SparkConnection connection)
+            : base(connection)
+        {
+        }
+
+        protected override void SetStatementProperties(TExecuteStatementReq 
statement)
+        {
+            // TODO: Ensure this is set dynamically depending on server 
capabilities.
+            statement.EnforceResultPersistenceMode = false;
+            statement.ResultPersistenceMode = 2;
+
+            statement.CanReadArrowResult = true;
+            statement.CanDownloadResult = true;
+            statement.ConfOverlay = SparkConnection.timestampConfig;
+            statement.UseArrowNativeTypes = new TSparkArrowTypes
+            {
+                TimestampAsArrow = true,
+                DecimalAsArrow = true,
+                ComplexTypesAsArrow = false,
+                IntervalTypesAsArrow = false,
+            };
+        }
+
+        public override QueryResult ExecuteQuery()
+        {
+            ExecuteStatement();
+            PollForResponse();
+            Schema schema = GetSchema();
+
+            // TODO: Ensure this is set dynamically based on server 
capabilities
+            return new QueryResult(-1, new SparkReader(this, schema));
+            //return new QueryResult(-1, new CloudFetchReader(this, schema));
+        }
+
+        public override UpdateResult ExecuteUpdate()
+        {
+            const string NumberOfAffectedRowsColumnName = "num_affected_rows";
+
+            QueryResult queryResult = ExecuteQuery();
+            using IArrowArrayStream stream = queryResult.Stream;
+
+            // Check if the affected rows columns are returned in the result.
+            Field affectedRowsField = 
stream.Schema.GetFieldByName(NumberOfAffectedRowsColumnName);
+            if (affectedRowsField != null && affectedRowsField.DataType.TypeId 
!= Types.ArrowTypeId.Int64)
+            {
+                throw new AdbcException($"Unexpected data type for column: 
'{NumberOfAffectedRowsColumnName}'", new 
ArgumentException(NumberOfAffectedRowsColumnName));
+            }
+
+            // If no altered rows, i.e. DDC statements, then -1 is the default.
+            long? affectedRows = null;
+            while (true)
+            {
+                using RecordBatch nextBatch = 
stream.ReadNextRecordBatchAsync().Result;
+                if (nextBatch == null) { break; }
+                Int64Array numOfModifiedArray = 
(Int64Array)nextBatch.Column(NumberOfAffectedRowsColumnName);
+                // Note: should only have one item, but iterate for 
completeness
+                for (int i = 0; i < numOfModifiedArray.Length; i++)
+                {
+                    // Note: handle the case where the affected rows are zero 
(0).
+                    affectedRows = (affectedRows ?? 0) + 
numOfModifiedArray.GetValue(i).GetValueOrDefault(0);
+                }
+            }
+
+            return new UpdateResult(affectedRows ?? -1);
+        }
+
+        public override object GetValue(IArrowArray arrowArray, int index)
+        {
+            return base.GetValue(arrowArray, index);
+        }
+
+        sealed class SparkReader : IArrowArrayStream
+        {
+            SparkStatement statement;
+            Schema schema;
+            List<TSparkArrowBatch> batches;
+            int index;
+            IArrowReader reader;
+
+            public SparkReader(SparkStatement statement, Schema schema)
+            {
+                this.statement = statement;
+                this.schema = schema;
+            }
+
+            public Schema Schema { get { return schema; } }
+
+            public async ValueTask<RecordBatch> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+            {
+                while (true)
+                {
+                    if (this.reader != null)
+                    {
+                        RecordBatch next = await 
this.reader.ReadNextRecordBatchAsync(cancellationToken);
+                        if (next != null)
+                        {
+                            return next;
+                        }
+                        this.reader = null;
+                    }
+
+                    if (this.batches != null && this.index < 
this.batches.Count)
+                    {
+                        this.reader = new ArrowStreamReader(new 
ChunkStream(this.schema, this.batches[this.index++].Batch));
+                        continue;
+                    }
+
+                    this.batches = null;
+                    this.index = 0;
+
+                    if (this.statement == null)
+                    {
+                        return null;
+                    }
+
+                    TFetchResultsReq request = new 
TFetchResultsReq(this.statement.operationHandle, TFetchOrientation.FETCH_NEXT, 
50000);
+                    TFetchResultsResp response = await 
this.statement.connection.client.FetchResults(request, cancellationToken);
+                    this.batches = response.Results.ArrowBatches;
+
+                    if (!response.HasMoreRows)
+                    {
+                        this.statement = null;
+                    }
+                }
+            }
+
+            public void Dispose()
+            {
+            }
+        }
+
+
+        sealed class CloudFetchReader : IArrowArrayStream
+        {
+            SparkStatement _statement;
+            Schema _schema;
+            ChunkDownloader _chunkDownloader;
+            IArrowReader _reader;
+
+            public CloudFetchReader(SparkStatement statement, Schema schema)
+            {
+                _statement = statement;
+                _schema = schema;
+                TFetchResultsReq request = new 
TFetchResultsReq(_statement.operationHandle, TFetchOrientation.FETCH_NEXT, 
500000);
+                TFetchResultsResp response = 
this._statement.connection.client.FetchResults(request, cancellationToken: 
default).Result;
+                _chunkDownloader = new 
ChunkDownloader(response.Results.ResultLinks);
+            }
+
+            public Schema Schema { get { return _schema; } }
+
+            public async ValueTask<RecordBatch> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+            {
+                while (true)
+                {
+                    if (_reader != null)
+                    {
+                        RecordBatch next = await 
_reader.ReadNextRecordBatchAsync(cancellationToken);
+                        if (next != null)
+                        {
+                            return next;
+                        }
+                        _reader = null;
+                        if (_chunkDownloader.currentChunkIndex >= 
_chunkDownloader.chunks.Count)
+                        {
+                            _statement = null;
+                        }
+                    }
+
+                    if (_statement == null)
+                    {
+                        return null;
+                    }
+
+                    if (_reader == null)
+                    {
+                        var currentChunk = 
_chunkDownloader.chunks[_chunkDownloader.currentChunkIndex];
+                        while (!currentChunk.isDownloaded)
+                        {
+                            await Task.Delay(500, cancellationToken);
+                        }
+                        _chunkDownloader.currentChunkIndex++;
+                        _reader = currentChunk.reader;
+                    }
+                }
+            }
+
+            public void Dispose()
+            {
+            }
+        }
+    }
+
+    internal class ChunkDownloader
+    {
+        public Dictionary<int, Chunk> chunks;
+
+        public int currentChunkIndex = 0;
+        HttpClient client;
+
+        public ChunkDownloader(List<TSparkArrowResultLink> links)
+        {
+            this.chunks = new Dictionary<int, Chunk>();
+            for (int i = 0; i < links.Count; i++)
+            {
+                var currentChunk = new Chunk(i, links[i].FileLink);
+                this.chunks.Add(i, currentChunk);
+            }
+            this.client = new HttpClient();
+            initialize();
+        }
+
+        public ChunkDownloader(Dictionary<string, Dictionary<string, string>> 
links)
+        {
+            //this.links = links;
+            this.client = new HttpClient();
+        }
+
+        void initialize()

Review Comment:
   nit: this isn't Java :P. C# method names are PascalCase, so this should be 
`Initialize`.



##########
csharp/src/Drivers/Apache/Spark/SparkStatement.cs:
##########
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+    public class SparkStatement : HiveServer2Statement
+    {
+        internal SparkStatement(SparkConnection connection)
+            : base(connection)
+        {
+        }
+
+        protected override void SetStatementProperties(TExecuteStatementReq 
statement)
+        {
+            // TODO: Ensure this is set dynamically depending on server 
capabilities.
+            statement.EnforceResultPersistenceMode = false;
+            statement.ResultPersistenceMode = 2;
+
+            statement.CanReadArrowResult = true;
+            statement.CanDownloadResult = true;
+            statement.ConfOverlay = SparkConnection.timestampConfig;
+            statement.UseArrowNativeTypes = new TSparkArrowTypes
+            {
+                TimestampAsArrow = true,
+                DecimalAsArrow = true,
+                ComplexTypesAsArrow = false,
+                IntervalTypesAsArrow = false,
+            };
+        }
+
+        public override QueryResult ExecuteQuery()
+        {
+            ExecuteStatement();
+            PollForResponse();
+            Schema schema = GetSchema();
+
+            // TODO: Ensure this is set dynamically based on server 
capabilities
+            return new QueryResult(-1, new SparkReader(this, schema));
+            //return new QueryResult(-1, new CloudFetchReader(this, schema));
+        }
+
+        public override UpdateResult ExecuteUpdate()
+        {
+            const string NumberOfAffectedRowsColumnName = "num_affected_rows";
+
+            QueryResult queryResult = ExecuteQuery();
+            using IArrowArrayStream stream = queryResult.Stream;
+
+            // Check if the affected rows columns are returned in the result.
+            Field affectedRowsField = 
stream.Schema.GetFieldByName(NumberOfAffectedRowsColumnName);
+            if (affectedRowsField != null && affectedRowsField.DataType.TypeId 
!= Types.ArrowTypeId.Int64)
+            {
+                throw new AdbcException($"Unexpected data type for column: 
'{NumberOfAffectedRowsColumnName}'", new 
ArgumentException(NumberOfAffectedRowsColumnName));
+            }
+
+            // If no altered rows, i.e. DDC statements, then -1 is the default.
+            long? affectedRows = null;
+            while (true)
+            {
+                using RecordBatch nextBatch = 
stream.ReadNextRecordBatchAsync().Result;
+                if (nextBatch == null) { break; }
+                Int64Array numOfModifiedArray = 
(Int64Array)nextBatch.Column(NumberOfAffectedRowsColumnName);
+                // Note: should only have one item, but iterate for 
completeness
+                for (int i = 0; i < numOfModifiedArray.Length; i++)
+                {
+                    // Note: handle the case where the affected rows are zero 
(0).
+                    affectedRows = (affectedRows ?? 0) + 
numOfModifiedArray.GetValue(i).GetValueOrDefault(0);
+                }
+            }
+
+            return new UpdateResult(affectedRows ?? -1);
+        }
+
+        public override object GetValue(IArrowArray arrowArray, int index)
+        {
+            return base.GetValue(arrowArray, index);
+        }
+
+        sealed class SparkReader : IArrowArrayStream
+        {
+            SparkStatement statement;
+            Schema schema;
+            List<TSparkArrowBatch> batches;
+            int index;
+            IArrowReader reader;
+
+            public SparkReader(SparkStatement statement, Schema schema)
+            {
+                this.statement = statement;
+                this.schema = schema;
+            }
+
+            public Schema Schema { get { return schema; } }
+
+            public async ValueTask<RecordBatch> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+            {
+                while (true)
+                {
+                    if (this.reader != null)
+                    {
+                        RecordBatch next = await 
this.reader.ReadNextRecordBatchAsync(cancellationToken);
+                        if (next != null)
+                        {
+                            return next;
+                        }
+                        this.reader = null;
+                    }
+
+                    if (this.batches != null && this.index < 
this.batches.Count)
+                    {
+                        this.reader = new ArrowStreamReader(new 
ChunkStream(this.schema, this.batches[this.index++].Batch));
+                        continue;
+                    }
+
+                    this.batches = null;
+                    this.index = 0;
+
+                    if (this.statement == null)
+                    {
+                        return null;
+                    }
+
+                    TFetchResultsReq request = new 
TFetchResultsReq(this.statement.operationHandle, TFetchOrientation.FETCH_NEXT, 
50000);
+                    TFetchResultsResp response = await 
this.statement.connection.client.FetchResults(request, cancellationToken);
+                    this.batches = response.Results.ArrowBatches;
+
+                    if (!response.HasMoreRows)
+                    {
+                        this.statement = null;
+                    }
+                }
+            }
+
+            public void Dispose()
+            {
+            }
+        }
+
+
+        sealed class CloudFetchReader : IArrowArrayStream
+        {
+            SparkStatement _statement;
+            Schema _schema;
+            ChunkDownloader _chunkDownloader;
+            IArrowReader _reader;
+
+            public CloudFetchReader(SparkStatement statement, Schema schema)
+            {
+                _statement = statement;
+                _schema = schema;
+                TFetchResultsReq request = new 
TFetchResultsReq(_statement.operationHandle, TFetchOrientation.FETCH_NEXT, 
500000);
+                TFetchResultsResp response = 
this._statement.connection.client.FetchResults(request, cancellationToken: 
default).Result;
+                _chunkDownloader = new 
ChunkDownloader(response.Results.ResultLinks);
+            }
+
+            public Schema Schema { get { return _schema; } }
+
+            public async ValueTask<RecordBatch> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+            {
+                while (true)
+                {
+                    if (_reader != null)
+                    {
+                        RecordBatch next = await 
_reader.ReadNextRecordBatchAsync(cancellationToken);
+                        if (next != null)
+                        {
+                            return next;
+                        }
+                        _reader = null;
+                        if (_chunkDownloader.currentChunkIndex >= 
_chunkDownloader.chunks.Count)
+                        {
+                            _statement = null;
+                        }
+                    }
+
+                    if (_statement == null)
+                    {
+                        return null;
+                    }
+
+                    if (_reader == null)
+                    {
+                        var currentChunk = 
_chunkDownloader.chunks[_chunkDownloader.currentChunkIndex];
+                        while (!currentChunk.isDownloaded)
+                        {
+                            await Task.Delay(500, cancellationToken);
+                        }
+                        _chunkDownloader.currentChunkIndex++;
+                        _reader = currentChunk.reader;
+                    }
+                }
+            }
+
+            public void Dispose()
+            {
+            }
+        }
+    }
+
+    internal class ChunkDownloader
+    {
+        public Dictionary<int, Chunk> chunks;
+
+        public int currentChunkIndex = 0;
+        HttpClient client;
+
+        public ChunkDownloader(List<TSparkArrowResultLink> links)
+        {
+            this.chunks = new Dictionary<int, Chunk>();
+            for (int i = 0; i < links.Count; i++)
+            {
+                var currentChunk = new Chunk(i, links[i].FileLink);
+                this.chunks.Add(i, currentChunk);
+            }
+            this.client = new HttpClient();
+            initialize();
+        }
+
+        public ChunkDownloader(Dictionary<string, Dictionary<string, string>> 
links)
+        {
+            //this.links = links;
+            this.client = new HttpClient();
+        }
+
+        void initialize()
+        {
+            int workerThreads, completionPortThreads;
+            ThreadPool.GetMinThreads(out workerThreads, out 
completionPortThreads);
+            ThreadPool.SetMinThreads(5, completionPortThreads);
+            ThreadPool.SetMaxThreads(10, completionPortThreads);
+            foreach (KeyValuePair<int, Chunk> chunk in chunks)
+            {
+                ThreadPool.QueueUserWorkItem(async _ =>
+                {
+                    try
+                    {
+                        await chunk.Value.downloadData(this.client);
+                    }
+                    catch (Exception e)
+                    {
+                        Console.WriteLine(e);
+                    }
+                });
+            }
+        }
+    }
+
+    public class Chunk
+    {
+        int chunkId;
+        string chunkUrl;
+        Dictionary<string, string> headers;
+        public bool isDownloaded = false;
+        public bool isFailed = false;
+        public IArrowReader reader;
+
+        public Chunk(int chunkId, string chunkUrl) : this(chunkId, chunkUrl, 
new Dictionary<string, string>())
+        {
+        }
+
+        public Chunk(int chunkId, string chunkUrl, Dictionary<string, string> 
headers)
+        {
+            this.chunkId = chunkId;
+            this.chunkUrl = chunkUrl;
+            this.headers = headers;
+            this.reader = null;
+        }
+
+        public async Task downloadData(HttpClient client)

Review Comment:
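   nit: casing. C# method names are PascalCase, so this should be 
`DownloadData` (or `DownloadDataAsync`, since it returns a `Task`).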



##########
csharp/src/Drivers/Apache/Thrift/SchemaParser.cs:
##########
@@ -0,0 +1,81 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using Apache.Arrow.Types;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache
+{
+    internal class SchemaParser
+    {
+        internal static Schema GetArrowSchema(TTableSchema thriftSchema)
+        {
+            Field[] fields = new Field[thriftSchema.Columns.Count];
+            for (int i = 0; i < thriftSchema.Columns.Count; i++)
+            {
+                TColumnDesc column = thriftSchema.Columns[i];
+                fields[i] = new Field(column.ColumnName, 
GetArrowType(column.TypeDesc.Types[0]), nullable: true /* ??? */);

Review Comment:
   The Thrift type doesn't include a nullable indicator?



##########
csharp/src/Drivers/Apache/Thrift/SchemaParser.cs:
##########
@@ -0,0 +1,81 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using Apache.Arrow.Types;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache
+{
+    internal class SchemaParser
+    {
+        internal static Schema GetArrowSchema(TTableSchema thriftSchema)
+        {
+            Field[] fields = new Field[thriftSchema.Columns.Count];
+            for (int i = 0; i < thriftSchema.Columns.Count; i++)
+            {
+                TColumnDesc column = thriftSchema.Columns[i];
+                fields[i] = new Field(column.ColumnName, 
GetArrowType(column.TypeDesc.Types[0]), nullable: true /* ??? */);
+            }
+            return new Schema(fields, null);
+        }
+
+        static IArrowType GetArrowType(TTypeEntry thriftType)
+        {
+            if (thriftType.PrimitiveEntry != null)
+            {
+                return GetArrowType(thriftType.PrimitiveEntry);
+            }
+            throw new InvalidOperationException();

Review Comment:
   For gaps like "doesn't support structured types", please make sure there are 
issues filed in GitHub. For major gaps (this may be one) it would also be good 
to call them out in a README.



##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs:
##########
@@ -0,0 +1,209 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+using Thrift.Protocol;
+using Thrift.Transport;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
+{
+    public abstract class HiveServer2Connection : AdbcConnection
+    {
+        const string userAgent = "AdbcExperimental/0.0";
+
+        protected TOperationHandle operationHandle;
+        protected IReadOnlyDictionary<string, string> properties;
+        internal TTransport transport;
+        internal TCLIService.Client client;
+        internal TSessionHandle sessionHandle;
+
+        internal HiveServer2Connection() : this(null)
+        {
+
+        }
+
+        internal HiveServer2Connection(IReadOnlyDictionary<string, string> 
properties)
+        {
+            this.properties = properties;
+        }
+
+        public void Open()
+        {
+            TProtocol protocol = CreateProtocol();
+            this.transport = protocol.Transport;
+            this.client = new TCLIService.Client(protocol);
+
+            var s0 = this.client.OpenSession(CreateSessionRequest()).Result;
+            this.sessionHandle = s0.SessionHandle;
+        }
+
+        protected abstract TProtocol CreateProtocol();
+        protected abstract TOpenSessionReq CreateSessionRequest();
+
+        public override IArrowArrayStream GetObjects(GetObjectsDepth depth, 
string catalogPattern, string dbSchemaPattern, string tableNamePattern, 
List<string> tableTypes, string columnNamePattern)

Review Comment:
   `GetObjects` appears to be quite incomplete. Consider removing it from this 
PR and submitting it separately once it's complete and tested.



##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs:
##########
@@ -0,0 +1,209 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+using Thrift.Protocol;
+using Thrift.Transport;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
+{
+    public abstract class HiveServer2Connection : AdbcConnection
+    {
+        const string userAgent = "AdbcExperimental/0.0";
+
+        protected TOperationHandle operationHandle;
+        protected IReadOnlyDictionary<string, string> properties;
+        internal TTransport transport;
+        internal TCLIService.Client client;
+        internal TSessionHandle sessionHandle;
+
+        internal HiveServer2Connection() : this(null)
+        {
+
+        }
+
+        internal HiveServer2Connection(IReadOnlyDictionary<string, string> 
properties)
+        {
+            this.properties = properties;
+        }
+
+        public void Open()
+        {
+            TProtocol protocol = CreateProtocol();
+            this.transport = protocol.Transport;
+            this.client = new TCLIService.Client(protocol);
+
+            var s0 = this.client.OpenSession(CreateSessionRequest()).Result;
+            this.sessionHandle = s0.SessionHandle;
+        }
+
+        protected abstract TProtocol CreateProtocol();
+        protected abstract TOpenSessionReq CreateSessionRequest();
+
+        public override IArrowArrayStream GetObjects(GetObjectsDepth depth, 
string catalogPattern, string dbSchemaPattern, string tableNamePattern, 
List<string> tableTypes, string columnNamePattern)
+        {
+            Dictionary<string, Dictionary<string, Dictionary<string, 
List<string>>>> catalogMap = new Dictionary<string, Dictionary<string, 
Dictionary<string, List<string>>>>();
+            if (depth == GetObjectsDepth.All || depth >= 
GetObjectsDepth.Catalogs)
+            {
+                TGetCatalogsReq getCatalogsReq = new 
TGetCatalogsReq(this.sessionHandle);
+            }
+
+            if (depth == GetObjectsDepth.All || depth >= 
GetObjectsDepth.DbSchemas)
+            {
+                TGetSchemasReq getSchemasReq = new 
TGetSchemasReq(this.sessionHandle);
+            }
+
+            if (depth == GetObjectsDepth.All || depth >= 
GetObjectsDepth.Tables)
+            {
+                TGetTablesReq getTablesReq = new 
TGetTablesReq(this.sessionHandle);
+            }
+
+            if (depth == GetObjectsDepth.All)
+            {
+                TGetColumnsReq columnsReq = new 
TGetColumnsReq(this.sessionHandle);
+                columnsReq.CatalogName = catalogPattern;
+                columnsReq.SchemaName = dbSchemaPattern;
+                columnsReq.TableName = tableNamePattern;
+
+                if (!string.IsNullOrEmpty(columnNamePattern))
+                    columnsReq.ColumnName = columnNamePattern;
+
+                var columnsResponse = 
this.client.GetColumns(columnsReq).Result;
+                if (columnsResponse.Status.StatusCode == 
TStatusCode.ERROR_STATUS)
+                {
+                    throw new Exception(columnsResponse.Status.ErrorMessage);
+                }
+
+                this.operationHandle = columnsResponse.OperationHandle;
+            }
+
+            PollForResponse();
+
+            Schema schema = GetSchema();
+
+            return new GetObjectsReader(this,schema);
+        }
+
+        public override IArrowArrayStream GetInfo(List<int> codes)
+        {
+            throw new NotImplementedException();
+        }
+
+        public override IArrowArrayStream GetTableTypes()
+        {
+            throw new NotImplementedException();
+        }
+
+        protected void PollForResponse()
+        {
+            TGetOperationStatusResp statusResponse = null;
+            do
+            {
+                if (statusResponse != null) { Thread.Sleep(500); }

Review Comment:
   This is a good reminder for me that we really need a more async-friendly 
API, ideally a cross-process one.



##########
csharp/src/Drivers/Apache/Spark/SparkStatement.cs:
##########
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Ipc;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
+{
+    public class SparkStatement : HiveServer2Statement
+    {
+        internal SparkStatement(SparkConnection connection)
+            : base(connection)
+        {
+        }
+
+        protected override void SetStatementProperties(TExecuteStatementReq 
statement)
+        {
+            // TODO: Ensure this is set dynamically depending on server 
capabilities.
+            statement.EnforceResultPersistenceMode = false;
+            statement.ResultPersistenceMode = 2;
+
+            statement.CanReadArrowResult = true;
+            statement.CanDownloadResult = true;
+            statement.ConfOverlay = SparkConnection.timestampConfig;
+            statement.UseArrowNativeTypes = new TSparkArrowTypes
+            {
+                TimestampAsArrow = true,
+                DecimalAsArrow = true,
+                ComplexTypesAsArrow = false,
+                IntervalTypesAsArrow = false,
+            };
+        }
+
+        public override QueryResult ExecuteQuery()
+        {
+            ExecuteStatement();
+            PollForResponse();
+            Schema schema = GetSchema();
+
+            // TODO: Ensure this is set dynamically based on server 
capabilities
+            return new QueryResult(-1, new SparkReader(this, schema));
+            //return new QueryResult(-1, new CloudFetchReader(this, schema));
+        }
+
+        public override UpdateResult ExecuteUpdate()
+        {
+            const string NumberOfAffectedRowsColumnName = "num_affected_rows";
+
+            QueryResult queryResult = ExecuteQuery();
+            using IArrowArrayStream stream = queryResult.Stream;
+
+            // Check if the affected rows columns are returned in the result.
+            Field affectedRowsField = 
stream.Schema.GetFieldByName(NumberOfAffectedRowsColumnName);
+            if (affectedRowsField != null && affectedRowsField.DataType.TypeId 
!= Types.ArrowTypeId.Int64)
+            {
+                throw new AdbcException($"Unexpected data type for column: 
'{NumberOfAffectedRowsColumnName}'", new 
ArgumentException(NumberOfAffectedRowsColumnName));
+            }
+
+            // If no altered rows, i.e. DDL statements, then -1 is the default.
+            long? affectedRows = null;
+            while (true)
+            {
+                using RecordBatch nextBatch = 
stream.ReadNextRecordBatchAsync().Result;
+                if (nextBatch == null) { break; }
+                Int64Array numOfModifiedArray = 
(Int64Array)nextBatch.Column(NumberOfAffectedRowsColumnName);
+                // Note: should only have one item, but iterate for 
completeness
+                for (int i = 0; i < numOfModifiedArray.Length; i++)
+                {
+                    // Note: handle the case where the affected rows are zero 
(0).
+                    affectedRows = (affectedRows ?? 0) + 
numOfModifiedArray.GetValue(i).GetValueOrDefault(0);
+                }
+            }
+
+            return new UpdateResult(affectedRows ?? -1);
+        }
+
+        public override object GetValue(IArrowArray arrowArray, int index)
+        {
+            return base.GetValue(arrowArray, index);
+        }
+
+        sealed class SparkReader : IArrowArrayStream
+        {
+            SparkStatement statement;
+            Schema schema;
+            List<TSparkArrowBatch> batches;
+            int index;
+            IArrowReader reader;
+
+            public SparkReader(SparkStatement statement, Schema schema)
+            {
+                this.statement = statement;
+                this.schema = schema;
+            }
+
+            public Schema Schema { get { return schema; } }
+
+            public async ValueTask<RecordBatch> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+            {
+                while (true)
+                {
+                    if (this.reader != null)
+                    {
+                        RecordBatch next = await 
this.reader.ReadNextRecordBatchAsync(cancellationToken);
+                        if (next != null)
+                        {
+                            return next;
+                        }
+                        this.reader = null;
+                    }
+
+                    if (this.batches != null && this.index < 
this.batches.Count)
+                    {
+                        this.reader = new ArrowStreamReader(new 
ChunkStream(this.schema, this.batches[this.index++].Batch));
+                        continue;
+                    }
+
+                    this.batches = null;
+                    this.index = 0;
+
+                    if (this.statement == null)
+                    {
+                        return null;
+                    }
+
+                    TFetchResultsReq request = new 
TFetchResultsReq(this.statement.operationHandle, TFetchOrientation.FETCH_NEXT, 
50000);
+                    TFetchResultsResp response = await 
this.statement.connection.client.FetchResults(request, cancellationToken);
+                    this.batches = response.Results.ArrowBatches;
+
+                    if (!response.HasMoreRows)
+                    {
+                        this.statement = null;
+                    }
+                }
+            }
+
+            public void Dispose()
+            {
+            }
+        }
+
+
+        sealed class CloudFetchReader : IArrowArrayStream

Review Comment:
   CloudFetchReader and ChunkDownloader don't appear to be complete or used 
yet. Consider removing them from this PR and re-adding them once they are 
working and tested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to