This is an automated email from the ASF dual-hosted git repository.
ravindra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7b0335f ARROW-5818: [Java][Gandiva] support varlen output vectors
7b0335f is described below
commit 7b0335ffb8cae84e401ce62e5d6d199407a8c621
Author: Pindikura Ravindra <[email protected]>
AuthorDate: Tue Jul 2 18:46:28 2019 +0530
ARROW-5818: [Java][Gandiva] support varlen output vectors
callback to java for resizing varlen vectors
Author: Pindikura Ravindra <[email protected]>
Closes #4771 from pravindra/jvarlen and squashes the following commits:
d9954e865 <Pindikura Ravindra> add check for null expander
b710a7792 <Pindikura Ravindra> ARROW-5818: support varlen output vectors
---
cpp/src/gandiva/jni/jni_common.cc | 106 +++++++++++++++++----
.../apache/arrow/gandiva/evaluator/JniWrapper.java | 3 +-
.../apache/arrow/gandiva/evaluator/Projector.java | 22 ++++-
.../arrow/gandiva/evaluator/VectorExpander.java | 69 ++++++++++++++
.../arrow/gandiva/evaluator/ProjectorTest.java | 2 -
5 files changed, 177 insertions(+), 25 deletions(-)
diff --git a/cpp/src/gandiva/jni/jni_common.cc b/cpp/src/gandiva/jni/jni_common.cc
index 2ff4bc9..eeaaca7 100644
--- a/cpp/src/gandiva/jni/jni_common.cc
+++ b/cpp/src/gandiva/jni/jni_common.cc
@@ -72,6 +72,11 @@ jclass configuration_builder_class_;
// refs for self.
static jclass gandiva_exception_;
+static jclass vector_expander_class_;
+static jclass vector_expander_ret_class_;
+static jmethodID vector_expander_method_;
+static jfieldID vector_expander_ret_address_;
+static jfieldID vector_expander_ret_capacity_;
// module maps
gandiva::IdToModuleMap<std::shared_ptr<ProjectorHolder>> projector_modules_;
@@ -91,10 +96,27 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
jclass localExceptionClass =
env->FindClass("org/apache/arrow/gandiva/exceptions/GandivaException");
gandiva_exception_ = (jclass)env->NewGlobalRef(localExceptionClass);
+ env->ExceptionDescribe();
env->DeleteLocalRef(localExceptionClass);
- env->ExceptionDescribe();
+ jclass local_expander_class =
+ env->FindClass("org/apache/arrow/gandiva/evaluator/VectorExpander");
+ vector_expander_class_ = (jclass)env->NewGlobalRef(local_expander_class);
+ env->DeleteLocalRef(local_expander_class);
+
+ vector_expander_method_ = env->GetMethodID(
+ vector_expander_class_, "expandOutputVectorAtIndex",
+ "(II)Lorg/apache/arrow/gandiva/evaluator/VectorExpander$ExpandResult;");
+ jclass local_expander_ret_class =
+
env->FindClass("org/apache/arrow/gandiva/evaluator/VectorExpander$ExpandResult");
+ vector_expander_ret_class_ =
(jclass)env->NewGlobalRef(local_expander_ret_class);
+ env->DeleteLocalRef(local_expander_ret_class);
+
+ vector_expander_ret_address_ =
+ env->GetFieldID(vector_expander_ret_class_, "address", "J");
+ vector_expander_ret_capacity_ =
+ env->GetFieldID(vector_expander_ret_class_, "capacity", "I");
return JNI_VERSION;
}
@@ -103,6 +125,8 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION);
env->DeleteGlobalRef(configuration_builder_class_);
env->DeleteGlobalRef(gandiva_exception_);
+ env->DeleteGlobalRef(vector_expander_class_);
+ env->DeleteGlobalRef(vector_expander_ret_class_);
}
DataTypePtr ProtoTypeToTime32(const types::ExtGandivaType& ext_type) {
@@ -637,27 +661,62 @@ err_out:
///
class JavaResizableBuffer : public arrow::ResizableBuffer {
public:
- JavaResizableBuffer(uint8_t* buffer, int32_t len) : ResizableBuffer(buffer,
len) {
+ JavaResizableBuffer(JNIEnv* env, jobject jexpander, int32_t vector_idx,
uint8_t* buffer,
+ int32_t len)
+ : ResizableBuffer(buffer, len),
+ env_(env),
+ jexpander_(jexpander),
+ vector_idx_(vector_idx) {
size_ = 0;
}
- Status Resize(const int64_t new_size, bool shrink_to_fit) override {
- if (shrink_to_fit == true) {
- return Status::NotImplemented("shrink not implemented");
- } else if (new_size < capacity()) {
- size_ = new_size;
- return Status::OK();
- } else {
- // TODO: callback into java to re-alloc the buffer.
- return Status::NotImplemented("buffer expand not implemented");
- }
- }
+ Status Resize(const int64_t new_size, bool shrink_to_fit) override;
Status Reserve(const int64_t new_capacity) override {
return Status::NotImplemented("reserve not implemented");
}
+
+ private:
+ JNIEnv* env_;
+ jobject jexpander_;
+ int32_t vector_idx_;
};
+Status JavaResizableBuffer::Resize(const int64_t new_size, bool shrink_to_fit)
{
+ if (shrink_to_fit == true) {
+ return Status::NotImplemented("shrink not implemented");
+ }
+
+ if (ARROW_PREDICT_TRUE(new_size < capacity())) {
+ // no need to expand.
+ size_ = new_size;
+ return Status::OK();
+ }
+
+ if (new_size > INT32_MAX) {
+ return Status::OutOfMemory("java supports buffer sizes upto 2GB only");
+ }
+
+ // callback into java to expand the buffer
+ int32_t updated_capacity = static_cast<int32_t>(new_size);
+ jobject ret = env_->CallObjectMethod(jexpander_, vector_expander_method_,
vector_idx_,
+ updated_capacity);
+ if (env_->ExceptionCheck()) {
+ env_->ExceptionDescribe();
+ env_->ExceptionClear();
+ return Status::OutOfMemory("buffer expand failed in java");
+ }
+
+ jlong ret_address = env_->GetLongField(ret, vector_expander_ret_address_);
+ jint ret_capacity = env_->GetIntField(ret, vector_expander_ret_capacity_);
+ DCHECK_GE(ret_capacity, updated_capacity);
+
+ data_ = mutable_data_ = reinterpret_cast<uint8_t*>(ret_address);
+ size_ = new_size;
+ capacity_ = ret_capacity;
+ return Status::OK();
+}
+
#define CHECK_OUT_BUFFER_IDX_AND_BREAK(idx, len)
\
if (idx >= len) {
\
status = gandiva::Status::Invalid("insufficient number of out_buf_addrs");
\
@@ -666,9 +725,10 @@ class JavaResizableBuffer : public arrow::ResizableBuffer {
JNIEXPORT void JNICALL
Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
- JNIEnv* env, jobject cls, jlong module_id, jint num_rows, jlongArray
buf_addrs,
- jlongArray buf_sizes, jint sel_vec_type, jint sel_vec_rows, jlong
sel_vec_addr,
- jlong sel_vec_size, jlongArray out_buf_addrs, jlongArray out_buf_sizes) {
+ JNIEnv* env, jobject object, jobject jexpander, jlong module_id, jint
num_rows,
+ jlongArray buf_addrs, jlongArray buf_sizes, jint sel_vec_type, jint
sel_vec_rows,
+ jlong sel_vec_addr, jlong sel_vec_size, jlongArray out_buf_addrs,
+ jlongArray out_buf_sizes) {
Status status;
std::shared_ptr<ProjectorHolder> holder =
projector_modules_.Lookup(module_id);
if (holder == nullptr) {
@@ -735,6 +795,7 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
ArrayDataVector output;
int buf_idx = 0;
int sz_idx = 0;
+ int output_vector_idx = 0;
for (FieldPtr field : ret_types) {
std::vector<std::shared_ptr<arrow::Buffer>> buffers;
@@ -755,13 +816,24 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
uint8_t* value_buf = reinterpret_cast<uint8_t*>(out_bufs[buf_idx++]);
jlong data_sz = out_sizes[sz_idx++];
if (arrow::is_binary_like(field->type()->id())) {
- buffers.push_back(std::make_shared<JavaResizableBuffer>(value_buf,
data_sz));
+ if (jexpander == nullptr) {
+ status = Status::Invalid(
+ "expression has variable len output columns, but the expander
object is "
+ "null");
+ break;
+ }
+ buffers.push_back(std::make_shared<JavaResizableBuffer>(
+ env, jexpander, output_vector_idx, value_buf, data_sz));
} else {
buffers.push_back(std::make_shared<arrow::MutableBuffer>(value_buf,
data_sz));
}
auto array_data = arrow::ArrayData::Make(field->type(),
output_row_count, buffers);
output.push_back(array_data);
+ ++output_vector_idx;
+ }
+ if (!status.ok()) {
+ break;
}
status = holder->projector()->Evaluate(*in_batch, selection_vector.get(),
output);
} while (0);
diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java
index ef1d63a..520ef5f 100644
--- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java
+++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java
@@ -48,6 +48,7 @@ public class JniWrapper {
* Evaluate the expressions represented by the moduleId on a record batch
* and store the output in ValueVectors. Throws an exception in case of
errors
*
+ * @param expander VectorExpander object. Used for callbacks from cpp.
* @param moduleId moduleId representing expressions. Created using a call to
* buildNativeCode
* @param numRows Number of rows in the record batch
@@ -61,7 +62,7 @@ public class JniWrapper {
* @param outSizes The allocated size of the output buffers. On successful
evaluation,
* the result is stored in the output buffers
*/
- native void evaluateProjector(long moduleId, int numRows,
+ native void evaluateProjector(Object expander, long moduleId, int numRows,
long[] bufAddrs, long[] bufSizes,
int selectionVectorType, int
selectionVectorSize,
long selectionVectorBufferAddr, long
selectionVectorBufferSize,
diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java
index 93657e6..c15d474 100644
--- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java
+++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java
@@ -27,6 +27,7 @@ import org.apache.arrow.gandiva.expression.ArrowTypeHelper;
import org.apache.arrow.gandiva.expression.ExpressionTree;
import org.apache.arrow.gandiva.ipc.GandivaTypes;
import org.apache.arrow.gandiva.ipc.GandivaTypes.SelectionVectorType;
+import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.FixedWidthVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VariableWidthVector;
@@ -236,14 +237,18 @@ public class Projector {
bufSizes[idx++] = bufLayout.getSize();
}
+ boolean hasVariableWidthColumns = false;
+ BaseVariableWidthVector[] resizableVectors = new
BaseVariableWidthVector[outColumns.size()];
long[] outAddrs = new long[3 * outColumns.size()];
long[] outSizes = new long[3 * outColumns.size()];
idx = 0;
+ int outColumnIdx = 0;
for (ValueVector valueVector : outColumns) {
boolean isFixedWith = valueVector instanceof FixedWidthVector;
boolean isVarWidth = valueVector instanceof VariableWidthVector;
if (!isFixedWith && !isVarWidth) {
- throw new UnsupportedTypeException("Unsupported value vector type " +
valueVector.getField().getFieldType());
+ throw new UnsupportedTypeException(
+ "Unsupported value vector type " +
valueVector.getField().getFieldType());
}
outAddrs[idx] = valueVector.getValidityBuffer().memoryAddress();
@@ -251,17 +256,24 @@ public class Projector {
if (isVarWidth) {
outAddrs[idx] = valueVector.getOffsetBuffer().memoryAddress();
outSizes[idx++] = valueVector.getOffsetBuffer().capacity();
+ hasVariableWidthColumns = true;
+
+ // save vector to allow for resizing.
+ resizableVectors[outColumnIdx] = (BaseVariableWidthVector)valueVector;
}
outAddrs[idx] = valueVector.getDataBuffer().memoryAddress();
outSizes[idx++] = valueVector.getDataBuffer().capacity();
valueVector.setValueCount(selectionVectorRecordCount);
+ outColumnIdx++;
}
- wrapper.evaluateProjector(this.moduleId, numRows, bufAddrs, bufSizes,
- selectionVectorType, selectionVectorRecordCount,
- selectionVectorAddr, selectionVectorSize,
- outAddrs, outSizes);
+ wrapper.evaluateProjector(
+ hasVariableWidthColumns ? new VectorExpander(resizableVectors) : null,
+ this.moduleId, numRows, bufAddrs, bufSizes,
+ selectionVectorType, selectionVectorRecordCount,
+ selectionVectorAddr, selectionVectorSize,
+ outAddrs, outSizes);
}
/**
diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/VectorExpander.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/VectorExpander.java
new file mode 100644
index 0000000..2414144
--- /dev/null
+++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/VectorExpander.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.gandiva.evaluator;
+
+import org.apache.arrow.vector.BaseVariableWidthVector;
+
+/**
+ * This class provides the functionality to expand output vectors using a
callback mechanism from
+ * gandiva.
+ */
+public class VectorExpander {
+ private final BaseVariableWidthVector[] vectors;
+
+ public VectorExpander(BaseVariableWidthVector[] vectors) {
+ this.vectors = vectors;
+ }
+
+ /**
+ * Result of vector expansion.
+ */
+ public static class ExpandResult {
+ public long address;
+ public int capacity;
+
+ public ExpandResult(long address, int capacity) {
+ this.address = address;
+ this.capacity = capacity;
+ }
+ }
+
+ /**
+ * Expand vector at specified index. This is used as a back call from jni,
and is only
+ * relevant for variable width vectors.
+ *
+ * @param index index of buffer in the list passed to jni.
+ * @param toCapacity the size to which the buffer should be expanded to.
+ *
+ * @return address and size of the buffer after expansion.
+ */
+ public ExpandResult expandOutputVectorAtIndex(int index, int toCapacity) {
+ if (index >= vectors.length || vectors[index] == null) {
+ throw new IllegalArgumentException("invalid index " + index);
+ }
+
+ BaseVariableWidthVector vector = vectors[index];
+ while (vector.getDataBuffer().capacity() < toCapacity) {
+ vector.reallocDataBuffer();
+ }
+ return new ExpandResult(
+ vector.getDataBuffer().memoryAddress(),
+ vector.getDataBuffer().capacity());
+ }
+
+}
diff --git a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java
index 2fd8091..52eeb16 100644
--- a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java
+++ b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java
@@ -580,8 +580,6 @@ public class ProjectorTest extends BaseEvaluatorTest {
// test with insufficient data buffer.
try {
outVector.allocateNew(4, numRows);
- thrown.expect(GandivaException.class);
- thrown.expectMessage("expand not implemented");
eval.evaluate(batch, output);
} finally {
releaseRecordBatch(batch);