hu6360567 opened a new issue, #36443:
URL: https://github.com/apache/arrow/issues/36443
### Describe the bug, including details regarding any error messages, version, and platform.
I'm trying to import/export data to a database in Python through an `ArrayStream` over pyarrow.jvm and JDBC.
In order to export the `ArrowVectorIterator` as a stream without unloading it to `RecordBatch`es on the Java side before the export, I wrap the `ArrowVectorIterator` in an `ArrowReader` as below:
```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;

/** Adapts an Iterator<VectorSchemaRoot> (e.g. ArrowVectorIterator) to the ArrowReader interface. */
public class ArrowVectorIteratorReader extends ArrowReader {
    private final Iterator<VectorSchemaRoot> iterator;
    private final Schema schema;
    private VectorSchemaRoot root;

    public ArrowVectorIteratorReader(BufferAllocator allocator,
                                     Iterator<VectorSchemaRoot> iterator, Schema schema) {
        super(allocator);
        this.iterator = iterator;
        this.schema = schema;
        this.root = null;
    }

    @Override
    public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
        if (root == null) return super.getVectorSchemaRoot();
        return root;
    }

    @Override
    public boolean loadNextBatch() throws IOException {
        if (iterator.hasNext()) {
            // if (root != null) root.close();  // if the root is not reused, it has to be closed manually
            root = iterator.next();
            return true;
        } else {
            return false;
        }
    }

    @Override
    public long bytesRead() {
        return 0;
    }

    @Override
    protected void closeReadSource() throws IOException {
        if (iterator instanceof AutoCloseable) {
            try {
                ((AutoCloseable) iterator).close();
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        root.close();
    }

    @Override
    protected Schema readSchema() throws IOException {
        return schema;
    }
}
```
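The iterator itself comes from the Arrow JDBC adapter. A rough sketch of how it might be built and wrapped (the class name, query, and connection handling here are simplified placeholders rather than my actual porter code):

```java
import java.sql.Connection;
import java.sql.ResultSet;

import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;

public final class JdbcImportSketch {
  /** Builds the JDBC iterator with reuseVectorSchemaRoot and wraps it in the reader above. */
  public static ArrowReader importData(BufferAllocator allocator, Connection connection,
                                       String query, int batchSize) throws Exception {
    ResultSet resultSet = connection.createStatement().executeQuery(query);
    JdbcToArrowConfig config =
        new JdbcToArrowConfigBuilder(allocator, JdbcToArrowUtils.getUtcCalendar())
            .setTargetBatchSize(batchSize)
            .setReuseVectorSchemaRoot(true) // the setting that triggers the problem
            .build();
    // Derive the schema from the result set metadata so the reader can report it up front.
    Schema schema = JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), config);
    ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(resultSet, config);
    return new ArrowVectorIteratorReader(allocator, iterator, schema);
  }
}
```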
When the ArrowVectorIterator is configured with `reuseVectorSchemaRoot` enabled, utf8 arrays come out corrupted on the Python side, even though everything works as expected on the Java side.
The Java code is as below:
```java
try (final ArrowReader source = porter.importData(1);  // returns ArrowVectorIteratorReader with batchSize=1
     final ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
    Data.exportArrayStream(allocator, source, stream);
    try (final ArrowReader reader = Data.importArrayStream(allocator, stream)) {
        while (reader.loadNextBatch()) {
            // root from getVectorSchemaRoot() is valid on every vector
            totalRecord += reader.getVectorSchemaRoot().getRowCount();
        }
    }
}
```
On the Python side, the behaviour is hard to explain.
The stream exported from Java is wrapped into a RecordBatchReader and written to different file formats.
```python
def wrap_from_java_stream_to_generator(java_arrow_stream, allocator=None, yield_schema=False):
    if allocator is None:
        allocator = get_java_root_allocator().allocator
    c_stream = arrow_c.new("struct ArrowArrayStream*")
    c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
    org = jpype.JPackage("org")
    java_wrapped_stream = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
    org.apache.arrow.c.Data.exportArrayStream(allocator, java_arrow_stream, java_wrapped_stream)
    # noinspection PyProtectedMember
    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as reader:  # type: pa.RecordBatchReader
        if yield_schema:
            yield reader.schema
        yield from reader


def wrap_from_java_stream(java_arrow_stream, allocator=None):
    generator = wrap_from_java_stream_to_generator(java_arrow_stream, allocator, yield_schema=True)
    schema = next(generator)
    return pa.RecordBatchReader.from_batches(schema, generator)
```
For CSV, it works as expected:
```python
with wrap_from_java_stream(java_arrow_stream, allocator) as stream:
with pa.csv.CSVWriter(csv_path, stream.schema) as writer:
for record_batch in stream:
writer.write_batch(record_batch)
```
For Parquet, writing with the dataset API as below fails:
```python
with wrap_from_java_stream(java_arrow_stream, allocator) as stream:
pa.dataset.write_dataset(stream, data_path, format="parquet")
```
```
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
......./python3.8/site-packages/pyarrow/dataset.py:999: in write_dataset
    _filesystemdataset_write(
pyarrow/_dataset.pyx:3655: in pyarrow._dataset._filesystemdataset_write
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
>   ???
E   pyarrow.lib.ArrowInvalid: Parquet cannot store strings with size 2GB or more
```
In order to find out which record raises the error, the RecordBatchReader is re-wrapped with a smaller batch size and its content is logged, as below:
```python
with wrap_from_java_stream(java_arrow_stream, allocator) as stream:
    def generator():
        for rb in stream:
            for i in range(rb.num_rows):
                slice = rb.slice(i, 1)
                logger.info(slice.to_pylist())
                yield slice

    pa.dataset.write_dataset(
        pa.RecordBatchReader.from_batches(stream.schema, generator()),
        data_path, format="parquet")
```
Although the logger can print every slice, write_dataset still fails:
```
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
......./python3.8/site-packages/pyarrow/dataset.py:999: in write_dataset
    _filesystemdataset_write(
pyarrow/_dataset.pyx:3655: in pyarrow._dataset._filesystemdataset_write
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
>   ???
E   pyarrow.lib.ArrowInvalid: Column 1: In chunk 0: Invalid: First or last binary offset out of bounds
```
For the arrow/feather format, the record batches appear to be written directly to the file, but they are invalid when read back from the file (the reading code is similar to the above).
Then, if I create the ArrowVectorIteratorReader without reuseVectorSchemaRoot, everything works fine on the Python side (the previous root has to be closed on each loadNextBatch to avoid a memory leak).
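For reference, the non-reuse variant of `loadNextBatch` in my wrapper looks roughly like this (each call gets a fresh root from the iterator, so the previous one is released first):

```java
@Override
public boolean loadNextBatch() throws IOException {
    if (!iterator.hasNext()) {
        return false;
    }
    if (root != null) {
        root.close(); // each batch has its own root, so release the previous one
    }
    root = iterator.next();
    return true;
}
```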
Also, if I uncomment the code that closes the previous root while reuseVectorSchemaRoot is enabled, I get another error:
```
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
xxxxxx/python3.8/site-packages/pyarrow/dataset.py:999: in write_dataset
    _filesystemdataset_write(
pyarrow/_dataset.pyx:3655: in pyarrow._dataset._filesystemdataset_write
    ???
server/jvm_wrapper/test_data_porter.py:176: in generator
    for rb in reader:
pyarrow/ipc.pxi:657: in __iter__
    ???
pyarrow/ipc.pxi:693: in pyarrow.lib.RecordBatchReader.read_next_batch
    ???
../fair_panama/server/jvm_wrapper/stream_wrapper.py:30: in wrap_from_java_stream_to_generator
    yield from reader
pyarrow/ipc.pxi:657: in __iter__
    ???
pyarrow/ipc.pxi:693: in pyarrow.lib.RecordBatchReader.read_next_batch
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
>   ???
E   OSError: java.lang.RuntimeException: Error occurred while getting next schema root.
E       at org.apache.arrow.adapter.jdbc.ArrowVectorIterator.next(ArrowVectorIterator.java:181)
E       at org.apache.arrow.adapter.jdbc.ArrowVectorIterator.next(ArrowVectorIterator.java:40)
E       at org.alipay.fair.panama.server_wrapper.porter.ArrowVectorIteratorReader.loadNextBatch(ArrowVectorIteratorReader.java:37)
E       at org.alipay.fair.panama.server_wrapper.porter.JdbcDatasetPorter$AutoClosableWrapperReader.loadNextBatch(JdbcDatasetPorter.java:117)
E       at org.apache.arrow.c.ArrayStreamExporter$ExportedArrayStreamPrivateData.getNext(ArrayStreamExporter.java:72)
E   Caused by: java.lang.RuntimeException: Error occurred while consuming data.
E       at org.apache.arrow.adapter.jdbc.ArrowVectorIterator.consumeData(ArrowVectorIterator.java:117)
E       at org.apache.arrow.adapter.jdbc.ArrowVectorIterator.load(ArrowVectorIterator.java:159)
E       at org.apache.arrow.adapter.jdbc.ArrowVectorIterator.next(ArrowVectorIterator.java:177)
E       ... 4 more
E   Caused by: java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))
E       at org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
E       at org.apache.arrow.memory.ArrowBuf.chk(ArrowBuf.java:306)
E       at org.apache.arrow.memory.ArrowBuf.getByte(ArrowBuf.java:508)
E       at org.apache.arrow.vector.BitVectorHelper.setBit(BitVectorHelper.java:82)
E       at org.apache.arrow.vector.IntVector.set(IntVector.java:167)
E       at org.apache.arrow.adapter.jdbc.consumer.IntConsumer$NonNullableIntConsumer.consume(IntConsumer.java:83)
E       at org.apache.arrow.adapter.jdbc.consumer.CompositeJdbcConsumer.consume(CompositeJdbcConsumer.java:46)
E       at org.apache.arrow.adapter.jdbc.ArrowVectorIterator.consumeData(ArrowVectorIterator.java:106)
E       ... 6 more
pyarrow/error.pxi:115: OSError
```
### Component(s)
Java, Python