Chaokun Yang created ARROW-5230: ----------------------------------- Summary: [Java] Read Struct Vector from ArrowStreamReader bugs Key: ARROW-5230 URL: https://issues.apache.org/jira/browse/ARROW-5230 Project: Apache Arrow Issue Type: Bug Components: Java Environment: Mac OS 10.13.6, Arrow 0.13.0, JDK8 Reporter: Chaokun Yang
After writing struct vector using ArrowStreamWriter to a file, read it back using ArrowStreamReader throws exception: {quote}Exception in thread "main" java.lang.IllegalArgumentException: not all nodes and buffers were consumed. nodes: [ArrowFieldNode [length=100, nullCount=0], ArrowFieldNode [length=100, nullCount=0]] buffers: [ArrowBuf[26], udle: [11 16..29], ArrowBuf[27], udle: [11 32..432], ArrowBuf[28], udle: [11 432..445], ArrowBuf[29], udle: [11 448..848]] at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:64) at org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:219) at org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:121) {quote} Here's the code to reproduce this exception: {code:java} import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; import java.io.ByteArrayInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; import java.util.List; import java.util.concurrent.ThreadLocalRandom; public class StructTest { public static void writeBatch(OutputStream os) throws IOException { List<Field> fields = Collections.singletonList(new Field("f-Struct(Int, Int)", FieldType.nullable(ArrowType.Struct.INSTANCE), null)); Schema schema = new Schema(fields); VectorSchemaRoot root = VectorSchemaRoot.create(schema, new 
RootAllocator(Integer.MAX_VALUE)); DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider(); ArrowStreamWriter writer = new ArrowStreamWriter(root, provider, os); writer.start(); for (int i = 0; i < 2; i++) { root.setRowCount(100); List<FieldVector> vectors = root.getFieldVectors(); StructVector vector = (StructVector) vectors.get(0); fillVector(vector, 100); for (int j = 0; j < 100; j++) { if (!vector.isNull(j)) { System.out.println(vector.getObject(j)); } } writer.writeBatch(); } writer.end(); writer.close(); } public static void fillVector(StructVector vector, int batchSize) { vector.setInitialCapacity(batchSize); vector.allocateNew(); vector.addOrGet("s1", FieldType.nullable(new ArrowType.Int(32, true)), IntVector.class); vector.addOrGet("s2", FieldType.nullable(new ArrowType.Int(32, true)), IntVector.class); fillVector((IntVector)(vector.getChild("s1")), batchSize); fillVector((IntVector) (vector.getChild("s2")), batchSize); for (int i = 0; i < batchSize; i++) { vector.setIndexDefined(i); } vector.setValueCount(batchSize); } public static void fillVector(IntVector vector, int batchSize) { vector.setInitialCapacity(batchSize); vector.allocateNew(); for (int i = 0; i < batchSize; i++) { vector.setSafe(i, 1, ThreadLocalRandom.current().nextInt()); } vector.setValueCount(batchSize); } public static void main(String[] args) throws IOException { try (FileOutputStream fos = new FileOutputStream("result/struct.arrow")) { writeBatch(fos); System.out.println("write succeed"); fos.flush(); } RootAllocator allocator = new RootAllocator(1000000000); ByteArrayInputStream in = new ByteArrayInputStream(Files.readAllBytes(Paths.get("result/struct.arrow"))); ArrowStreamReader reader = new ArrowStreamReader(in, allocator); reader.loadNextBatch(); } } {code} If I make struct record batches in Python, Java can read it back: Write data: {code:java} def make_struct(path, batch_size=200, num_batch=2): obj = get_struct_obj(batch_size) batch = 
pa.RecordBatch.from_arrays([obj], ['fo']) writer = pa.RecordBatchStreamWriter(path, batch.schema) for _ in range(num_batch): writer.write_batch(batch) writer.close() make_struct("struct.arrow") {code} Read back: {code:java} RootAllocator allocator = new RootAllocator(1000000000); ByteArrayInputStream in = new ByteArrayInputStream(Files.readAllBytes(Paths.get("struct.arrow"))); ArrowStreamReader reader = new ArrowStreamReader(in, allocator); reader.loadNextBatch(); {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)