abeel walga created ARROW-13481:
-----------------------------------

             Summary: [JAVA][VECTOR]: stream write/read when writing batches of 
different row count
                 Key: ARROW-13481
                 URL: https://issues.apache.org/jira/browse/ARROW-13481
             Project: Apache Arrow
          Issue Type: Bug
          Components: Java
            Reporter: abeel walga


I am assuming that ArrowStreamWriter/ArrowStreamReader support writing and 
reading batches with different record counts. 
However, when reading via VectorSchemaRoot readBatch = reader.getVectorSchemaRoot(), 
readBatch.getRowCount() returns the maximum row count across all batches 
rather than the row count of the batch that was just loaded.
This limits the ability to loop over the vectors contained in the batch.

Sample code:

{code:java}
package io.scribe.blckch.core.io;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class Arrow {

    public static void main(String[] args) throws Exception {
        RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);

        BitVector bitVector = new BitVector("boolean", allocator);
        VarCharVector varCharVector = new VarCharVector("varchar", allocator);
        bitVector.allocateNew();
        varCharVector.allocateNew();

        int l0 = Double.valueOf(Math.floor(Math.random() * 10)).intValue() + 1;
        System.out.println("Write Batch " + l0 + " length " + l0);

        for (int j = 0; j < l0; ++j) {
            bitVector.setSafe(j, j);
            varCharVector.setSafe(j, String.format("char-%s-%s", 0, 
j).getBytes(StandardCharsets.UTF_8));
        }
        bitVector.setValueCount(l0);
        varCharVector.setValueCount(l0);

        List<Field> fields = Arrays.asList(bitVector.getField(), 
varCharVector.getField());
        List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);

        VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ArrowStreamWriter writer = new ArrowStreamWriter(root, 
/*DictionaryProvider=*/null, Channels.newChannel(out))) {
            writer.start();
            writer.writeBatch();
            for (int i = 1; i < 5; ++i) {
                final BitVector bits = (BitVector) root.getVector(0);
                final VarCharVector chars = (VarCharVector) root.getVector(1);
                bits.reset();
                chars.reset();

                int l = Double.valueOf(Math.floor(Math.random() * 
10)).intValue() + 1;
                System.out.println("Write Batch " + i + " length " + l);

                for (int j = 0; j < l; ++j) {
                    bits.setSafe(j, j);
                    chars.setSafe(j, String.format("char-%s-%s", i, 
j).getBytes(StandardCharsets.UTF_8));
                }
                bits.setValueCount(l);
                chars.setValueCount(l);

                writer.writeBatch();
            }
            writer.end();
        }

        try (ArrowStreamReader reader = new ArrowStreamReader(new 
ByteArrayInputStream(out.toByteArray()), allocator)) {

            for (int i = 0; i < 5; i++) {
                // This will be loaded with new values on every call to 
loadNextBatch
                reader.loadNextBatch();
                VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();

                final BitVector bits = (BitVector) readBatch.getVector(0);
                final VarCharVector chars = (VarCharVector) 
readBatch.getVector(1);
                int l = readBatch.getRowCount();
                // int l = bits.getValueCount();
                System.out.println("Batch " + i + " length " + l);

                for (int j = 0; j < l; ++j) {
                    System.out.println(bits.get(j) + " " + new 
String(chars.get(j)));
                }
            }
        }

        out.close();
        root.clear();
        root.close();
    }
}

{code}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to