This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 0c96be3590 GH-41730: [Java] Adding variadicBufferCounts to RecordBatch
(#41732)
0c96be3590 is described below
commit 0c96be359084e727b82482d69f58aa79199d703d
Author: Vibhatha Lakmal Abeykoon <[email protected]>
AuthorDate: Fri May 24 09:32:32 2024 +0530
GH-41730: [Java] Adding variadicBufferCounts to RecordBatch (#41732)
### Rationale for this change
This PR adds the `variadicBufferCounts` attribute to `ArrowRecordBatch` in the
Java module. Furthermore, it updates the `TypeLayout` functions
`getTypeBufferCount` and `getTypeLayout` along with the corresponding
test cases. Previously these changes were listed as issues
https://github.com/apache/arrow/issues/40934,
https://github.com/apache/arrow/issues/40935 and
https://github.com/apache/arrow/issues/40931. These tickets will also be
closed by this PR.
### What changes are included in this PR?
The two functions introduced to `TypeLayout` deprecate the old API and
add a new API. In this PR we are updating a few modules to use the new API.
Corresponding tests for the changed functions have also been added.
This also updates the usage of `ArrowRecordBatch` across other modules and
`TypeLayout` usage across a few modules. Some modules were excluded, as
mentioned in the issue's non-goals section, to be completed in a follow-up
effort because the scope and required tasks remain large. These modules will
still use the deprecated API for TypeLayouts, but this is documented in the
code so that they can be updated to the new API in a follow-up effort.
### Closing Subtasks
- [X] https://github.com/apache/arrow/issues/40934
- [X] https://github.com/apache/arrow/issues/40935
- [X] https://github.com/apache/arrow/issues/40931
### Are these changes tested?
The changes are tested using existing tests and new tests.
### Are there any user-facing changes?
Yes
**This PR includes breaking changes to public APIs.**
* GitHub Issue: #41730
Lead-authored-by: Vibhatha Lakmal Abeykoon <[email protected]>
Co-authored-by: Vibhatha Abeykoon <[email protected]>
Signed-off-by: David Li <[email protected]>
---
.../org/apache/arrow/c/StructVectorLoader.java | 17 ++-
.../org/apache/arrow/c/StructVectorUnloader.java | 22 +++-
.../arrow/vector/BaseVariableWidthViewVector.java | 19 +++-
.../java/org/apache/arrow/vector/TypeLayout.java | 22 ++--
.../java/org/apache/arrow/vector/VectorLoader.java | 19 +++-
.../org/apache/arrow/vector/VectorUnloader.java | 23 +++-
.../org/apache/arrow/vector/ViewVarCharVector.java | 2 +-
.../apache/arrow/vector/ipc/JsonFileReader.java | 1 +
.../apache/arrow/vector/ipc/JsonFileWriter.java | 1 +
.../arrow/vector/ipc/message/ArrowRecordBatch.java | 89 ++++++++++++++-
.../validate/ValidateVectorBufferVisitor.java | 1 +
.../org/apache/arrow/vector/TestTypeLayout.java | 122 +++++++++++++++++----
.../apache/arrow/vector/TestVarCharViewVector.java | 66 +++++++++++
13 files changed, 343 insertions(+), 61 deletions(-)
diff --git a/java/c/src/main/java/org/apache/arrow/c/StructVectorLoader.java
b/java/c/src/main/java/org/apache/arrow/c/StructVectorLoader.java
index d9afd0189d..27acf84d30 100644
--- a/java/c/src/main/java/org/apache/arrow/c/StructVectorLoader.java
+++ b/java/c/src/main/java/org/apache/arrow/c/StructVectorLoader.java
@@ -90,8 +90,12 @@ public class StructVectorLoader {
.fromCompressionType(recordBatch.getBodyCompression().getCodec());
decompressionNeeded = codecType !=
CompressionUtil.CodecType.NO_COMPRESSION;
CompressionCodec codec = decompressionNeeded ?
factory.createCodec(codecType) : NoCompressionCodec.INSTANCE;
+ Iterator<Long> variadicBufferCounts = null;
+ if (recordBatch.getVariadicBufferCounts() != null &&
!recordBatch.getVariadicBufferCounts().isEmpty()) {
+ variadicBufferCounts = recordBatch.getVariadicBufferCounts().iterator();
+ }
for (FieldVector fieldVector : result.getChildrenFromFields()) {
- loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec);
+ loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec,
variadicBufferCounts);
}
result.loadFieldBuffers(new ArrowFieldNode(recordBatch.getLength(), 0),
Collections.singletonList(null));
if (nodes.hasNext() || buffers.hasNext()) {
@@ -102,10 +106,15 @@ public class StructVectorLoader {
}
private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf>
buffers, Iterator<ArrowFieldNode> nodes,
- CompressionCodec codec) {
+ CompressionCodec codec, Iterator<Long> variadicBufferCounts) {
checkArgument(nodes.hasNext(), "no more field nodes for field %s and
vector %s", field, vector);
ArrowFieldNode fieldNode = nodes.next();
- int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType());
+ // variadicBufferLayoutCount will be 0 for vectors of type except
BaseVariableWidthViewVector
+ long variadicBufferLayoutCount = 0;
+ if (variadicBufferCounts != null) {
+ variadicBufferLayoutCount = variadicBufferCounts.next();
+ }
+ int bufferLayoutCount = (int) (variadicBufferLayoutCount +
TypeLayout.getTypeBufferCount(field.getType()));
List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
for (int j = 0; j < bufferLayoutCount; j++) {
ArrowBuf nextBuf = buffers.next();
@@ -138,7 +147,7 @@ public class StructVectorLoader {
for (int i = 0; i < childrenFromFields.size(); i++) {
Field child = children.get(i);
FieldVector fieldVector = childrenFromFields.get(i);
- loadBuffers(fieldVector, child, buffers, nodes, codec);
+ loadBuffers(fieldVector, child, buffers, nodes, codec,
variadicBufferCounts);
}
}
}
diff --git a/java/c/src/main/java/org/apache/arrow/c/StructVectorUnloader.java
b/java/c/src/main/java/org/apache/arrow/c/StructVectorUnloader.java
index aa6d9b4d0f..8d015157eb 100644
--- a/java/c/src/main/java/org/apache/arrow/c/StructVectorUnloader.java
+++ b/java/c/src/main/java/org/apache/arrow/c/StructVectorUnloader.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.BaseVariableWidthViewVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TypeLayout;
import org.apache.arrow.vector.complex.StructVector;
@@ -87,17 +88,28 @@ public class StructVectorUnloader {
public ArrowRecordBatch getRecordBatch() {
List<ArrowFieldNode> nodes = new ArrayList<>();
List<ArrowBuf> buffers = new ArrayList<>();
+ List<Long> variadicBufferCounts = new ArrayList<>();
for (FieldVector vector : root.getChildrenFromFields()) {
- appendNodes(vector, nodes, buffers);
+ appendNodes(vector, nodes, buffers, variadicBufferCounts);
}
return new ArrowRecordBatch(root.getValueCount(), nodes, buffers,
CompressionUtil.createBodyCompression(codec),
- alignBuffers);
+ variadicBufferCounts, alignBuffers);
}
- private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes,
List<ArrowBuf> buffers) {
+ private long getVariadicBufferCount(FieldVector vector) {
+ if (vector instanceof BaseVariableWidthViewVector) {
+ return ((BaseVariableWidthViewVector) vector).getDataBuffers().size();
+ }
+ return 0L;
+ }
+
+ private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes,
List<ArrowBuf> buffers,
+ List<Long> variadicBufferCounts) {
nodes.add(new ArrowFieldNode(vector.getValueCount(), includeNullCount ?
vector.getNullCount() : -1));
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
- int expectedBufferCount =
TypeLayout.getTypeBufferCount(vector.getField().getType());
+ long variadicBufferCount = getVariadicBufferCount(vector);
+ int expectedBufferCount = (int)
(TypeLayout.getTypeBufferCount(vector.getField().getType()) +
variadicBufferCount);
+ variadicBufferCounts.add(variadicBufferCount);
if (fieldBuffers.size() != expectedBufferCount) {
throw new IllegalArgumentException(String.format("wrong number of
buffers for field %s in vector %s. found: %s",
vector.getField(), vector.getClass().getSimpleName(), fieldBuffers));
@@ -106,7 +118,7 @@ public class StructVectorUnloader {
buffers.add(codec.compress(vector.getAllocator(), buf));
}
for (FieldVector child : vector.getChildrenFromFields()) {
- appendNodes(child, nodes, buffers);
+ appendNodes(child, nodes, buffers, variadicBufferCounts);
}
}
}
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthViewVector.java
b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthViewVector.java
index 620b998994..b3e86fab05 100644
---
a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthViewVector.java
+++
b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthViewVector.java
@@ -161,7 +161,7 @@ public abstract class BaseVariableWidthViewVector extends
BaseValueVector implem
/**
* Get the buffers that store the data for views in the vector.
*
- * @return buffer
+ * @return list of ArrowBuf
*/
public List<ArrowBuf> getDataBuffers() {
return dataBuffers;
@@ -368,8 +368,21 @@ public abstract class BaseVariableWidthViewVector extends
BaseValueVector implem
*/
@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf>
ownBuffers) {
- // TODO: https://github.com/apache/arrow/issues/40931
- throw new UnsupportedOperationException("loadFieldBuffers is not supported
for BaseVariableWidthViewVector");
+ ArrowBuf bitBuf = ownBuffers.get(0);
+ ArrowBuf viewBuf = ownBuffers.get(1);
+ List<ArrowBuf> dataBufs = ownBuffers.subList(2, ownBuffers.size());
+
+ this.clear();
+
+ this.viewBuffer = viewBuf.getReferenceManager().retain(viewBuf, allocator);
+ this.validityBuffer = BitVectorHelper.loadValidityBuffer(fieldNode,
bitBuf, allocator);
+
+ for (ArrowBuf dataBuf : dataBufs) {
+ this.dataBuffers.add(dataBuf.getReferenceManager().retain(dataBuf,
allocator));
+ }
+
+ lastSet = fieldNode.getLength() - 1;
+ valueCount = fieldNode.getLength();
}
/**
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java
b/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java
index ea92efdc55..0d01d77632 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/TypeLayout.java
@@ -28,6 +28,7 @@ import org.apache.arrow.vector.BufferLayout.BufferType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeVisitor;
import org.apache.arrow.vector.types.pojo.ArrowType.Binary;
+import org.apache.arrow.vector.types.pojo.ArrowType.BinaryView;
import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
import org.apache.arrow.vector.types.pojo.ArrowType.Date;
import org.apache.arrow.vector.types.pojo.ArrowType.Decimal;
@@ -186,8 +187,7 @@ public class TypeLayout {
@Override
public TypeLayout visit(ArrowType.BinaryView type) {
- // TODO: https://github.com/apache/arrow/issues/40934
- throw new UnsupportedOperationException("BinaryView not supported");
+ return newVariableWidthViewTypeLayout();
}
@Override
@@ -197,8 +197,7 @@ public class TypeLayout {
@Override
public TypeLayout visit(Utf8View type) {
- // TODO: https://github.com/apache/arrow/issues/40934
- throw new UnsupportedOperationException("Utf8View not supported");
+ return newVariableWidthViewTypeLayout();
}
@Override
@@ -216,7 +215,12 @@ public class TypeLayout {
BufferLayout.byteVector());
}
+ private TypeLayout newVariableWidthViewTypeLayout() {
+ return newPrimitiveTypeLayout(BufferLayout.validityVector(),
BufferLayout.byteVector());
+ }
+
private TypeLayout newLargeVariableWidthTypeLayout() {
+ // NOTE: only considers the non variadic buffers
return newPrimitiveTypeLayout(BufferLayout.validityVector(),
BufferLayout.largeOffsetBuffer(),
BufferLayout.byteVector());
}
@@ -377,9 +381,9 @@ public class TypeLayout {
}
@Override
- public Integer visit(ArrowType.BinaryView type) {
- // TODO: https://github.com/apache/arrow/issues/40935
- return VARIABLE_WIDTH_BUFFER_COUNT;
+ public Integer visit(BinaryView type) {
+ // NOTE: only consider the validity and view buffers
+ return 2;
}
@Override
@@ -389,8 +393,8 @@ public class TypeLayout {
@Override
public Integer visit(Utf8View type) {
- // TODO: https://github.com/apache/arrow/issues/40935
- return VARIABLE_WIDTH_BUFFER_COUNT;
+ // NOTE: only consider the validity and view buffers
+ return 2;
}
@Override
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index 510cef24c7..9590e70f46 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -80,8 +80,13 @@ public class VectorLoader {
CompressionUtil.CodecType.fromCompressionType(recordBatch.getBodyCompression().getCodec());
decompressionNeeded = codecType !=
CompressionUtil.CodecType.NO_COMPRESSION;
CompressionCodec codec = decompressionNeeded ?
factory.createCodec(codecType) : NoCompressionCodec.INSTANCE;
+ Iterator<Long> variadicBufferCounts = null;
+ if (recordBatch.getVariadicBufferCounts() != null &&
!recordBatch.getVariadicBufferCounts().isEmpty()) {
+ variadicBufferCounts = recordBatch.getVariadicBufferCounts().iterator();
+ }
+
for (FieldVector fieldVector : root.getFieldVectors()) {
- loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec);
+ loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec,
variadicBufferCounts);
}
root.setRowCount(recordBatch.getLength());
if (nodes.hasNext() || buffers.hasNext()) {
@@ -95,10 +100,16 @@ public class VectorLoader {
Field field,
Iterator<ArrowBuf> buffers,
Iterator<ArrowFieldNode> nodes,
- CompressionCodec codec) {
+ CompressionCodec codec,
+ Iterator<Long> variadicBufferCounts) {
checkArgument(nodes.hasNext(), "no more field nodes for field %s and
vector %s", field, vector);
ArrowFieldNode fieldNode = nodes.next();
- int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType());
+ // variadicBufferLayoutCount will be 0 for vectors of type except
BaseVariableWidthViewVector
+ long variadicBufferLayoutCount = 0;
+ if (variadicBufferCounts != null) {
+ variadicBufferLayoutCount = variadicBufferCounts.next();
+ }
+ int bufferLayoutCount = (int) (variadicBufferLayoutCount +
TypeLayout.getTypeBufferCount(field.getType()));
List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
for (int j = 0; j < bufferLayoutCount; j++) {
ArrowBuf nextBuf = buffers.next();
@@ -130,7 +141,7 @@ public class VectorLoader {
for (int i = 0; i < childrenFromFields.size(); i++) {
Field child = children.get(i);
FieldVector fieldVector = childrenFromFields.get(i);
- loadBuffers(fieldVector, child, buffers, nodes, codec);
+ loadBuffers(fieldVector, child, buffers, nodes, codec,
variadicBufferCounts);
}
}
}
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
index 1d44e37ac7..8528099b6d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -80,19 +80,30 @@ public class VectorUnloader {
public ArrowRecordBatch getRecordBatch() {
List<ArrowFieldNode> nodes = new ArrayList<>();
List<ArrowBuf> buffers = new ArrayList<>();
+ List<Long> variadicBufferCounts = new ArrayList<>();
for (FieldVector vector : root.getFieldVectors()) {
- appendNodes(vector, nodes, buffers);
+ appendNodes(vector, nodes, buffers, variadicBufferCounts);
}
// Do NOT retain buffers in ArrowRecordBatch constructor since we have
already retained them.
return new ArrowRecordBatch(
- root.getRowCount(), nodes, buffers,
CompressionUtil.createBodyCompression(codec), alignBuffers,
- /*retainBuffers*/ false);
+ root.getRowCount(), nodes, buffers,
CompressionUtil.createBodyCompression(codec),
+ variadicBufferCounts, alignBuffers, /*retainBuffers*/ false);
}
- private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes,
List<ArrowBuf> buffers) {
+ private long getVariadicBufferCount(FieldVector vector) {
+ if (vector instanceof BaseVariableWidthViewVector) {
+ return ((BaseVariableWidthViewVector) vector).getDataBuffers().size();
+ }
+ return 0L;
+ }
+
+ private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes,
List<ArrowBuf> buffers,
+ List<Long> variadicBufferCounts) {
nodes.add(new ArrowFieldNode(vector.getValueCount(), includeNullCount ?
vector.getNullCount() : -1));
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
- int expectedBufferCount =
TypeLayout.getTypeBufferCount(vector.getField().getType());
+ long variadicBufferCount = getVariadicBufferCount(vector);
+ int expectedBufferCount = (int)
(TypeLayout.getTypeBufferCount(vector.getField().getType()) +
variadicBufferCount);
+ variadicBufferCounts.add(variadicBufferCount);
if (fieldBuffers.size() != expectedBufferCount) {
throw new IllegalArgumentException(String.format(
"wrong number of buffers for field %s in vector %s. found: %s",
@@ -107,7 +118,7 @@ public class VectorUnloader {
buffers.add(codec.compress(vector.getAllocator(), buf));
}
for (FieldVector child : vector.getChildrenFromFields()) {
- appendNodes(child, nodes, buffers);
+ appendNodes(child, nodes, buffers, variadicBufferCounts);
}
}
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/ViewVarCharVector.java
b/java/vector/src/main/java/org/apache/arrow/vector/ViewVarCharVector.java
index 010df02e0b..d35bf9e4b5 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ViewVarCharVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ViewVarCharVector.java
@@ -48,7 +48,7 @@ public final class ViewVarCharVector extends
BaseVariableWidthViewVector {
* @param allocator allocator for memory management.
*/
public ViewVarCharVector(String name, BufferAllocator allocator) {
- this(name, FieldType.nullable(MinorType.VARCHAR.getType()), allocator);
+ this(name, FieldType.nullable(MinorType.VIEWVARCHAR.getType()), allocator);
}
/**
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
index e927acd481..5c9efc445e 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
@@ -715,6 +715,7 @@ public class JsonFileReader implements AutoCloseable,
DictionaryProvider {
private void readFromJsonIntoVector(Field field, FieldVector vector) throws
JsonParseException, IOException {
ArrowType type = field.getType();
+ // TODO: https://github.com/apache/arrow/issues/41733
TypeLayout typeLayout = TypeLayout.getTypeLayout(type);
List<BufferType> vectorTypes = typeLayout.getBufferTypes();
ArrowBuf[] vectorBuffers = new ArrowBuf[vectorTypes.size()];
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java
index f5e267e812..670881b238 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java
@@ -208,6 +208,7 @@ public class JsonFileWriter implements AutoCloseable {
}
private void writeFromVectorIntoJson(Field field, FieldVector vector) throws
IOException {
+ // TODO: https://github.com/apache/arrow/issues/41733
List<BufferType> vectorTypes =
TypeLayout.getTypeLayout(field.getType()).getBufferTypes();
List<ArrowBuf> vectorBuffers = vector.getFieldBuffers();
if (vectorTypes.size() != vectorBuffers.size()) {
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
index f81d049a92..b910cfc6ec 100644
---
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
+++
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
@@ -56,17 +56,19 @@ public class ArrowRecordBatch implements ArrowMessage {
private final List<ArrowBuffer> buffersLayout;
+ private final List<Long> variadicBufferCounts;
+
private boolean closed = false;
public ArrowRecordBatch(
int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
- this(length, nodes, buffers, NoCompressionCodec.DEFAULT_BODY_COMPRESSION,
true);
+ this(length, nodes, buffers, NoCompressionCodec.DEFAULT_BODY_COMPRESSION,
null, true);
}
public ArrowRecordBatch(
int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers,
ArrowBodyCompression bodyCompression) {
- this(length, nodes, buffers, bodyCompression, true);
+ this(length, nodes, buffers, bodyCompression, null, true);
}
/**
@@ -81,7 +83,7 @@ public class ArrowRecordBatch implements ArrowMessage {
public ArrowRecordBatch(
int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers,
ArrowBodyCompression bodyCompression, boolean alignBuffers) {
- this(length, nodes, buffers, bodyCompression, alignBuffers,
/*retainBuffers*/ true);
+ this(length, nodes, buffers, bodyCompression, null, alignBuffers,
/*retainBuffers*/ true);
}
/**
@@ -98,12 +100,48 @@ public class ArrowRecordBatch implements ArrowMessage {
public ArrowRecordBatch(
int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers,
ArrowBodyCompression bodyCompression, boolean alignBuffers, boolean
retainBuffers) {
+ this(length, nodes, buffers, bodyCompression, null, alignBuffers,
retainBuffers);
+ }
+
+ /**
+ * Construct a record batch from nodes.
+ *
+ * @param length how many rows in this batch
+ * @param nodes field level info
+ * @param buffers will be retained until this recordBatch is closed
+ * @param bodyCompression compression info.
+ * @param variadicBufferCounts the number of buffers in each variadic
section.
+ * @param alignBuffers Whether to align buffers to an 8 byte boundary.
+ */
+ public ArrowRecordBatch(
+ int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers,
+ ArrowBodyCompression bodyCompression, List<Long> variadicBufferCounts,
boolean alignBuffers) {
+ this(length, nodes, buffers, bodyCompression, variadicBufferCounts,
alignBuffers, /*retainBuffers*/ true);
+ }
+
+ /**
+ * Construct a record batch from nodes.
+ *
+ * @param length how many rows in this batch
+ * @param nodes field level info
+ * @param buffers will be retained until this recordBatch is closed
+ * @param bodyCompression compression info.
+ * @param variadicBufferCounts the number of buffers in each variadic
section.
+ * @param alignBuffers Whether to align buffers to an 8 byte boundary.
+ * @param retainBuffers Whether to retain() each source buffer in the
constructor. If false, the caller is
+ * responsible for retaining the buffers beforehand.
+ */
+ public ArrowRecordBatch(
+ int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers,
+ ArrowBodyCompression bodyCompression, List<Long> variadicBufferCounts,
boolean alignBuffers,
+ boolean retainBuffers) {
super();
this.length = length;
this.nodes = nodes;
this.buffers = buffers;
Preconditions.checkArgument(bodyCompression != null, "body compression
cannot be null");
this.bodyCompression = bodyCompression;
+ this.variadicBufferCounts = variadicBufferCounts;
List<ArrowBuffer> arrowBuffers = new ArrayList<>(buffers.size());
long offset = 0;
for (ArrowBuf arrowBuf : buffers) {
@@ -129,12 +167,14 @@ public class ArrowRecordBatch implements ArrowMessage {
// to distinguish this from the public constructor.
private ArrowRecordBatch(
boolean dummy, int length, List<ArrowFieldNode> nodes,
- List<ArrowBuf> buffers, ArrowBodyCompression bodyCompression) {
+ List<ArrowBuf> buffers, ArrowBodyCompression bodyCompression,
+ List<Long> variadicBufferCounts) {
this.length = length;
this.nodes = nodes;
this.buffers = buffers;
Preconditions.checkArgument(bodyCompression != null, "body compression
cannot be null");
this.bodyCompression = bodyCompression;
+ this.variadicBufferCounts = variadicBufferCounts;
this.closed = false;
List<ArrowBuffer> arrowBuffers = new ArrayList<>();
long offset = 0;
@@ -179,6 +219,14 @@ public class ArrowRecordBatch implements ArrowMessage {
return buffers;
}
+ /**
+ * Get the record batch variadic buffer counts.
+ * @return the variadic buffer counts
+ */
+ public List<Long> getVariadicBufferCounts() {
+ return variadicBufferCounts;
+ }
+
/**
* Create a new ArrowRecordBatch which has the same information as this
batch but whose buffers
* are owned by that Allocator.
@@ -195,7 +243,7 @@ public class ArrowRecordBatch implements ArrowMessage {
.writerIndex(buf.writerIndex()))
.collect(Collectors.toList());
close();
- return new ArrowRecordBatch(false, length, nodes, newBufs,
bodyCompression);
+ return new ArrowRecordBatch(false, length, nodes, newBufs,
bodyCompression, variadicBufferCounts);
}
/**
@@ -217,6 +265,24 @@ public class ArrowRecordBatch implements ArrowMessage {
if (bodyCompression.getCodec() != NoCompressionCodec.COMPRESSION_TYPE) {
compressOffset = bodyCompression.writeTo(builder);
}
+
+ // Start the variadicBufferCounts vector.
+ int variadicBufferCountsOffset = 0;
+ if (variadicBufferCounts != null && !variadicBufferCounts.isEmpty()) {
+ variadicBufferCountsOffset = variadicBufferCounts.size();
+ int elementSizeInBytes = 8; // Size of long in bytes
+ builder.startVector(elementSizeInBytes, variadicBufferCountsOffset,
elementSizeInBytes);
+
+ // Add each long to the builder. Note that elements should be added in
reverse order.
+ for (int i = variadicBufferCounts.size() - 1; i >= 0; i--) {
+ long value = variadicBufferCounts.get(i);
+ builder.addLong(value);
+ }
+
+ // End the vector. This returns an offset that you can use to refer to
the vector.
+ variadicBufferCountsOffset = builder.endVector();
+ }
+
RecordBatch.startRecordBatch(builder);
RecordBatch.addLength(builder, length);
RecordBatch.addNodes(builder, nodesOffset);
@@ -224,6 +290,12 @@ public class ArrowRecordBatch implements ArrowMessage {
if (bodyCompression.getCodec() != NoCompressionCodec.COMPRESSION_TYPE) {
RecordBatch.addCompression(builder, compressOffset);
}
+
+ // Add the variadicBufferCounts to the RecordBatch
+ if (variadicBufferCounts != null && !variadicBufferCounts.isEmpty()) {
+ RecordBatch.addVariadicBufferCounts(builder, variadicBufferCountsOffset);
+ }
+
return RecordBatch.endRecordBatch(builder);
}
@@ -247,8 +319,13 @@ public class ArrowRecordBatch implements ArrowMessage {
@Override
public String toString() {
+ int variadicBufCount = 0;
+ if (variadicBufferCounts != null && !variadicBufferCounts.isEmpty()) {
+ variadicBufCount = variadicBufferCounts.size();
+ }
return "ArrowRecordBatch [length=" + length + ", nodes=" + nodes + ",
#buffers=" + buffers.size() +
- ", buffersLayout=" + buffersLayout + ", closed=" + closed + "]";
+ ", #variadicBufferCounts=" + variadicBufCount + ", buffersLayout=" +
buffersLayout +
+ ", closed=" + closed + "]";
}
/**
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java
b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java
index 0a67db0455..af5a67049f 100644
---
a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java
+++
b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java
@@ -51,6 +51,7 @@ public class ValidateVectorBufferVisitor implements
VectorVisitor<Void, Void> {
if (vector instanceof FieldVector) {
FieldVector fieldVector = (FieldVector) vector;
+ // TODO: https://github.com/apache/arrow/issues/41734
int typeBufferCount = TypeLayout.getTypeBufferCount(arrowType);
validateOrThrow(fieldVector.getFieldBuffers().size() == typeBufferCount,
"Expected %s buffers in vector of type %s, got %s.",
diff --git
a/java/vector/src/test/java/org/apache/arrow/vector/TestTypeLayout.java
b/java/vector/src/test/java/org/apache/arrow/vector/TestTypeLayout.java
index 97930f433d..5a58133f2e 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestTypeLayout.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestTypeLayout.java
@@ -17,82 +17,158 @@
package org.apache.arrow.vector;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.util.Random;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.IntervalUnit;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.UnionMode;
import org.apache.arrow.vector.types.pojo.ArrowType;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
public class TestTypeLayout {
+ private BufferAllocator allocator;
+
+ @BeforeEach
+ public void prepare() {
+ allocator = new RootAllocator(Integer.MAX_VALUE);
+ }
+
+ @AfterEach
+ public void shutdown() {
+ allocator.close();
+ }
+
+
@Test
public void testTypeBufferCount() {
ArrowType type = new ArrowType.Int(8, true);
- assertEquals(TypeLayout.getTypeBufferCount(type),
TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Union(UnionMode.Sparse, new int[2]);
- assertEquals(TypeLayout.getTypeBufferCount(type),
TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Union(UnionMode.Dense, new int[1]);
- assertEquals(TypeLayout.getTypeBufferCount(type),
TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Struct();
- assertEquals(TypeLayout.getTypeBufferCount(type),
TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
- assertEquals(TypeLayout.getTypeBufferCount(type),
TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.List();
- assertEquals(TypeLayout.getTypeBufferCount(type),
TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.FixedSizeList(5);
- assertEquals(TypeLayout.getTypeBufferCount(type),
TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Map(false);
- assertEquals(TypeLayout.getTypeBufferCount(type),
TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
- assertEquals(TypeLayout.getTypeBufferCount(type),
TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
- assertEquals(TypeLayout.getTypeBufferCount(type),
TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Decimal(10, 10, 128);
- assertEquals(TypeLayout.getTypeBufferCount(type),
TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Decimal(10, 10, 256);
- assertEquals(TypeLayout.getTypeBufferCount(type), TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.FixedSizeBinary(5);
- assertEquals(TypeLayout.getTypeBufferCount(type), TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Bool();
- assertEquals(TypeLayout.getTypeBufferCount(type), TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Binary();
- assertEquals(TypeLayout.getTypeBufferCount(type), TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Utf8();
- assertEquals(TypeLayout.getTypeBufferCount(type), TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Null();
- assertEquals(TypeLayout.getTypeBufferCount(type), TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Date(DateUnit.DAY);
- assertEquals(TypeLayout.getTypeBufferCount(type), TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Time(TimeUnit.MILLISECOND, 32);
- assertEquals(TypeLayout.getTypeBufferCount(type), TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Interval(IntervalUnit.DAY_TIME);
- assertEquals(TypeLayout.getTypeBufferCount(type), TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
type = new ArrowType.Duration(TimeUnit.MILLISECOND);
- assertEquals(TypeLayout.getTypeBufferCount(type), TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ }
+
+ private String generateRandomString(int length) {
+ Random random = new Random();
+ StringBuilder sb = new StringBuilder(length);
+ for (int i = 0; i < length; i++) {
+ sb.append(random.nextInt(10)); // 0-9
+ }
+ return sb.toString();
+ }
+
+ @Test
+ public void testTypeBufferCountInVectorsWithVariadicBuffers() {
+ // empty vector
+ try (ViewVarCharVector viewVarCharVector = new ViewVarCharVector("myvector", allocator)) {
+ ArrowType type = viewVarCharVector.getMinorType().getType();
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ }
+ // vector with long strings
+ try (ViewVarCharVector viewVarCharVector = new ViewVarCharVector("myvector", allocator)) {
+ viewVarCharVector.allocateNew(32, 6);
+
+ viewVarCharVector.setSafe(0, generateRandomString(8).getBytes());
+ viewVarCharVector.setSafe(1, generateRandomString(12).getBytes());
+ viewVarCharVector.setSafe(2, generateRandomString(14).getBytes());
+ viewVarCharVector.setSafe(3, generateRandomString(18).getBytes());
+ viewVarCharVector.setSafe(4, generateRandomString(22).getBytes());
+ viewVarCharVector.setSafe(5, generateRandomString(24).getBytes());
+
+ viewVarCharVector.setValueCount(6);
+
+ ArrowType type = viewVarCharVector.getMinorType().getType();
+ assertEquals(TypeLayout.getTypeBufferCount(type),
+ TypeLayout.getTypeLayout(type).getBufferLayouts().size());
+ }
}
}
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVarCharViewVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVarCharViewVector.java
index efb5afac91..2d37b0b4eb 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestVarCharViewVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVarCharViewVector.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@@ -41,8 +42,11 @@ import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.memory.util.ArrowBufPointer;
import org.apache.arrow.memory.util.CommonUtil;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.testing.ValueVectorDataPopulator;
import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ReusableByteArray;
import org.apache.arrow.vector.util.Text;
import org.junit.jupiter.api.AfterEach;
@@ -1451,6 +1455,68 @@ public class TestVarCharViewVector {
}
}
+ @Test
+ public void testVectorLoadUnload() {
+
+ try (final ViewVarCharVector vector1 = new ViewVarCharVector("myvector", allocator)) {
+
+ setVector(vector1, STR1, STR2, STR3, STR4, STR5, STR6);
+
+ assertEquals(5, vector1.getLastSet());
+ vector1.setValueCount(15);
+ assertEquals(14, vector1.getLastSet());
+
+ /* Check the vector output */
+ assertArrayEquals(STR1, vector1.get(0));
+ assertArrayEquals(STR2, vector1.get(1));
+ assertArrayEquals(STR3, vector1.get(2));
+ assertArrayEquals(STR4, vector1.get(3));
+ assertArrayEquals(STR5, vector1.get(4));
+ assertArrayEquals(STR6, vector1.get(5));
+
+ Field field = vector1.getField();
+ String fieldName = field.getName();
+
+ List<Field> fields = new ArrayList<>();
+ List<FieldVector> fieldVectors = new ArrayList<>();
+
+ fields.add(field);
+ fieldVectors.add(vector1);
+
+ Schema schema = new Schema(fields);
+
+ VectorSchemaRoot schemaRoot1 = new VectorSchemaRoot(schema, fieldVectors, vector1.getValueCount());
+ VectorUnloader vectorUnloader = new VectorUnloader(schemaRoot1);
+
+ try (
+ ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
+ BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("new vector", 0, Long.MAX_VALUE);
+ VectorSchemaRoot schemaRoot2 = VectorSchemaRoot.create(schema, finalVectorsAllocator);
+ ) {
+
+ VectorLoader vectorLoader = new VectorLoader(schemaRoot2);
+ vectorLoader.load(recordBatch);
+
+ ViewVarCharVector vector2 = (ViewVarCharVector) schemaRoot2.getVector(fieldName);
+ /*
+ * lastSet would have internally been set by VectorLoader.load() when it invokes
+ * loadFieldBuffers.
+ */
+ assertEquals(14, vector2.getLastSet());
+ vector2.setValueCount(25);
+ assertEquals(24, vector2.getLastSet());
+
+ /* Check the vector output */
+ assertArrayEquals(STR1, vector2.get(0));
+ assertArrayEquals(STR2, vector2.get(1));
+ assertArrayEquals(STR3, vector2.get(2));
+ assertArrayEquals(STR4, vector2.get(3));
+ assertArrayEquals(STR5, vector2.get(4));
+ assertArrayEquals(STR6, vector2.get(5));
+ }
+ }
+ }
+
private String generateRandomString(int length) {
Random random = new Random();
StringBuilder sb = new StringBuilder(length);