abeel walga created ARROW-13481:
-----------------------------------
Summary: [JAVA][VECTOR]: stream write/read when writing batches of
different row count
Key: ARROW-13481
URL: https://issues.apache.org/jira/browse/ARROW-13481
Project: Apache Arrow
Issue Type: Bug
Components: Java
Reporter: abeel walga
I am assuming that ArrowStreamWriter/ArrowStreamReader support writing batches with
different record counts.
When reading through the root returned by the reader
(VectorSchemaRoot readBatch = reader.getVectorSchemaRoot()),
readBatch.getRowCount() returns the maximum row count across all batches instead of
the row count of the current batch. This limits the ability to loop over the vectors
contained in each batch.
Sample code:
{code:java}
package io.scribe.blckch.core.io;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class Arrow {
public static void main(String[] args) throws Exception {
RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
BitVector bitVector = new BitVector("boolean", allocator);
VarCharVector varCharVector = new VarCharVector("varchar", allocator);
bitVector.allocateNew();
varCharVector.allocateNew();
int l0 = Double.valueOf(Math.floor(Math.random() * 10)).intValue() + 1;
System.out.println("Write Batch " + l0 + " length " + l0);
for (int j = 0; j < l0; ++j) {
bitVector.setSafe(j, j);
varCharVector.setSafe(j, String.format("char-%s-%s", 0,
j).getBytes(StandardCharsets.UTF_8));
}
bitVector.setValueCount(l0);
varCharVector.setValueCount(l0);
List<Field> fields = Arrays.asList(bitVector.getField(),
varCharVector.getField());
List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (ArrowStreamWriter writer = new ArrowStreamWriter(root,
/*DictionaryProvider=*/null, Channels.newChannel(out))) {
writer.start();
writer.writeBatch();
for (int i = 1; i < 5; ++i) {
final BitVector bits = (BitVector) root.getVector(0);
final VarCharVector chars = (VarCharVector) root.getVector(1);
bits.reset();
chars.reset();
int l = Double.valueOf(Math.floor(Math.random() *
10)).intValue() + 1;
System.out.println("Write Batch " + i + " length " + l);
for (int j = 0; j < l; ++j) {
bits.setSafe(j, j);
chars.setSafe(j, String.format("char-%s-%s", i,
j).getBytes(StandardCharsets.UTF_8));
}
bits.setValueCount(l);
chars.setValueCount(l);
writer.writeBatch();
}
writer.end();
}
try (ArrowStreamReader reader = new ArrowStreamReader(new
ByteArrayInputStream(out.toByteArray()), allocator)) {
for (int i = 0; i < 5; i++) {
// This will be loaded with new values on every call to
loadNextBatch
reader.loadNextBatch();
VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
final BitVector bits = (BitVector) readBatch.getVector(0);
final VarCharVector chars = (VarCharVector)
readBatch.getVector(1);
int l = readBatch.getRowCount();
// int l = bits.getValueCount();
System.out.println("Batch " + i + " length " + l);
for (int j = 0; j < l; ++j) {
System.out.println(bits.get(j) + " " + new
String(chars.get(j)));
}
}
}
out.close();
root.clear();
root.close();
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)