Repository: arrow Updated Branches: refs/heads/master 990e2bde7 -> 292618327
ARROW-347: Add method to pass CallBack when creating a transfer pair supersedes and closes #182 Author: Julien Le Dem <[email protected]> Closes #425 from julienledem/arrow_347 and squashes the following commits: 3c47b82 [Julien Le Dem] ARROW-347: Add method to pass CallBack when creating a transfer pair Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/29261832 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/29261832 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/29261832 Branch: refs/heads/master Commit: 2926183276e69390bd84569c364b4e8fb316db53 Parents: 990e2bd Author: Julien Le Dem <[email protected]> Authored: Wed Mar 22 23:09:10 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Wed Mar 22 23:09:10 2017 -0400 ---------------------------------------------------------------------- .../src/main/codegen/templates/UnionVector.java | 13 ++- .../arrow/vector/BaseDataValueVector.java | 7 ++ .../org/apache/arrow/vector/ValueVector.java | 3 + .../org/apache/arrow/vector/ZeroVector.java | 6 ++ .../vector/complex/BaseRepeatedValueVector.java | 14 +++- .../apache/arrow/vector/complex/ListVector.java | 34 +++++--- .../apache/arrow/vector/complex/MapVector.java | 5 ++ .../arrow/vector/complex/NullableMapVector.java | 7 +- .../vector/complex/impl/PromotableWriter.java | 2 +- .../complex/writer/TestComplexWriter.java | 86 ++++++++++++++++++-- 10 files changed, 146 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/29261832/java/vector/src/main/codegen/templates/UnionVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index 076ed93..d17935b 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -236,12 +236,17 @@ public class UnionVector implements FieldVector { @Override public TransferPair getTransferPair(BufferAllocator allocator) { - return new TransferImpl(name, allocator); + return getTransferPair(name, allocator); } @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator) { - return new TransferImpl(ref, allocator); + return getTransferPair(ref, allocator, null); + } + + @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { + return new org.apache.arrow.vector.complex.UnionVector.TransferImpl(ref, allocator, callBack); } @Override @@ -276,8 +281,8 @@ public class UnionVector implements FieldVector { private final TransferPair typeVectorTransferPair; private final UnionVector to; - public TransferImpl(String name, BufferAllocator allocator) { - to = new UnionVector(name, allocator, null); + public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) { + to = new UnionVector(name, allocator, callBack); internalMapVectorTransferPair = internalMap.makeTransferPair(to.internalMap); typeVectorTransferPair = typeVector.makeTransferPair(to.typeVector); } http://git-wip-us.apache.org/repos/asf/arrow/blob/29261832/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java index 7fe1615..6d7d3f0 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java @@ -24,6 +24,8 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.schema.ArrowFieldNode; import io.netty.buffer.ArrowBuf; +import org.apache.arrow.vector.util.CallBack; +import org.apache.arrow.vector.util.TransferPair; public abstract class BaseDataValueVector extends BaseValueVector implements BufferBacked { @@ -88,6 +90,11 @@ public abstract class BaseDataValueVector extends BaseValueVector implements Buf } @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { + return getTransferPair(ref, allocator); + } + + @Override public ArrowBuf[] getBuffers(boolean clear) { ArrowBuf[] out; if (getBufferSize() == 0) { http://git-wip-us.apache.org/repos/asf/arrow/blob/29261832/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java index ff7b94c..8e35398 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java @@ -24,6 +24,7 @@ import org.apache.arrow.memory.OutOfMemoryException; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.util.CallBack; import org.apache.arrow.vector.util.TransferPair; import io.netty.buffer.ArrowBuf; @@ -106,6 +107,8 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> { TransferPair getTransferPair(String ref, BufferAllocator allocator); + TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack); + /** * Returns a new {@link org.apache.arrow.vector.util.TransferPair transfer pair} that is used to transfer underlying * buffers into the target vector. http://git-wip-us.apache.org/repos/asf/arrow/blob/29261832/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java index e163b4f..73f858e 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java @@ -29,6 +29,7 @@ import org.apache.arrow.vector.schema.ArrowFieldNode; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType.Null; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.util.CallBack; import org.apache.arrow.vector.util.TransferPair; import io.netty.buffer.ArrowBuf; @@ -160,6 +161,11 @@ public class ZeroVector implements FieldVector { } @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { + return defaultPair; + } + + @Override public TransferPair makeTransferPair(ValueVector target) { return defaultPair; } http://git-wip-us.apache.org/repos/asf/arrow/blob/29261832/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java index eeb8f58..eda1f3b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java @@ -29,6 +29,7 @@ import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.ZeroVector; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.util.CallBack; import org.apache.arrow.vector.util.SchemaChangeRuntimeException; import com.google.common.base.Preconditions; @@ -44,15 +45,17 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements protected final UInt4Vector offsets; protected FieldVector vector; + protected final CallBack callBack; - protected BaseRepeatedValueVector(String name, BufferAllocator allocator) { - this(name, allocator, DEFAULT_DATA_VECTOR); + protected BaseRepeatedValueVector(String name, BufferAllocator allocator, CallBack callBack) { + this(name, allocator, DEFAULT_DATA_VECTOR, callBack); } - protected BaseRepeatedValueVector(String name, BufferAllocator allocator, FieldVector vector) { + protected BaseRepeatedValueVector(String name, BufferAllocator allocator, FieldVector vector, CallBack callBack) { super(name, allocator); this.offsets = new UInt4Vector(OFFSETS_VECTOR_NAME, allocator); this.vector = Preconditions.checkNotNull(vector, "data vector cannot be null"); + this.callBack = callBack; } @Override @@ -154,9 +157,12 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType, DictionaryEncoding dictionary) { boolean created = false; if (vector instanceof ZeroVector) { - vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, dictionary, null); + vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, dictionary, callBack); // returned vector must have the same field created = true; + if (callBack != null) { + callBack.doWork(); + } } if (vector.getField().getType().getTypeID() != minorType.getType().getTypeID()) { http://git-wip-us.apache.org/repos/asf/arrow/blob/29261832/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index a12440e..54b051b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -24,10 +24,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ObjectArrays; - -import io.netty.buffer.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.OutOfMemoryException; import org.apache.arrow.vector.AddOrGetResult; @@ -52,6 +48,11 @@ import org.apache.arrow.vector.util.CallBack; import org.apache.arrow.vector.util.JsonStringArrayList; import org.apache.arrow.vector.util.TransferPair; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ObjectArrays; + +import io.netty.buffer.ArrowBuf; + public class ListVector extends BaseRepeatedValueVector implements FieldVector { final UInt4Vector offsets; @@ -59,17 +60,15 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector { private final List<BufferBacked> innerVectors; private Mutator mutator = new Mutator(); private Accessor accessor = new Accessor(); - private UnionListWriter writer; private UnionListReader reader; private CallBack callBack; private final DictionaryEncoding dictionary; public ListVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack) { - super(name, allocator); + super(name, allocator, callBack); this.bits = new BitVector("$bits$", allocator); this.offsets = getOffsetVector(); this.innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(bits, offsets)); - this.writer = new UnionListWriter(this); this.reader = new UnionListReader(this); this.dictionary = dictionary; this.callBack = callBack; @@ -86,6 +85,8 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector { if (!addOrGetVector.isCreated()) { throw new IllegalArgumentException("Child vector already existed: " + addOrGetVector.getVector()); } + + addOrGetVector.getVector().initializeChildrenFromFields(field.getChildren()); } @Override @@ -111,7 +112,7 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector { } public UnionListWriter getWriter() { - return writer; + return new UnionListWriter(this); } @Override @@ -139,7 +140,12 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector { @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator) { - return new TransferImpl(ref, allocator); + return getTransferPair(ref, allocator, null); + } + + @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { + return new TransferImpl(ref, allocator, callBack); } @Override @@ -152,8 +158,8 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector { ListVector to; TransferPair pairs[] = new TransferPair[3]; - public TransferImpl(String name, BufferAllocator allocator) { - this(new ListVector(name, allocator, dictionary, null)); + public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) { + this(new ListVector(name, allocator, dictionary, callBack)); } public TransferImpl(ListVector to) { @@ -172,6 +178,7 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector { for (TransferPair pair : pairs) { pair.transfer(); } + to.lastSet = lastSet; } @Override @@ -282,9 +289,12 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector { } public UnionVector promoteToUnion() { - UnionVector vector = new UnionVector(name, allocator, null); + UnionVector vector = new UnionVector(name, allocator, callBack); replaceDataVector(vector); reader = new UnionListReader(this); + if (callBack != null) { + callBack.doWork(); + } return vector; } http://git-wip-us.apache.org/repos/asf/arrow/blob/29261832/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java index 4d750ca..cb67537 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java @@ -115,6 +115,11 @@ public class MapVector extends AbstractMapVector { @Override public TransferPair getTransferPair(BufferAllocator allocator) { + return getTransferPair(name, allocator, null); + } + + @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { return new MapTransferPair(this, new MapVector(name, allocator, callBack), false); } http://git-wip-us.apache.org/repos/asf/arrow/blob/29261832/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java index bb1fdf8..de1d185 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java @@ -86,7 +86,7 @@ public class NullableMapVector extends MapVector implements FieldVector { @Override public TransferPair getTransferPair(BufferAllocator allocator) { - return new NullableMapTransferPair(this, new NullableMapVector(name, allocator, dictionary, callBack), false); + return new NullableMapTransferPair(this, new NullableMapVector(name, allocator, dictionary, null), false); } @Override @@ -96,6 +96,11 @@ public class NullableMapVector extends MapVector implements FieldVector { @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator) { + return new NullableMapTransferPair(this, new NullableMapVector(ref, allocator, dictionary, null), false); + } + + @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { return new NullableMapTransferPair(this, new NullableMapVector(ref, allocator, dictionary, callBack), false); } http://git-wip-us.apache.org/repos/asf/arrow/blob/29261832/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java index e33319a..1880c9b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java @@ -142,7 +142,7 @@ public class PromotableWriter extends AbstractPromotableFieldWriter { } protected FieldWriter getWriter() { - return getWriter(type); + return writer; } private FieldWriter promoteToUnion() { http://git-wip-us.apache.org/repos/asf/arrow/blob/29261832/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java index a8a2d51..99ba19b 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java @@ -29,8 +29,10 @@ import java.util.Set; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.SchemaChangeCallBack; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl; @@ -49,7 +51,11 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Int; import org.apache.arrow.vector.types.pojo.ArrowType.Union; import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.util.CallBack; +import org.apache.arrow.vector.util.JsonStringArrayList; +import org.apache.arrow.vector.util.JsonStringHashMap; import org.apache.arrow.vector.util.Text; +import org.apache.arrow.vector.util.TransferPair; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Assert; @@ -65,7 +71,38 @@ public class TestComplexWriter { @Test public void simpleNestedTypes() { - MapVector parent = new MapVector("parent", allocator, null); + MapVector parent = populateMapVector(null); + MapReader rootReader = new SingleMapReaderImpl(parent).reader("root"); + for (int i = 0; i < COUNT; i++) { + rootReader.setPosition(i); + Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue()); + Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue()); + } + + parent.close(); + } + + @Test + public void transferPairSchemaChange() { + SchemaChangeCallBack callBack1 = new SchemaChangeCallBack(); + SchemaChangeCallBack callBack2 = new SchemaChangeCallBack(); + MapVector parent = populateMapVector(callBack1); + + TransferPair tp = parent.getTransferPair("newVector", allocator, callBack2); + + ComplexWriter writer = new ComplexWriterImpl("newWriter", parent); + MapWriter rootWriter = writer.rootAsMap(); + IntWriter intWriter = rootWriter.integer("newInt"); + intWriter.writeInt(1); + writer.setValueCount(1); + + assertTrue(callBack1.getSchemaChangedAndReset()); + // The second vector should not have registered a schema change + assertFalse(callBack1.getSchemaChangedAndReset()); + } + + private MapVector populateMapVector(CallBack callBack) { + MapVector parent = new MapVector("parent", allocator, callBack); ComplexWriter writer = new ComplexWriterImpl("root", parent); MapWriter rootWriter = writer.rootAsMap(); IntWriter intWriter = rootWriter.integer("int"); @@ -77,14 +114,7 @@ public class TestComplexWriter { rootWriter.end(); } writer.setValueCount(COUNT); - MapReader rootReader = new SingleMapReaderImpl(parent).reader("root"); - for (int i = 0; i < COUNT; i++) { - rootReader.setPosition(i); - Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue()); - Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue()); - } - - parent.close(); + return parent; } @Test @@ -646,4 +676,42 @@ public class TestComplexWriter { long nanoLong = nanoReader.readLong(); Assert.assertEquals(expectedNanos, nanoLong); } + + @Test + public void complexCopierWithList() { + MapVector parent = new MapVector("parent", allocator, null); + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + ListWriter listWriter = rootWriter.list("list"); + MapWriter innerMapWriter = listWriter.map(); + IntWriter outerIntWriter = listWriter.integer(); + rootWriter.start(); + listWriter.startList(); + outerIntWriter.writeInt(1); + outerIntWriter.writeInt(2); + innerMapWriter.start(); + IntWriter intWriter = innerMapWriter.integer("a"); + intWriter.writeInt(1); + innerMapWriter.end(); + innerMapWriter.start(); + intWriter = innerMapWriter.integer("a"); + intWriter.writeInt(2); + innerMapWriter.end(); + listWriter.endList(); + rootWriter.end(); + writer.setValueCount(1); + + NullableMapVector mapVector = (NullableMapVector) parent.getChild("root"); + TransferPair tp = mapVector.getTransferPair(allocator); + tp.splitAndTransfer(0, 1); + MapVector toMapVector = (MapVector) tp.getTo(); + JsonStringHashMap<?,?> toMapValue = (JsonStringHashMap<?,?>) toMapVector.getAccessor().getObject(0); + JsonStringArrayList<?> object = (JsonStringArrayList<?>) toMapValue.get("list"); + assertEquals(1, object.get(0)); + assertEquals(2, object.get(1)); + JsonStringHashMap<?,?> innerMap = (JsonStringHashMap<?,?>) object.get(2); + assertEquals(1, innerMap.get("a")); + innerMap = (JsonStringHashMap<?,?>) object.get(3); + assertEquals(2, innerMap.get("a")); + } } \ No newline at end of file
