This is an automated email from the ASF dual-hosted git repository.

ravindra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b0335f  ARROW-5818: [Java][Gandiva] support varlen output vectors
7b0335f is described below

commit 7b0335ffb8cae84e401ce62e5d6d199407a8c621
Author: Pindikura Ravindra <ravin...@dremio.com>
AuthorDate: Tue Jul 2 18:46:28 2019 +0530

    ARROW-5818: [Java][Gandiva] support varlen output vectors
    
    Add a callback from the Gandiva JNI layer into Java so that variable-length (varlen) output vectors can be resized during evaluation.
    
    Author: Pindikura Ravindra <ravin...@dremio.com>
    
    Closes #4771 from pravindra/jvarlen and squashes the following commits:
    
    d9954e865 <Pindikura Ravindra> add check for null expander
    b710a7792 <Pindikura Ravindra> ARROW-5818:  support varlen output vectors
---
 cpp/src/gandiva/jni/jni_common.cc                  | 106 +++++++++++++++++----
 .../apache/arrow/gandiva/evaluator/JniWrapper.java |   3 +-
 .../apache/arrow/gandiva/evaluator/Projector.java  |  22 ++++-
 .../arrow/gandiva/evaluator/VectorExpander.java    |  69 ++++++++++++++
 .../arrow/gandiva/evaluator/ProjectorTest.java     |   2 -
 5 files changed, 177 insertions(+), 25 deletions(-)

diff --git a/cpp/src/gandiva/jni/jni_common.cc b/cpp/src/gandiva/jni/jni_common.cc
index 2ff4bc9..eeaaca7 100644
--- a/cpp/src/gandiva/jni/jni_common.cc
+++ b/cpp/src/gandiva/jni/jni_common.cc
@@ -72,6 +72,11 @@ jclass configuration_builder_class_;
 
 // refs for self.
 static jclass gandiva_exception_;
+static jclass vector_expander_class_;
+static jclass vector_expander_ret_class_;
+static jmethodID vector_expander_method_;
+static jfieldID vector_expander_ret_address_;
+static jfieldID vector_expander_ret_capacity_;
 
 // module maps
 gandiva::IdToModuleMap<std::shared_ptr<ProjectorHolder>> projector_modules_;
@@ -91,10 +96,27 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   jclass localExceptionClass =
       env->FindClass("org/apache/arrow/gandiva/exceptions/GandivaException");
   gandiva_exception_ = (jclass)env->NewGlobalRef(localExceptionClass);
+  env->ExceptionDescribe();
   env->DeleteLocalRef(localExceptionClass);
 
-  env->ExceptionDescribe();
+  jclass local_expander_class =
+      env->FindClass("org/apache/arrow/gandiva/evaluator/VectorExpander");
+  vector_expander_class_ = (jclass)env->NewGlobalRef(local_expander_class);
+  env->DeleteLocalRef(local_expander_class);
+
+  vector_expander_method_ = env->GetMethodID(
+      vector_expander_class_, "expandOutputVectorAtIndex",
+      "(II)Lorg/apache/arrow/gandiva/evaluator/VectorExpander$ExpandResult;");
 
+  jclass local_expander_ret_class =
+      
env->FindClass("org/apache/arrow/gandiva/evaluator/VectorExpander$ExpandResult");
+  vector_expander_ret_class_ = 
(jclass)env->NewGlobalRef(local_expander_ret_class);
+  env->DeleteLocalRef(local_expander_ret_class);
+
+  vector_expander_ret_address_ =
+      env->GetFieldID(vector_expander_ret_class_, "address", "J");
+  vector_expander_ret_capacity_ =
+      env->GetFieldID(vector_expander_ret_class_, "capacity", "I");
   return JNI_VERSION;
 }
 
@@ -103,6 +125,8 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION);
   env->DeleteGlobalRef(configuration_builder_class_);
   env->DeleteGlobalRef(gandiva_exception_);
