[
https://issues.apache.org/jira/browse/ARROW-13481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17389079#comment-17389079
]
abeel walga commented on ARROW-13481:
-------------------------------------
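Setting the row count on the VectorSchemaRoot before each writer.writeBatch() call resolves the issue. Changing the individual vectors' value counts with setValueCount() does not update the root's row count, and the root's row count is what the writer records as the batch length. Here, l is the number of rows written to the vectors for that batch: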
{code:java}
root.setRowCount(l);{code}
> [JAVA][VECTOR]: stream write/read when writing batches of different row count
> -----------------------------------------------------------------------------
>
> Key: ARROW-13481
> URL: https://issues.apache.org/jira/browse/ARROW-13481
> Project: Apache Arrow
> Issue Type: Bug
> Components: Java
> Reporter: abeel walga
> Priority: Major
>
> I am assuming ArrowStreamWriter/ArrowStreamReader support writing batches
> with different record counts.
> When reading via VectorSchemaRoot readBatch = reader.getVectorSchemaRoot(),
> readBatch.getRowCount() returns the maximum row count across all batches
> rather than the row count of the current batch.
> This limits the ability to loop over the vectors contained in the batch.
> Sample code:
> {code:java}
> package io.scribe.blckch.core.io;
> import org.apache.arrow.memory.RootAllocator;
> import org.apache.arrow.vector.BitVector;
> import org.apache.arrow.vector.FieldVector;
> import org.apache.arrow.vector.VarCharVector;
> import org.apache.arrow.vector.VectorSchemaRoot;
> import org.apache.arrow.vector.ipc.ArrowStreamReader;
> import org.apache.arrow.vector.ipc.ArrowStreamWriter;
> import org.apache.arrow.vector.types.pojo.Field;
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.nio.channels.Channels;
> import java.nio.charset.StandardCharsets;
> import java.util.Arrays;
> import java.util.List;
> public class Arrow {
> public static void main(String[] args) throws Exception {
> RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
> BitVector bitVector = new BitVector("boolean", allocator);
> VarCharVector varCharVector = new VarCharVector("varchar", allocator);
> bitVector.allocateNew();
> varCharVector.allocateNew();
> int l0 = Double.valueOf(Math.floor(Math.random() * 10)).intValue() +
> 1;
> System.out.println("Write Batch " + l0 + " length " + l0);
> for (int j = 0; j < l0; ++j) {
> bitVector.setSafe(j, j);
> varCharVector.setSafe(j, String.format("char-%s-%s", 0,
> j).getBytes(StandardCharsets.UTF_8));
> }
> bitVector.setValueCount(l0);
> varCharVector.setValueCount(l0);
> List<Field> fields = Arrays.asList(bitVector.getField(),
> varCharVector.getField());
> List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
> VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> try (ArrowStreamWriter writer = new ArrowStreamWriter(root,
> /*DictionaryProvider=*/null, Channels.newChannel(out))) {
> writer.start();
> writer.writeBatch();
> for (int i = 1; i < 5; ++i) {
> final BitVector bits = (BitVector) root.getVector(0);
> final VarCharVector chars = (VarCharVector) root.getVector(1);
> bits.reset();
> chars.reset();
> int l = Double.valueOf(Math.floor(Math.random() *
> 10)).intValue() + 1;
> System.out.println("Write Batch " + i + " length " + l);
> for (int j = 0; j < l; ++j) {
> bits.setSafe(j, j);
> chars.setSafe(j, String.format("char-%s-%s", i,
> j).getBytes(StandardCharsets.UTF_8));
> }
> bits.setValueCount(l);
> chars.setValueCount(l);
> writer.writeBatch();
> }
> writer.end();
> }
> try (ArrowStreamReader reader = new ArrowStreamReader(new
> ByteArrayInputStream(out.toByteArray()), allocator)) {
> for (int i = 0; i < 5; i++) {
> // This will be loaded with new values on every call to
> loadNextBatch
> reader.loadNextBatch();
> VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
> final BitVector bits = (BitVector) readBatch.getVector(0);
> final VarCharVector chars = (VarCharVector)
> readBatch.getVector(1);
> int l = readBatch.getRowCount();
> // int l = bits.getValueCount();
> System.out.println("Batch " + i + " length " + l);
> for (int j = 0; j < l; ++j) {
> System.out.println(bits.get(j) + " " + new
> String(chars.get(j)));
> }
> }
> }
> out.close();
> root.clear();
> root.close();
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)