davlee1972 commented on issue #1893:
URL: https://github.com/apache/arrow-adbc/issues/1893#issuecomment-2146291707
I created a temporary hack pyarrow.RecordBatchReader and
pyarrow.dataset.Scanner to fix this..
Here's the error example: Attempt to read from a stream that has already
been closed
```
def get_reader():
cursor = snowflake.source.conn.cursor()
cursor.execute("select * from FUND_FLOWS limit 10")
return cursor.fetch_record_batch()
my_reader = get_reader()
my_reader.read_all()
```
```
---------------------------------------------------------------------------
ArrowInvalid Traceback (most recent call last)
Cell In[6], line 7
4 return cursor.fetch_record_batch()
6 my_reader = get_reader()
----> 7 my_reader.read_all()
File
[~/miniconda3/lib/python3.9/site-packages/pyarrow/ipc.pxi:757](http://blkbhca006:8888/lab/tree/tutorials/~/miniconda3/lib/python3.9/site-packages/pyarrow/ipc.pxi#line=756),
in pyarrow.lib.RecordBatchReader.read_all()
File
[~/miniconda3/lib/python3.9/site-packages/pyarrow/error.pxi:91](http://blkbhca006:8888/lab/tree/tutorials/~/miniconda3/lib/python3.9/site-packages/pyarrow/error.pxi#line=90),
in pyarrow.lib.check_status()
ArrowInvalid: Attempt to read from a stream that has already been closed
```
Here's my temporary hack to create replacement classes for RecordBatchReader
and Scanner...
```
class Object_cursor(object):
def __init__(self, parent, cursor):
self.parent = parent
self.cursor = cursor
def __getitem__(self, item):
result = self.parent[item]
return result
def __getattr__(self, item):
result = getattr(self.parent, item)
return result
def __repr__(self):
return repr(self.parent)
def __dir__(self):
__all__ = dir(self.parent)
__all__.append("cursor")
return __all__
class RecordBatchReader_cursor(Object_cursor):
"""pyarrow.RecordBatchReader with ADBC Cursor"""
class Scanner_cursor(Object_cursor):
"""pyarrow.dataset.Scanner with ADBC Cursor"""
```
Now my get_reader() function works..
```
def get_reader():
cursor = snowflake.source.conn.cursor()
cursor.execute("select * from FUND_FLOWS limit 10")
return RecordBatchReader_cursor(cursor.fetch_record_batch(), cursor)
my_reader = get_reader()
my_reader.read_all()
```
```
pyarrow.Table
cusip: string
start_date: date32[day]
end_date: date32[day]
purpose: string
source: string
flow: double
flow_usd: double
currency: string
----
cusip:
[["BRSS04E70","BRSS04EB1","BRSS04EE5","BRSS04EK1","BRSS04EL9","BRSS04EM7","BRSS04EN5","BRSS04EQ8","BRSS04EV7","BRSS04EW5"]]
start_date:
[[2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21]]
end_date:
[[2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22]]
purpose:
[["DAILY","DAILY","DAILY","DAILY","DAILY","DAILY","DAILY","DAILY","DAILY","DAILY"]]
source:
[["MSTAR","MSTAR","MSTAR","MSTAR","MSTAR","MSTAR","MSTAR","MSTAR","MSTAR","MSTAR"]]
flow:
[[2.0206298828125,3520657.5,0,-1638.47998046875,55.05837631225586,0,-15.669054985046387,237385.4375,4411420.5,3.951692581176758]]
flow_usd:
[[0.6306782364845276,1098866.25,0,-1841.8153076171875,17.184799194335938,0,-4.890619277954102,266844.96875,4958877.5,1.2334007024765015]]
currency: [["BRL","BRL","EUR","EUR","BRL","KRW","BRL","EUR","EUR","BRL"]]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]