Chaokun Yang created ARROW-5230:
-----------------------------------

             Summary: [Java] Read Struct Vector from ArrowStreamReader bugs
                 Key: ARROW-5230
                 URL: https://issues.apache.org/jira/browse/ARROW-5230
             Project: Apache Arrow
          Issue Type: Bug
          Components: Java
         Environment: Mac OS 10.13.6, Arrow 0.13.0, JDK8
            Reporter: Chaokun Yang


After writing a struct vector to a file using ArrowStreamWriter, reading it back 
with ArrowStreamReader throws an exception:
{quote}Exception in thread "main" java.lang.IllegalArgumentException: not all 
nodes and buffers were consumed. nodes: [ArrowFieldNode [length=100, 
nullCount=0], ArrowFieldNode [length=100, nullCount=0]] buffers: [ArrowBuf[26], 
udle: [11 16..29], ArrowBuf[27], udle: [11 32..432], ArrowBuf[28], udle: [11 
432..445], ArrowBuf[29], udle: [11 448..848]]
 at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:64)
 at 
org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:219)
 at 
org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:121)
{quote}
Here's the code to reproduce this exception:
{code:java}

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class StructTest {

    public static void writeBatch(OutputStream os) throws IOException {
        List<Field> fields = Collections.singletonList(new Field("f-Struct(Int, 
Int)", FieldType.nullable(ArrowType.Struct.INSTANCE), null));
        Schema schema = new Schema(fields);
        VectorSchemaRoot root = VectorSchemaRoot.create(schema, new 
RootAllocator(Integer.MAX_VALUE));
        DictionaryProvider.MapDictionaryProvider provider = new 
DictionaryProvider.MapDictionaryProvider();
        ArrowStreamWriter writer = new ArrowStreamWriter(root, provider, os);
        writer.start();
        for (int i = 0; i < 2; i++) {
            root.setRowCount(100);
            List<FieldVector> vectors = root.getFieldVectors();
            StructVector vector = (StructVector) vectors.get(0);
            fillVector(vector, 100);
            for (int j = 0; j < 100; j++) {
                if (!vector.isNull(j)) {
                    System.out.println(vector.getObject(j));
                }
            }
            writer.writeBatch();
        }
        writer.end();
        writer.close();
    }

    public static void fillVector(StructVector vector, int batchSize) {
        vector.setInitialCapacity(batchSize);
        vector.allocateNew();
        vector.addOrGet("s1", FieldType.nullable(new ArrowType.Int(32, true)), 
IntVector.class);
        vector.addOrGet("s2", FieldType.nullable(new ArrowType.Int(32, true)), 
IntVector.class);
        fillVector((IntVector)(vector.getChild("s1")), batchSize);
        fillVector((IntVector) (vector.getChild("s2")), batchSize);
        for (int i = 0; i < batchSize; i++) {
            vector.setIndexDefined(i);
        }
        vector.setValueCount(batchSize);
    }

    public static void fillVector(IntVector vector, int batchSize) {
        vector.setInitialCapacity(batchSize);
        vector.allocateNew();
        for (int i = 0; i < batchSize; i++) {
            vector.setSafe(i, 1, ThreadLocalRandom.current().nextInt());
        }
        vector.setValueCount(batchSize);
    }

    public static void main(String[] args) throws IOException {
        try (FileOutputStream fos = new 
FileOutputStream("result/struct.arrow")) {
            writeBatch(fos);
            System.out.println("write succeed");
            fos.flush();
        }

        RootAllocator allocator = new RootAllocator(1000000000);

        ByteArrayInputStream in = new 
ByteArrayInputStream(Files.readAllBytes(Paths.get("result/struct.arrow")));

        ArrowStreamReader reader = new ArrowStreamReader(in, allocator);

        reader.loadNextBatch();

    }
}
{code}
 If I create the struct record batches in Python instead, Java can read them back:

Write data:

 
{code:java}
def make_struct(path, batch_size=200, num_batch=2):
    # Build one struct array (get_struct_obj is defined elsewhere) and wrap it
    # in a single-column record batch named 'fo'.
    obj = get_struct_obj(batch_size)
    batch = pa.RecordBatch.from_arrays([obj], ['fo'])
    # Stream-format writer; the schema comes from the batch itself, so the
    # struct field's children are presumably declared up front — unlike the
    # failing Java writer above.
    writer = pa.RecordBatchStreamWriter(path, batch.schema)
    for _ in range(num_batch):
        # The same batch is written num_batch times.
        writer.write_batch(batch)
    writer.close()

make_struct("struct.arrow")

{code}
Read back:
{code:java}
// Allocator that owns all buffers materialized from the stream (1 GB limit).
RootAllocator allocator = new RootAllocator(1000000000);
// Read the whole stream-format file into memory before parsing.
ByteArrayInputStream in = new 
ByteArrayInputStream(Files.readAllBytes(Paths.get("struct.arrow")));
ArrowStreamReader reader = new ArrowStreamReader(in, allocator);
// Loads the first record batch; succeeds for the python-written file.
reader.loadNextBatch();
{code}
 

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to