+  env->DeleteGlobalRef(vector_expander_class_);
+  env->DeleteGlobalRef(vector_expander_ret_class_);
 }
 
 DataTypePtr ProtoTypeToTime32(const types::ExtGandivaType& ext_type) {
@@ -637,27 +661,62 @@ err_out:
 ///
 class JavaResizableBuffer : public arrow::ResizableBuffer {
  public:
-  JavaResizableBuffer(uint8_t* buffer, int32_t len) : ResizableBuffer(buffer, 
len) {
+  JavaResizableBuffer(JNIEnv* env, jobject jexpander, int32_t vector_idx, 
uint8_t* buffer,
+                      int32_t len)
+      : ResizableBuffer(buffer, len),
+        env_(env),
+        jexpander_(jexpander),
+        vector_idx_(vector_idx) {
     size_ = 0;
   }
 
-  Status Resize(const int64_t new_size, bool shrink_to_fit) override {
-    if (shrink_to_fit == true) {
-      return Status::NotImplemented("shrink not implemented");
-    } else if (new_size < capacity()) {
-      size_ = new_size;
-      return Status::OK();
-    } else {
-      // TODO: callback into java to re-alloc the buffer.
-      return Status::NotImplemented("buffer expand not implemented");
-    }
-  }
+  Status Resize(const int64_t new_size, bool shrink_to_fit) override;
 
   Status Reserve(const int64_t new_capacity) override {
     return Status::NotImplemented("reserve not implemented");
   }
+
+ private:
+  JNIEnv* env_;
+  jobject jexpander_;
+  int32_t vector_idx_;
 };
 
+Status JavaResizableBuffer::Resize(const int64_t new_size, bool shrink_to_fit) 
{
+  if (shrink_to_fit == true) {
+    return Status::NotImplemented("shrink not implemented");
+  }
+
+  if (ARROW_PREDICT_TRUE(new_size < capacity())) {
+    // no need to expand.
+    size_ = new_size;
+    return Status::OK();
+  }
+
+  if (new_size > INT32_MAX) {
+    return Status::OutOfMemory("java supports buffer sizes upto 2GB only");
+  }
+
+  // callback into java to expand the buffer
+  int32_t updated_capacity = static_cast<int32_t>(new_size);
+  jobject ret = env_->CallObjectMethod(jexpander_, vector_expander_method_, 
vector_idx_,
+                                       updated_capacity);
+  if (env_->ExceptionCheck()) {
+    env_->ExceptionDescribe();
+    env_->ExceptionClear();
+    return Status::OutOfMemory("buffer expand failed in java");
+  }
+
+  jlong ret_address = env_->GetLongField(ret, vector_expander_ret_address_);
+  jint ret_capacity = env_->GetIntField(ret, vector_expander_ret_capacity_);
+  DCHECK_GE(ret_capacity, updated_capacity);
+
+  data_ = mutable_data_ = reinterpret_cast<uint8_t*>(ret_address);
+  size_ = new_size;
+  capacity_ = ret_capacity;
+  return Status::OK();
+}
+
 #define CHECK_OUT_BUFFER_IDX_AND_BREAK(idx, len)                               
\
   if (idx >= len) {                                                            
\
     status = gandiva::Status::Invalid("insufficient number of out_buf_addrs"); 
\
@@ -666,9 +725,10 @@ class JavaResizableBuffer : public arrow::ResizableBuffer {
 
 JNIEXPORT void JNICALL
 Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
-    JNIEnv* env, jobject cls, jlong module_id, jint num_rows, jlongArray 
buf_addrs,
-    jlongArray buf_sizes, jint sel_vec_type, jint sel_vec_rows, jlong 
sel_vec_addr,
-    jlong sel_vec_size, jlongArray out_buf_addrs, jlongArray out_buf_sizes) {
+    JNIEnv* env, jobject object, jobject jexpander, jlong module_id, jint 
num_rows,
+    jlongArray buf_addrs, jlongArray buf_sizes, jint sel_vec_type, jint 
sel_vec_rows,
+    jlong sel_vec_addr, jlong sel_vec_size, jlongArray out_buf_addrs,
+    jlongArray out_buf_sizes) {
   Status status;
   std::shared_ptr<ProjectorHolder> holder = 
projector_modules_.Lookup(module_id);
   if (holder == nullptr) {
@@ -735,6 +795,7 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
     ArrayDataVector output;
     int buf_idx = 0;
     int sz_idx = 0;
+    int output_vector_idx = 0;
     for (FieldPtr field : ret_types) {
       std::vector<std::shared_ptr<arrow::Buffer>> buffers;
 
@@ -755,13 +816,24 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
       uint8_t* value_buf = reinterpret_cast<uint8_t*>(out_bufs[buf_idx++]);
       jlong data_sz = out_sizes[sz_idx++];
       if (arrow::is_binary_like(field->type()->id())) {
-        buffers.push_back(std::make_shared<JavaResizableBuffer>(value_buf, 
data_sz));
+        if (jexpander == nullptr) {
+          status = Status::Invalid(
+              "expression has variable len output columns, but the expander 
object is "
+              "null");
+          break;
+        }
+        buffers.push_back(std::make_shared<JavaResizableBuffer>(
+            env, jexpander, output_vector_idx, value_buf, data_sz));
       } else {
         buffers.push_back(std::make_shared<arrow::MutableBuffer>(value_buf, 
data_sz));
       }
 
       auto array_data = arrow::ArrayData::Make(field->type(), 
output_row_count, buffers);
       output.push_back(array_data);
+      ++output_vector_idx;
+    }
+    if (!status.ok()) {
+      break;
     }
     status = holder->projector()->Evaluate(*in_batch, selection_vector.get(), 
output);
   } while (0);
diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java
index ef1d63a..520ef5f 100644
--- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java
+++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java
@@ -48,6 +48,7 @@ public class JniWrapper {
    * Evaluate the expressions represented by the moduleId on a record batch
    * and store the output in ValueVectors. Throws an exception in case of 
errors
    *
+   * @param expander VectorExpander object. Used for callbacks from cpp.
    * @param moduleId moduleId representing expressions. Created using a call to
    *                 buildNativeCode
    * @param numRows Number of rows in the record batch
@@ -61,7 +62,7 @@ public class JniWrapper {
    * @param outSizes The allocated size of the output buffers. On successful 
evaluation,
    *                 the result is stored in the output buffers
    */
-  native void evaluateProjector(long moduleId, int numRows,
+  native void evaluateProjector(Object expander, long moduleId, int numRows,
                                 long[] bufAddrs, long[] bufSizes,
                                 int selectionVectorType, int 
selectionVectorSize,
                                 long selectionVectorBufferAddr, long 
selectionVectorBufferSize,
diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java
index 93657e6..c15d474 100644
--- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java
+++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java
@@ -27,6 +27,7 @@ import org.apache.arrow.gandiva.expression.ArrowTypeHelper;
 import org.apache.arrow.gandiva.expression.ExpressionTree;
 import org.apache.arrow.gandiva.ipc.GandivaTypes;
 import org.apache.arrow.gandiva.ipc.GandivaTypes.SelectionVectorType;
+import org.apache.arrow.vector.BaseVariableWidthVector;
 import org.apache.arrow.vector.FixedWidthVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VariableWidthVector;
@@ -236,14 +237,18 @@ public class Projector {
       bufSizes[idx++] = bufLayout.getSize();
     }
 
+    boolean hasVariableWidthColumns = false;
+    BaseVariableWidthVector[] resizableVectors = new 
BaseVariableWidthVector[outColumns.size()];
     long[] outAddrs = new long[3 * outColumns.size()];
     long[] outSizes = new long[3 * outColumns.size()];
     idx = 0;
+    int outColumnIdx = 0;
     for (ValueVector valueVector : outColumns) {
       boolean isFixedWith = valueVector instanceof FixedWidthVector;
       boolean isVarWidth = valueVector instanceof VariableWidthVector;
       if (!isFixedWith && !isVarWidth) {
-        throw new UnsupportedTypeException("Unsupported value vector type " + 
valueVector.getField().getFieldType());
+        throw new UnsupportedTypeException(
+            "Unsupported value vector type " + 
valueVector.getField().getFieldType());
       }
 
       outAddrs[idx] = valueVector.getValidityBuffer().memoryAddress();
@@ -251,17 +256,24 @@ public class Projector {
       if (isVarWidth) {
         outAddrs[idx] = valueVector.getOffsetBuffer().memoryAddress();
         outSizes[idx++] = valueVector.getOffsetBuffer().capacity();
+        hasVariableWidthColumns = true;
+
+        // save vector to allow for resizing.
+        resizableVectors[outColumnIdx] = (BaseVariableWidthVector)valueVector;
       }
       outAddrs[idx] = valueVector.getDataBuffer().memoryAddress();
       outSizes[idx++] = valueVector.getDataBuffer().capacity();
 
       valueVector.setValueCount(selectionVectorRecordCount);
+      outColumnIdx++;
     }
 
-    wrapper.evaluateProjector(this.moduleId, numRows, bufAddrs, bufSizes,
-            selectionVectorType, selectionVectorRecordCount,
-            selectionVectorAddr, selectionVectorSize, 
-            outAddrs, outSizes);
+    wrapper.evaluateProjector(
+        hasVariableWidthColumns ? new VectorExpander(resizableVectors) : null,
+        this.moduleId, numRows, bufAddrs, bufSizes,
+        selectionVectorType, selectionVectorRecordCount,
+        selectionVectorAddr, selectionVectorSize,
+        outAddrs, outSizes);
   }
 
   /**
diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/VectorExpander.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/VectorExpander.java
new file mode 100644
index 0000000..2414144
--- /dev/null
+++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/VectorExpander.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.gandiva.evaluator;
+
+import org.apache.arrow.vector.BaseVariableWidthVector;
+
+/**
+ * This class provides the functionality to expand output vectors using a 
callback mechanism from
+ * gandiva.
+ */
+public class VectorExpander {
+  private final BaseVariableWidthVector[] vectors;
+
+  public VectorExpander(BaseVariableWidthVector[] vectors) {
+    this.vectors = vectors;
+  }
+
+  /**
+   * Result of vector expansion.
+   */
+  public static class ExpandResult {
+    public long address;
+    public int capacity;
+
+    public ExpandResult(long address, int capacity) {
+      this.address = address;
+      this.capacity = capacity;
+    }
+  }
+
+  /**
+   * Expand vector at specified index. This is used as a back call from jni, 
and is only
+   * relevant for variable width vectors.
+   *
+   * @param index index of buffer in the list passed to jni.
+   * @param toCapacity the size to which the buffer should be expanded to.
+   *
+   * @return address and size  of the buffer after expansion.
+   */
+  public ExpandResult expandOutputVectorAtIndex(int index, int toCapacity) {
+    if (index >= vectors.length || vectors[index] == null) {
+      throw new IllegalArgumentException("invalid index " + index);
+    }
+
+    BaseVariableWidthVector vector = vectors[index];
+    while (vector.getDataBuffer().capacity() < toCapacity) {
+      vector.reallocDataBuffer();
+    }
+    return new ExpandResult(
+        vector.getDataBuffer().memoryAddress(),
+        vector.getDataBuffer().capacity());
+  }
+
+}
diff --git a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java
index 2fd8091..52eeb16 100644
--- a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java
+++ b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java
@@ -580,8 +580,6 @@ public class ProjectorTest extends BaseEvaluatorTest {
     // test with insufficient data buffer.
     try {
       outVector.allocateNew(4, numRows);
-      thrown.expect(GandivaException.class);
-      thrown.expectMessage("expand not implemented");
       eval.evaluate(batch, output);
     } finally {
       releaseRecordBatch(batch);

Reply via email to