pdet opened a new issue #1329:
URL: https://github.com/apache/arrow-datafusion/issues/1329


   **Describe the bug**
   Hi all, 
   I'm currently writing a blog post on our recent DuckDB/Arrow integration. 
I was advised to check out your project, since 
   (I think) DuckDB and DataFusion are the only systems out there that consume and 
produce in-memory Arrow objects :-)
   
   I just have a couple of questions and a small bug report.
   
   1. It seems to me that DataFusion does not consume streaming Arrow objects 
(i.e., objects that are not fully materialized). Is that correct? 
   2. Does DataFusion already execute queries in parallel?
   3. Is there a specific batch size that works best for the DataFusion 
engine?
   
   I've managed to install v0.4 of datafusion/python from source and 
run TPC-H Q6. I wanted to check with you whether the numbers I'm getting are 
expected, or whether I did something wrong when experimenting with it (the code I 
used is under "To Reproduce" below).
   
   DataFusion: 0.20 s
   DuckDB: 0.046753292 s
   
   **The bug:** One last thing: when trying to generate RecordBatches with 
different sizes, 
   e.g.:
   ``` python
   pq.read_table('lineitemsf1.snappy.parquet').to_batches(1024)
   ```
   I hit the following exception:
   ```
   thread 'thread 'tokio-runtime-workertokio-runtime-worker' panicked at '' 
panicked at 'the offset of the new Buffer cannot exceed the existing lengththe 
offset of the new Buffer cannot exceed the existing length', ', thread 
'/Users/holanda/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-6.2.0/src/buffer/immutable.rs/Users/holanda/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-6.2.0/src/buffer/immutable.rs::tokio-runtime-worker142142'
 panicked at '::assertion failed: ceil(offset + len, 8) <= buffer.len() * 899', 
   
   note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
   
/Users/holanda/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-6.2.0/src/util/bit_chunk_iterator.rs:33:9
   thread 'tokio-runtime-worker' panicked at 'the offset of the new Buffer 
cannot exceed the existing length', 
/Users/holanda/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-6.2.0/src/buffer/immutable.rs:142:9
   thread 'tokio-runtime-worker' panicked at 'the offset of the new Buffer 
cannot exceed the existing length', 
/Users/holanda/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-6.2.0/src/buffer/immutable.rs:142:9
   thread 'tokio-runtime-worker' panicked at 'the offset of the new Buffer 
cannot exceed the existing length', 
/Users/holanda/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-6.2.0/src/buffer/immutable.rs:142:9
   thread 'tokio-runtime-worker' panicked at 'the offset of the new Buffer 
cannot exceed the existing length', 
/Users/holanda/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-6.2.0/src/buffer/immutable.rs:142:9
   Traceback (most recent call last):
     File "/Users/holanda/Documents/Projects/tryit.py", line 31, in <module>
       result_df = ctx.sql(query).collect()
   Exception: Arrow error: External error: oneshot canceled
   ```
   
   Thanks for your time! Always great to learn a bit about the other db engines 
out there :-)
   
   **To Reproduce**
   ```python
   from datafusion import ExecutionContext
   import pyarrow.parquet as pq
   import pyarrow as pa
   import time
   import duckdb
   
   
   # wget -q https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet
   ctx = ExecutionContext()
   conn = duckdb.connect()
   
   query = """SELECT sum(l_extendedprice * l_discount) AS revenue
               FROM
                   lineitem
               WHERE
                   l_shipdate >= CAST('1994-01-01' AS date)
                   AND l_shipdate < CAST('1995-01-01' AS date)
                   AND l_discount BETWEEN 0.05
                   AND 0.07
                   AND l_quantity < 24; """
   
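   # Batch-size sweep (currently disabled):
   # for i in range (10,20):
   #     batch_size = pow (2,i)
   # '-' marks a run that uses the default batching of to_batches()
   batch_size = '-'
   lineitem_fusion = pq.read_table('lineitemsf1.snappy.parquet').to_batches()
   # Arrow table named `lineitem`, which the DuckDB query below refers to
   lineitem = pa.Table.from_batches(lineitem_fusion)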
   
   start_time = time.monotonic()
   ctx.register_record_batches("lineitem", [lineitem_fusion])
   result_df = ctx.sql(query).collect()
   print("DataFusion ("+str(batch_size)+"): "+ str(time.monotonic()-start_time))
   
   
   start_time = time.monotonic()
   result_duck =  conn.execute(query).fetchall()
   print("DuckDB ("+str(batch_size)+"): "+ str(time.monotonic()-start_time))
   
   print (result_df[0].column(0))
   print (result_duck)
   ```
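
The commented-out loop in the script above was meant to repeat the measurement for batch sizes 2^10 through 2^19. A minimal sketch of such a sweep follows, using only the standard library. The names `sweep_batch_sizes`, `run_query`, and `fake_query` are hypothetical and not part of the original script. `fake_query` is a placeholder workload. In the real experiment it would presumably wrap the `to_batches(batch_size)`, `register_record_batches`, and `ctx.sql(query).collect()` calls shown above.

```python
import time


def sweep_batch_sizes(run_query, exponents=range(10, 20)):
    """Time run_query(batch_size) for batch_size = 2**i.

    Returns a dict mapping each batch size to the elapsed seconds.
    `run_query` is a hypothetical callable supplied by the caller.
    """
    timings = {}
    for i in exponents:
        batch_size = 2 ** i
        start = time.monotonic()
        run_query(batch_size)
        timings[batch_size] = time.monotonic() - start
    return timings


def fake_query(batch_size):
    # Placeholder workload (an assumption for illustration only);
    # it stands in for the register + collect step of the real script.
    return sum(range(batch_size))


timings = sweep_batch_sizes(fake_query)
for batch_size, seconds in timings.items():
    print(f"batch_size={batch_size}: {seconds:.6f} s")
```

Passing the measured step in as a callable keeps the timing logic identical for every engine, so DataFusion and DuckDB runs stay comparable.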
   
   

