This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 14615b81442ef07e1e517f5b8e0f46b1cdfff1d2 Author: Pindikura Ravindra <[email protected]> AuthorDate: Sun Sep 23 15:42:22 2018 +0530 [Gandiva] add evaluate variant that skips refs - converting buffers to an ArrowRecordBatch involves taking refs for every batch, which impacts performance in Gandiva. --- .../org/apache/arrow/gandiva/evaluator/Filter.java | 36 +++++++++++++++++----- .../apache/arrow/gandiva/evaluator/Projector.java | 31 ++++++++++++++++--- 2 files changed, 56 insertions(+), 11 deletions(-) diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Filter.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Filter.java index de4a24e..02de6e5 100644 --- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Filter.java +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Filter.java @@ -31,6 +31,7 @@ import org.apache.arrow.vector.types.pojo.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; /** @@ -92,27 +93,48 @@ public class Filter { } /** - * Invoke this function to evaluate a set of expressions against a recordBatch. + * Invoke this function to evaluate a filter against a recordBatch. * * @param recordBatch Record batch including the data * @param selectionVector Result of applying the filter on the data */ public void evaluate(ArrowRecordBatch recordBatch, SelectionVector selectionVector) throws GandivaException { + evaluate(recordBatch.getLength(), recordBatch.getBuffers(), recordBatch.getBuffersLayout(), + selectionVector); + } + /** + * Invoke this function to evaluate filter against a set of arrow buffers. + * (this is an optimised version that skips taking references). + * + * @param numRows number of rows. 
+ * @param buffers List of input arrow buffers + * @param selectionVector Result of applying the filter on the data + */ + public void evaluate(int numRows, List<ArrowBuf> buffers, + SelectionVector selectionVector) throws GandivaException { + List<ArrowBuffer> buffersLayout = new ArrayList<>(); + long offset = 0; + for (ArrowBuf arrowBuf : buffers) { + long size = arrowBuf.readableBytes(); + buffersLayout.add(new ArrowBuffer(offset, size)); + offset += size; + } + evaluate(numRows, buffers, buffersLayout, selectionVector); + } + + private void evaluate(int numRows, List<ArrowBuf> buffers, List<ArrowBuffer> buffersLayout, + SelectionVector selectionVector) throws GandivaException { if (this.closed) { throw new EvaluatorClosedException(); } - int numRows = recordBatch.getLength(); if (selectionVector.getMaxRecords() < numRows) { - logger.error("selectionVector has capacity for " + numRows - + " rows, minimum required " + recordBatch.getLength()); + logger.error("selectionVector has capacity for " + selectionVector.getMaxRecords() + + " rows, minimum required " + numRows); throw new GandivaException("SelectionVector too small"); } - List<ArrowBuf> buffers = recordBatch.getBuffers(); - List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); - long[] bufAddrs = new long[buffers.size()]; long[] bufSizes = new long[buffers.size()]; diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java index 7213b67..246f71c 100644 --- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java @@ -31,6 +31,7 @@ import org.apache.arrow.vector.ipc.message.ArrowBuffer; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Schema; +import java.util.ArrayList; import java.util.List; /** @@ -110,6 +111,32 @@ public class 
Projector { */ public void evaluate(ArrowRecordBatch recordBatch, List<ValueVector> outColumns) throws GandivaException { + evaluate(recordBatch.getLength(), recordBatch.getBuffers(), recordBatch.getBuffersLayout(), + outColumns); + } + + /** + * Invoke this function to evaluate a set of expressions against a set of arrow buffers. + * (this is an optimised version that skips taking references). + * + * @param numRows number of rows. + * @param buffers List of input arrow buffers + * @param outColumns Result of applying the project on the data + */ + public void evaluate(int numRows, List<ArrowBuf> buffers, + List<ValueVector> outColumns) throws GandivaException { + List<ArrowBuffer> buffersLayout = new ArrayList<>(); + long offset = 0; + for (ArrowBuf arrowBuf : buffers) { + long size = arrowBuf.readableBytes(); + buffersLayout.add(new ArrowBuffer(offset, size)); + offset += size; + } + evaluate(numRows, buffers, buffersLayout, outColumns); + } + + private void evaluate(int numRows, List<ArrowBuf> buffers, List<ArrowBuffer> buffersLayout, + List<ValueVector> outColumns) throws GandivaException { if (this.closed) { throw new EvaluatorClosedException(); } @@ -119,9 +146,6 @@ public class Projector { throw new GandivaException("Incorrect number of columns for the output vector"); } - List<ArrowBuf> buffers = recordBatch.getBuffers(); - List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout(); - long[] bufAddrs = new long[buffers.size()]; long[] bufSizes = new long[buffers.size()]; @@ -135,7 +159,6 @@ public class Projector { bufSizes[idx++] = bufLayout.getSize(); } - int numRows = recordBatch.getLength(); long[] outAddrs = new long[2 * outColumns.size()]; long[] outSizes = new long[2 * outColumns.size()]; idx = 0;
