Rock Wang created ARROW-522:
-------------------------------
Summary: VectorLoader throws an exception when the data schema contains
a list of maps.
Key: ARROW-522
URL: https://issues.apache.org/jira/browse/ARROW-522
Project: Apache Arrow
Issue Type: Bug
Components: Java - Vectors
Affects Versions: 0.1.0
Reporter: Rock Wang
Priority: Critical
I encountered this exception:
{code:java}
Exception in thread "main" java.lang.IllegalArgumentException: should have as
many children as in the schema: found 0 expected 2
at
com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
at org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:91)
at org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:95)
at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:69)
{code}
The test code that reproduces it is:
{code:java}
public class ArrowTest {
/**
 * Read-only {@link SeekableByteChannel} backed by an in-memory byte array.
 *
 * <p>Reads copy bytes out of the array starting at the current position. The
 * write-side operations ({@link #write} and {@link #truncate}) always throw
 * {@link UnsupportedOperationException}. Closing the channel drops the reference
 * to the backing array; afterwards {@link #read}, {@link #position()},
 * {@link #position(long)} and {@link #size()} throw
 * {@link java.nio.channels.ClosedChannelException}.
 */
public static class ByteArrayReadableSeekableByteChannel implements
        SeekableByteChannel {
    /** Backing data; set to {@code null} once the channel is closed. */
    private byte[] byteArray;
    /** Current read offset; kept as a long so positions beyond the array are representable. */
    private long position = 0;

    /**
     * Creates a channel positioned at the start of {@code byteArray}.
     *
     * @param byteArray the bytes to expose; the array is not copied
     * @throws NullPointerException if {@code byteArray} is {@code null}
     */
    public ByteArrayReadableSeekableByteChannel(byte[] byteArray) {
        if (byteArray == null) {
            throw new NullPointerException();
        }
        this.byteArray = byteArray;
    }

    /** Returns {@code true} until {@link #close()} has been called. */
    @Override
    public boolean isOpen() {
        return byteArray != null;
    }

    /** Closes the channel by releasing the backing array. Calling it again has no effect. */
    @Override
    public void close() throws IOException {
        byteArray = null;
    }

    /**
     * Copies as many bytes as fit from the current position into {@code dst}.
     *
     * @param dst destination buffer
     * @return the number of bytes copied, {@code 0} if {@code dst} has no room,
     *         or {@code -1} if the position is at or beyond the end of the array
     * @throws java.nio.channels.ClosedChannelException if the channel is closed
     */
    @Override
    public int read(final ByteBuffer dst) throws IOException {
        ensureOpen();
        if (!dst.hasRemaining()) {
            return 0;
        }
        // Signal end-of-stream instead of returning 0 forever; callers that loop
        // until the buffer is full would otherwise never terminate.
        if (this.position >= this.byteArray.length) {
            return -1;
        }
        // Safe narrowing: position is below byteArray.length, which is an int.
        final int offset = (int) this.position;
        final int length = Math.min(dst.remaining(), this.byteArray.length - offset);
        dst.put(this.byteArray, offset, length);
        this.position += length;
        return length;
    }

    /**
     * @return the current read offset
     * @throws java.nio.channels.ClosedChannelException if the channel is closed
     */
    @Override
    public long position() throws IOException {
        ensureOpen();
        return this.position;
    }

    /**
     * Moves the read offset. A position beyond the end of the array is accepted;
     * subsequent reads then report end-of-stream.
     *
     * @param newPosition the new offset, must be non-negative
     * @return this channel
     * @throws IllegalArgumentException if {@code newPosition} is negative
     * @throws java.nio.channels.ClosedChannelException if the channel is closed
     */
    @Override
    public SeekableByteChannel position(final long newPosition) throws
            IOException {
        ensureOpen();
        if (newPosition < 0) {
            throw new IllegalArgumentException("Negative position: " + newPosition);
        }
        this.position = newPosition;
        return this;
    }

    /**
     * @return the length of the backing array
     * @throws java.nio.channels.ClosedChannelException if the channel is closed
     */
    @Override
    public long size() throws IOException {
        ensureOpen();
        return this.byteArray.length;
    }

    /** Always throws: this channel is read-only. */
    @Override
    public int write(final ByteBuffer src) throws IOException {
        throw new UnsupportedOperationException("Read only");
    }

    /** Always throws: this channel is read-only. */
    @Override
    public SeekableByteChannel truncate(final long size) throws IOException {
        throw new UnsupportedOperationException("Read only");
    }

    /** Fails with ClosedChannelException once the backing array has been released. */
    private void ensureOpen() throws IOException {
        if (byteArray == null) {
            throw new java.nio.channels.ClosedChannelException();
        }
    }
}
/**
 * Reproduces the failure: writes ten rows whose schema contains a list of maps
 * into an in-memory Arrow file, then reads the file back through
 * {@code VectorLoader} and prints the list contents.
 *
 * @param argv unused
 * @throws Exception if writing or reading fails
 */
public static void main(String[] argv) throws Exception {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();

    // Write phase: populate the vectors and serialize the "root" child.
    try (BufferAllocator writeRoot = new RootAllocator(Integer.MAX_VALUE);
         BufferAllocator writeChild =
             writeRoot.newChildAllocator("child allocator", 1024, Integer.MAX_VALUE);
         MapVector source = new MapVector("parent", writeChild, null)) {
        writeData(10, source);
        FieldVector rootVector = source.getChild("root");
        WritableByteChannel sinkChannel = Channels.newChannel(sink);
        write(rootVector, sinkChannel);
    }

    byte[] serialized = sink.toByteArray();

    // Read phase: load every record batch into a fresh vector tree.
    try (BufferAllocator readRoot = new RootAllocator(Integer.MAX_VALUE);
         BufferAllocator readerAlloc =
             readRoot.newChildAllocator("reader", 0, Integer.MAX_VALUE);
         ArrowReader reader = new ArrowReader(
             new ByteArrayReadableSeekableByteChannel(serialized), readerAlloc);
         BufferAllocator vectorAlloc =
             readRoot.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
         MapVector target = new MapVector("parent", vectorAlloc, null)) {
        ArrowFooter fileFooter = reader.readFooter();
        Schema fileSchema = fileFooter.getSchema();
        NullableMapVector targetRoot =
            target.addOrGet("root", Types.MinorType.MAP, NullableMapVector.class);
        VectorLoader loader = new VectorLoader(fileSchema, targetRoot);
        List<ArrowBlock> blocks = fileFooter.getRecordBatches();
        for (ArrowBlock block : blocks) {
            try (ArrowRecordBatch batch = reader.readRecordBatch(block)) {
                loader.load(batch);
            }
            readData(10, target);
        }
    }
}
/**
 * Fills {@code parent} with {@code count} rows under a "root" map. Each row has
 * an integer "id" and a "list" holding two maps, each with an integer "type"
 * and a big-int "id".
 *
 * @param count  number of rows to write
 * @param parent vector that receives the "root" map
 * @throws Exception declared by the original signature
 */
private static void writeData(int count, MapVector parent) throws Exception {
    BaseWriter.ComplexWriter complexWriter = new ComplexWriterImpl("root", parent, true);
    BaseWriter.MapWriter rowWriter = complexWriter.rootAsMap();
    IntWriter idWriter = rowWriter.integer("id");
    BaseWriter.ListWriter entriesWriter = rowWriter.list("list");
    BaseWriter.MapWriter entryWriter = entriesWriter.map();

    int row = 0;
    while (row < count) {
        rowWriter.start();

        idWriter.setPosition(row);
        idWriter.writeInt(row);

        entriesWriter.setPosition(row);
        entriesWriter.startList();
        // Two map entries per row.
        for (int entry = 0; entry < 2; entry++) {
            entryWriter.start();
            entryWriter.integer("type").writeInt(entry);
            entryWriter.bigInt("id").writeBigInt(entry * 1000L);
            entryWriter.end();
        }
        entriesWriter.endList();

        rowWriter.end();
        row++;
    }
    complexWriter.setValueCount(count);
}
/**
 * Prints the "id" and "type" of every list element found under
 * {@code root.list} for rows {@code 0 .. count-1}.
 *
 * @param count  number of rows to visit
 * @param parent vector that holds the "root" map
 */
private static void readData(int count, MapVector parent) {
    SingleMapReaderImpl parentReader = new SingleMapReaderImpl(parent);
    BaseReader.MapReader rowReader = parentReader.reader("root");
    FieldReader entries = rowReader.reader("list");

    int row = 0;
    while (row < count) {
        entries.setPosition(row);
        while (entries.next()) {
            Long entryId = entries.reader().reader("id").readLong();
            System.out.println(row + " id " + entryId);
            Integer entryType = entries.reader().reader("type").readInteger();
            System.out.println(row + " type " + entryType);
        }
        row++;
    }
}
/**
 * Serializes {@code parent} to {@code out} as an Arrow file containing a single
 * record batch.
 *
 * @param parent vector to unload
 * @param out    destination channel
 * @throws IOException if writing fails
 */
private static void write(FieldVector parent, WritableByteChannel out)
        throws IOException {
    VectorUnloader unloader = new VectorUnloader(parent);
    Schema fileSchema = unloader.getSchema();
    // Resources close in reverse order: the batch first, then the writer.
    try (ArrowWriter fileWriter = new ArrowWriter(out, fileSchema);
         ArrowRecordBatch batch = unloader.getRecordBatch()) {
        fileWriter.writeRecordBatch(batch);
    }
}
{code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)