davlee1972 commented on issue #1893:
URL: https://github.com/apache/arrow-adbc/issues/1893#issuecomment-2146291707

   I created a temporary hack pyarrow.RecordBatchReader and 
pyarrow.dataset.Scanner to fix this..
   
   Here's the error example: Attempt to read from a stream that has already 
been closed
   
   ```
   def get_reader():
       cursor = snowflake.source.conn.cursor()
       cursor.execute("select * from FUND_FLOWS limit 10")
       return cursor.fetch_record_batch()
   
   my_reader = get_reader()
   my_reader.read_all()
   ```
   ```
   ---------------------------------------------------------------------------
   ArrowInvalid                              Traceback (most recent call last)
   Cell In[6], line 7
         4     return cursor.fetch_record_batch()
         6 my_reader = get_reader()
   ----> 7 my_reader.read_all()
   
   File 
[~/miniconda3/lib/python3.9/site-packages/pyarrow/ipc.pxi:757](http://blkbhca006:8888/lab/tree/tutorials/~/miniconda3/lib/python3.9/site-packages/pyarrow/ipc.pxi#line=756),
 in pyarrow.lib.RecordBatchReader.read_all()
   
   File 
[~/miniconda3/lib/python3.9/site-packages/pyarrow/error.pxi:91](http://blkbhca006:8888/lab/tree/tutorials/~/miniconda3/lib/python3.9/site-packages/pyarrow/error.pxi#line=90),
 in pyarrow.lib.check_status()
   
   ArrowInvalid: Attempt to read from a stream that has already been closed
   ```
   Here's my temporary hack to create replacement classes for RecordBatchReader 
and Scanner...
   ```
   class Object_cursor(object):
       def __init__(self, parent, cursor):
           self.parent = parent
           self.cursor = cursor
       def __getitem__(self, item):
           result = self.parent[item]
           return result
       def __getattr__(self, item):
           result = getattr(self.parent, item)
           return result
       def __repr__(self):
           return repr(self.parent)
       def __dir__(self):
           __all__ = dir(self.parent)
           __all__.append("cursor")
           return __all__
   
   class RecordBatchReader_cursor(Object_cursor):
       """pyarrow.RecordBatchReader with ADBC Cursor"""
   
   class Scanner_cursor(Object_cursor):
       """pyarrow.dataset.Scanner with ADBC Cursor"""
   ```
   
   Now my get_reader() function works..
   ```
   def get_reader():
       cursor = snowflake.source.conn.cursor()
       cursor.execute("select * from FUND_FLOWS limit 10")
       return RecordBatchReader_cursor(cursor.fetch_record_batch(), cursor)
   
   my_reader = get_reader()
   my_reader.read_all()
   ```
   ```
   pyarrow.Table
   cusip: string
   start_date: date32[day]
   end_date: date32[day]
   purpose: string
   source: string
   flow: double
   flow_usd: double
   currency: string
   ----
   cusip: 
[["BRSS04E70","BRSS04EB1","BRSS04EE5","BRSS04EK1","BRSS04EL9","BRSS04EM7","BRSS04EN5","BRSS04EQ8","BRSS04EV7","BRSS04EW5"]]
   start_date: 
[[2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21,2016-09-21]]
   end_date: 
[[2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22,2016-09-22]]
   purpose: 
[["DAILY","DAILY","DAILY","DAILY","DAILY","DAILY","DAILY","DAILY","DAILY","DAILY"]]
   source: 
[["MSTAR","MSTAR","MSTAR","MSTAR","MSTAR","MSTAR","MSTAR","MSTAR","MSTAR","MSTAR"]]
   flow: 
[[2.0206298828125,3520657.5,0,-1638.47998046875,55.05837631225586,0,-15.669054985046387,237385.4375,4411420.5,3.951692581176758]]
   flow_usd: 
[[0.6306782364845276,1098866.25,0,-1841.8153076171875,17.184799194335938,0,-4.890619277954102,266844.96875,4958877.5,1.2334007024765015]]
   currency: [["BRL","BRL","EUR","EUR","BRL","KRW","BRL","EUR","EUR","BRL"]]
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to