This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 225f699f5a Clean up TransformEvaluator (#13516)
225f699f5a is described below
commit 225f699f5ae227f0fa2d052bcabefb14d335cb53
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Jul 1 17:07:18 2024 -0700
Clean up TransformEvaluator (#13516)
---
.../apache/pinot/core/common/DataBlockCache.java | 122 ----
.../org/apache/pinot/core/common/DataFetcher.java | 227 +------
.../evaluators/DefaultJsonPathEvaluator.java | 704 ---------------------
.../core/operator/blocks/ProjectionBlock.java | 123 ----
.../function/IdentifierTransformFunction.java | 65 +-
.../JsonExtractScalarTransformFunction.java | 264 +++-----
.../function/PushDownTransformFunction.java | 135 ----
.../evaluators/DefaultJsonPathEvaluatorTest.java | 146 -----
.../query/runtime/queries/QueryRunnerTest.java | 2 +-
.../segment/spi/evaluator/TransformEvaluator.java | 168 -----
.../spi/evaluator/json/JsonPathEvaluator.java | 30 -
.../evaluator/json/JsonPathEvaluatorProvider.java | 33 -
.../spi/evaluator/json/JsonPathEvaluators.java | 147 -----
13 files changed, 96 insertions(+), 2070 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
index 58bcc42bf1..3e474da62b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
import org.apache.pinot.spi.data.FieldSpec;
@@ -134,17 +133,6 @@ public class DataBlockCache {
return intValues;
}
- /**
- * Get the int values for a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- public void fillValues(String column, TransformEvaluator evaluator, int[]
buffer) {
- _dataFetcher.fetchIntValues(column, evaluator, _docIds, _length, buffer);
- }
-
/**
* Get the long values for a single-valued column.
*
@@ -163,17 +151,6 @@ public class DataBlockCache {
return longValues;
}
- /**
- * Get the long values for a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- public void fillValues(String column, TransformEvaluator evaluator, long[]
buffer) {
- _dataFetcher.fetchLongValues(column, evaluator, _docIds, _length, buffer);
- }
-
/**
* Get the float values for a single-valued column.
*
@@ -192,17 +169,6 @@ public class DataBlockCache {
return floatValues;
}
- /**
- * Get the float values for a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- public void fillValues(String column, TransformEvaluator evaluator, float[]
buffer) {
- _dataFetcher.fetchFloatValues(column, evaluator, _docIds, _length, buffer);
- }
-
/**
* Get the double values for a single-valued column.
*
@@ -221,17 +187,6 @@ public class DataBlockCache {
return doubleValues;
}
- /**
- * Get the double values for a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- public void fillValues(String column, TransformEvaluator evaluator, double[]
buffer) {
- _dataFetcher.fetchDoubleValues(column, evaluator, _docIds, _length,
buffer);
- }
-
/**
* Get the BigDecimal values for a single-valued column.
*
@@ -250,17 +205,6 @@ public class DataBlockCache {
return bigDecimalValues;
}
- /**
- * Get the BigDecimal values for a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- public void fillValues(String column, TransformEvaluator evaluator,
BigDecimal[] buffer) {
- _dataFetcher.fetchBigDecimalValues(column, evaluator, _docIds, _length,
buffer);
- }
-
/**
* Get the string values for a single-valued column.
*
@@ -279,17 +223,6 @@ public class DataBlockCache {
return stringValues;
}
- /**
- * Get the string values for a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- public void fillValues(String column, TransformEvaluator evaluator, String[]
buffer) {
- _dataFetcher.fetchStringValues(column, evaluator, _docIds, _length,
buffer);
- }
-
/**
* Get byte[] values for the given single-valued column.
*
@@ -348,17 +281,6 @@ public class DataBlockCache {
return intValues;
}
- /**
- * Get the int[][] values for a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- public void fillValues(String column, TransformEvaluator evaluator, int[][]
buffer) {
- _dataFetcher.fetchIntValues(column, evaluator, _docIds, _length, buffer);
- }
-
/**
* Get the long values for a multi-valued column.
*
@@ -377,17 +299,6 @@ public class DataBlockCache {
return longValues;
}
- /**
- * Get the long[][] values for a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- public void fillValues(String column, TransformEvaluator evaluator, long[][]
buffer) {
- _dataFetcher.fetchLongValues(column, evaluator, _docIds, _length, buffer);
- }
-
/**
* Get the float values for a multi-valued column.
*
@@ -406,17 +317,6 @@ public class DataBlockCache {
return floatValues;
}
- /**
- * Get the float[][] values for a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- public void fillValues(String column, TransformEvaluator evaluator,
float[][] buffer) {
- _dataFetcher.fetchFloatValues(column, evaluator, _docIds, _length, buffer);
- }
-
/**
* Get the double values for a multi-valued column.
*
@@ -435,17 +335,6 @@ public class DataBlockCache {
return doubleValues;
}
- /**
- * Get the double[][] values for a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- public void fillValues(String column, TransformEvaluator evaluator,
double[][] buffer) {
- _dataFetcher.fetchDoubleValues(column, evaluator, _docIds, _length,
buffer);
- }
-
/**
* Get the string values for a multi-valued column.
*
@@ -464,17 +353,6 @@ public class DataBlockCache {
return stringValues;
}
- /**
- * Get the String[][] values for a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- public void fillValues(String column, TransformEvaluator evaluator,
String[][] buffer) {
- _dataFetcher.fetchStringValues(column, evaluator, _docIds, _length,
buffer);
- }
-
/**
* Get the bytes values for a multi-valued column.
*
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
index 4e2d13a72c..6ed1822d83 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
@@ -29,7 +29,6 @@ import javax.annotation.Nullable;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
-import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
@@ -71,8 +70,7 @@ public class DataFetcher {
ForwardIndexReader<?> forwardIndexReader = dataSource.getForwardIndex();
Preconditions.checkState(forwardIndexReader != null,
"Forward index disabled for column: %s, cannot create DataFetcher!",
column);
- ColumnValueReader columnValueReader =
- new ColumnValueReader(forwardIndexReader, dataSource.getDictionary());
+ ColumnValueReader columnValueReader = new ColumnValueReader(forwardIndexReader, dataSource.getDictionary());
_columnValueReaderMap.put(column, columnValueReader);
if (!dataSourceMetadata.isSingleValue()) {
maxNumValuesPerMVEntry = Math.max(maxNumValuesPerMVEntry,
dataSourceMetadata.getMaxNumValuesPerMVEntry());
@@ -110,19 +108,6 @@ public class DataFetcher {
_columnValueReaderMap.get(column).readIntValues(inDocIds, length,
outValues);
}
- /**
- * Fetch and transform the int values from a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param inDocIds Input document Ids buffer
- * @param length Number of input document Ids
- * @param outValues Buffer for output
- */
- public void fetchIntValues(String column, TransformEvaluator evaluator,
int[] inDocIds, int length, int[] outValues) {
- _columnValueReaderMap.get(column).readIntValues(evaluator, inDocIds,
length, outValues);
- }
-
/**
* Fetch the long values for a single-valued column.
*
@@ -135,20 +120,6 @@ public class DataFetcher {
_columnValueReaderMap.get(column).readLongValues(inDocIds, length,
outValues);
}
- /**
- * Fetch and transform the int values from a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param inDocIds Input document Ids buffer
- * @param length Number of input document Ids
- * @param outValues Buffer for output
- */
- public void fetchLongValues(String column, TransformEvaluator evaluator,
int[] inDocIds, int length,
- long[] outValues) {
- _columnValueReaderMap.get(column).readLongValues(evaluator, inDocIds,
length, outValues);
- }
-
/**
* Fetch long values for a single-valued column.
*
@@ -161,20 +132,6 @@ public class DataFetcher {
_columnValueReaderMap.get(column).readFloatValues(inDocIds, length,
outValues);
}
- /**
- * Fetch and transform float values from a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param inDocIds Input document Ids buffer
- * @param length Number of input document Ids
- * @param outValues Buffer for output
- */
- public void fetchFloatValues(String column, TransformEvaluator evaluator,
int[] inDocIds, int length,
- float[] outValues) {
- _columnValueReaderMap.get(column).readFloatValues(evaluator, inDocIds,
length, outValues);
- }
-
/**
* Fetch the double values for a single-valued column.
*
@@ -187,20 +144,6 @@ public class DataFetcher {
_columnValueReaderMap.get(column).readDoubleValues(inDocIds, length,
outValues);
}
- /**
- * Fetch and transform double values from a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param inDocIds Input document Ids buffer
- * @param length Number of input document Ids
- * @param outValues Buffer for output
- */
- public void fetchDoubleValues(String column, TransformEvaluator evaluator,
int[] inDocIds, int length,
- double[] outValues) {
- _columnValueReaderMap.get(column).readDoubleValues(evaluator, inDocIds,
length, outValues);
- }
-
/**
* Fetch the BigDecimal values for a single-valued column.
*
@@ -213,20 +156,6 @@ public class DataFetcher {
_columnValueReaderMap.get(column).readBigDecimalValues(inDocIds, length,
outValues);
}
- /**
- * Fetch and transform BigDecimal values from a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param inDocIds Input document Ids buffer
- * @param length Number of input document Ids
- * @param outValues Buffer for output
- */
- public void fetchBigDecimalValues(String column, TransformEvaluator
evaluator, int[] inDocIds, int length,
- BigDecimal[] outValues) {
- _columnValueReaderMap.get(column).readBigDecimalValues(evaluator,
inDocIds, length, outValues);
- }
-
/**
* Fetch the string values for a single-valued column.
*
@@ -239,20 +168,6 @@ public class DataFetcher {
_columnValueReaderMap.get(column).readStringValues(inDocIds, length,
outValues);
}
- /**
- * Fetch and transform String values from a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param inDocIds Input document Ids buffer
- * @param length Number of input document Ids
- * @param outValues Buffer for output
- */
- public void fetchStringValues(String column, TransformEvaluator evaluator,
int[] inDocIds, int length,
- String[] outValues) {
- _columnValueReaderMap.get(column).readStringValues(evaluator, inDocIds,
length, outValues);
- }
-
/**
* Fetch byte[] values for a single-valued column.
*
@@ -293,20 +208,6 @@ public class DataFetcher {
_columnValueReaderMap.get(column).readIntValuesMV(inDocIds, length,
outValues);
}
- /**
- * Fetch int[] values from a JSON column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param inDocIds Input document Ids buffer
- * @param length Number of input document Ids
- * @param outValues Buffer for output
- */
- public void fetchIntValues(String column, TransformEvaluator evaluator,
int[] inDocIds, int length,
- int[][] outValues) {
- _columnValueReaderMap.get(column).readIntValuesMV(evaluator, inDocIds,
length, outValues);
- }
-
/**
* Fetch the long values for a multi-valued column.
*
@@ -319,20 +220,6 @@ public class DataFetcher {
_columnValueReaderMap.get(column).readLongValuesMV(inDocIds, length,
outValues);
}
- /**
- * Fetch and transform long[] values from a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param inDocIds Input document Ids buffer
- * @param length Number of input document Ids
- * @param outValues Buffer for output
- */
- public void fetchLongValues(String column, TransformEvaluator evaluator,
int[] inDocIds, int length,
- long[][] outValues) {
- _columnValueReaderMap.get(column).readLongValuesMV(evaluator, inDocIds,
length, outValues);
- }
-
/**
* Fetch the float values for a multi-valued column.
*
@@ -345,20 +232,6 @@ public class DataFetcher {
_columnValueReaderMap.get(column).readFloatValuesMV(inDocIds, length,
outValues);
}
- /**
- * Fetch and transform float[] values from a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param inDocIds Input document Ids buffer
- * @param length Number of input document Ids
- * @param outValues Buffer for output
- */
- public void fetchFloatValues(String column, TransformEvaluator evaluator,
int[] inDocIds, int length,
- float[][] outValues) {
- _columnValueReaderMap.get(column).readFloatValuesMV(evaluator, inDocIds,
length, outValues);
- }
-
/**
* Fetch the double values for a multi-valued column.
*
@@ -371,20 +244,6 @@ public class DataFetcher {
_columnValueReaderMap.get(column).readDoubleValuesMV(inDocIds, length,
outValues);
}
- /**
- * Fetch and transform double[] values from a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param inDocIds Input document Ids buffer
- * @param length Number of input document Ids
- * @param outValues Buffer for output
- */
- public void fetchDoubleValues(String column, TransformEvaluator evaluator,
int[] inDocIds, int length,
- double[][] outValues) {
- _columnValueReaderMap.get(column).readDoubleValuesMV(evaluator, inDocIds,
length, outValues);
- }
-
/**
* Fetch the string values for a multi-valued column.
*
@@ -397,20 +256,6 @@ public class DataFetcher {
_columnValueReaderMap.get(column).readStringValuesMV(inDocIds, length,
outValues);
}
- /**
- * Fetch and transform String[][] values from a column.
- *
- * @param column Column name
- * @param evaluator transform evaluator
- * @param inDocIds Input document Ids buffer
- * @param length Number of input document Ids
- * @param outValues Buffer for output
- */
- public void fetchStringValues(String column, TransformEvaluator evaluator,
int[] inDocIds, int length,
- String[][] outValues) {
- _columnValueReaderMap.get(column).readStringValuesMV(evaluator, inDocIds,
length, outValues);
- }
-
/**
* Fetch the bytes values for a multi-valued column.
*
@@ -484,12 +329,6 @@ public class DataFetcher {
}
}
- void readIntValues(TransformEvaluator evaluator, int[] docIds, int length,
int[] valueBuffer) {
- Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
- evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
- valueBuffer);
- }
-
void readLongValues(int[] docIds, int length, long[] valueBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
@@ -502,12 +341,6 @@ public class DataFetcher {
}
}
- void readLongValues(TransformEvaluator evaluator, int[] docIds, int
length, long[] valueBuffer) {
- Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
- evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
- valueBuffer);
- }
-
void readFloatValues(int[] docIds, int length, float[] valueBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
@@ -520,12 +353,6 @@ public class DataFetcher {
}
}
- void readFloatValues(TransformEvaluator evaluator, int[] docIds, int
length, float[] valueBuffer) {
- Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
- evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
- valueBuffer);
- }
-
void readDoubleValues(int[] docIds, int length, double[] valueBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
@@ -538,12 +365,6 @@ public class DataFetcher {
}
}
- void readDoubleValues(TransformEvaluator evaluator, int[] docIds, int
length, double[] valueBuffer) {
- Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
- evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
- valueBuffer);
- }
-
void readBigDecimalValues(int[] docIds, int length, BigDecimal[]
valueBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
@@ -556,12 +377,6 @@ public class DataFetcher {
}
}
- void readBigDecimalValues(TransformEvaluator evaluator, int[] docIds, int
length, BigDecimal[] valueBuffer) {
- Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
- evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
- valueBuffer);
- }
-
void readStringValues(int[] docIds, int length, String[] valueBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
@@ -612,12 +427,6 @@ public class DataFetcher {
}
}
- void readStringValues(TransformEvaluator evaluator, int[] docIds, int
length, String[] valueBuffer) {
- Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
- evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
- valueBuffer);
- }
-
void readBytesValues(int[] docIds, int length, byte[][] valueBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
@@ -656,12 +465,6 @@ public class DataFetcher {
}
}
- void readIntValuesMV(TransformEvaluator evaluator, int[] docIds, int
length, int[][] valuesBuffer) {
- Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
- evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
- valuesBuffer);
- }
-
void readLongValuesMV(int[] docIds, int length, long[][] valuesBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
@@ -677,12 +480,6 @@ public class DataFetcher {
}
}
- void readLongValuesMV(TransformEvaluator evaluator, int[] docIds, int
length, long[][] valuesBuffer) {
- Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
- evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
- valuesBuffer);
- }
-
void readFloatValuesMV(int[] docIds, int length, float[][] valuesBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
@@ -698,12 +495,6 @@ public class DataFetcher {
}
}
- void readFloatValuesMV(TransformEvaluator evaluator, int[] docIds, int
length, float[][] valuesBuffer) {
- Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
- evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
- valuesBuffer);
- }
-
void readDoubleValuesMV(int[] docIds, int length, double[][] valuesBuffer)
{
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
@@ -719,12 +510,6 @@ public class DataFetcher {
}
}
- void readDoubleValuesMV(TransformEvaluator evaluator, int[] docIds, int
length, double[][] valuesBuffer) {
- Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
- evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
- valuesBuffer);
- }
-
void readStringValuesMV(int[] docIds, int length, String[][] valuesBuffer)
{
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
@@ -740,12 +525,6 @@ public class DataFetcher {
}
}
- void readStringValuesMV(TransformEvaluator evaluator, int[] docIds, int
length, String[][] valuesBuffer) {
- Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
- evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
- valuesBuffer);
- }
-
void readBytesValuesMV(int[] docIds, int length, byte[][][] valuesBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
@@ -768,10 +547,6 @@ public class DataFetcher {
}
}
- private int[] getSVDictIdsBuffer() {
- return _dictionary == null ? null : THREAD_LOCAL_DICT_IDS.get();
- }
-
@Override
public void close()
throws IOException {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java b/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java
deleted file mode 100644
index cae1e3c17b..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java
+++ /dev/null
@@ -1,704 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.common.evaluators;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.jayway.jsonpath.Configuration;
-import com.jayway.jsonpath.InvalidPathException;
-import com.jayway.jsonpath.JsonPath;
-import com.jayway.jsonpath.Option;
-import com.jayway.jsonpath.ParseContext;
-import com.jayway.jsonpath.TypeRef;
-import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
-import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
-import java.math.BigDecimal;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.pinot.common.function.JsonPathCache;
-import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator;
-import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.utils.JsonUtils;
-
-
-public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
-
- // This ObjectMapper requires special configurations, hence we can't use
pinot JsonUtils here.
- private static final ObjectMapper OBJECT_MAPPER_WITH_BIG_DECIMAL =
- new
ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
-
- private static final ParseContext JSON_PARSER_CONTEXT = JsonPath.using(
- new Configuration.ConfigurationBuilder().jsonProvider(new
JacksonJsonProvider())
- .mappingProvider(new
JacksonMappingProvider()).options(Option.SUPPRESS_EXCEPTIONS).build());
-
- private static final ParseContext JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL =
JsonPath.using(
- new Configuration.ConfigurationBuilder().jsonProvider(new
JacksonJsonProvider(OBJECT_MAPPER_WITH_BIG_DECIMAL))
- .mappingProvider(new
JacksonMappingProvider()).options(Option.SUPPRESS_EXCEPTIONS).build());
-
- private static final int[] EMPTY_INTS = new int[0];
- private static final long[] EMPTY_LONGS = new long[0];
- private static final float[] EMPTY_FLOATS = new float[0];
- private static final double[] EMPTY_DOUBLES = new double[0];
- private static final String[] EMPTY_STRINGS = new String[0];
- private static final TypeRef<List<Integer>> INTEGER_LIST_TYPE = new
TypeRef<List<Integer>>() {
- };
- private static final TypeRef<List<Long>> LONG_LIST_TYPE = new
TypeRef<List<Long>>() {
- };
- private static final TypeRef<List<Float>> FLOAT_LIST_TYPE = new
TypeRef<List<Float>>() {
- };
- private static final TypeRef<List<Double>> DOUBLE_LIST_TYPE = new
TypeRef<List<Double>>() {
- };
- private static final TypeRef<List<String>> STRING_LIST_TYPE = new
TypeRef<List<String>>() {
- };
-
- public static JsonPathEvaluator create(String jsonPath, @Nullable Object
defaultValue) {
- try {
- return new
DefaultJsonPathEvaluator(JsonPathCache.INSTANCE.getOrCompute(jsonPath),
defaultValue);
- } catch (InvalidPathException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- private final JsonPath _jsonPath;
- private final Object _defaultValue;
-
- private DefaultJsonPathEvaluator(JsonPath jsonPath, @Nullable Object
defaultValue) {
- _jsonPath = jsonPath;
- _defaultValue = defaultValue;
- }
-
- public <T extends ForwardIndexReaderContext> void evaluateBlock(int[]
docIds, int length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, int[] valueBuffer) {
- int defaultValue = (_defaultValue instanceof Number) ? ((Number)
_defaultValue).intValue() : 0;
- if (reader.isDictionaryEncoded()) {
- reader.readDictIds(docIds, length, dictIdsBuffer, context);
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytes(dictionary, dictIdsBuffer[i]),
defaultValue, valueBuffer);
- }
- } else {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromString(dictionary, dictIdsBuffer[i]),
defaultValue, valueBuffer);
- }
- }
- } else {
- switch (reader.getStoredType()) {
- case STRING:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromString(reader, context, docIds[i]),
defaultValue, valueBuffer);
- }
- break;
- case BYTES:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytes(reader, context, docIds[i]),
defaultValue, valueBuffer);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
- }
-
- public <T extends ForwardIndexReaderContext> void evaluateBlock(int[]
docIds, int length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, long[] valueBuffer) {
- long defaultValue = (_defaultValue instanceof Number) ? ((Number)
_defaultValue).longValue() : 0L;
- if (reader.isDictionaryEncoded()) {
- reader.readDictIds(docIds, length, dictIdsBuffer, context);
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytes(dictionary, dictIdsBuffer[i]),
defaultValue, valueBuffer);
- }
- } else {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromString(dictionary, dictIdsBuffer[i]),
defaultValue, valueBuffer);
- }
- }
- } else {
- switch (reader.getStoredType()) {
- case STRING:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromString(reader, context, docIds[i]),
defaultValue, valueBuffer);
- }
- break;
- case BYTES:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytes(reader, context, docIds[i]),
defaultValue, valueBuffer);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
- }
-
- public <T extends ForwardIndexReaderContext> void evaluateBlock(int[]
docIds, int length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, float[] valueBuffer) {
- float defaultValue = (_defaultValue instanceof Number) ? ((Number)
_defaultValue).floatValue() : 0F;
- if (reader.isDictionaryEncoded()) {
- reader.readDictIds(docIds, length, dictIdsBuffer, context);
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytes(dictionary, dictIdsBuffer[i]),
defaultValue, valueBuffer);
- }
- } else {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromString(dictionary, dictIdsBuffer[i]),
defaultValue, valueBuffer);
- }
- }
- } else {
- switch (reader.getStoredType()) {
- case STRING:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromString(reader, context, docIds[i]),
defaultValue, valueBuffer);
- }
- break;
- case BYTES:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytes(reader, context, docIds[i]),
defaultValue, valueBuffer);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
- }
-
- public <T extends ForwardIndexReaderContext> void evaluateBlock(int[]
docIds, int length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, double[] valueBuffer) {
- double defaultValue = (_defaultValue instanceof Number) ? ((Number)
_defaultValue).doubleValue() : 0D;
- if (reader.isDictionaryEncoded()) {
- reader.readDictIds(docIds, length, dictIdsBuffer, context);
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytes(dictionary, dictIdsBuffer[i]),
defaultValue, valueBuffer);
- }
- } else {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromString(dictionary, dictIdsBuffer[i]),
defaultValue, valueBuffer);
- }
- }
- } else {
- switch (reader.getStoredType()) {
- case STRING:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromString(reader, context, docIds[i]),
defaultValue, valueBuffer);
- }
- break;
- case BYTES:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytes(reader, context, docIds[i]),
defaultValue, valueBuffer);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
- }
-
- public <T extends ForwardIndexReaderContext> void evaluateBlock(int[]
docIds, int length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, BigDecimal[] valueBuffer) {
- BigDecimal defaultValue = (_defaultValue instanceof BigDecimal) ?
((BigDecimal) _defaultValue) : BigDecimal.ZERO;
- if (reader.isDictionaryEncoded()) {
- reader.readDictIds(docIds, length, dictIdsBuffer, context);
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytesWithExactBigDecimal(dictionary,
dictIdsBuffer[i]), defaultValue, valueBuffer);
- }
- } else {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromStringWithExactBigDecimal(dictionary,
dictIdsBuffer[i]), defaultValue,
- valueBuffer);
- }
- }
- } else {
- switch (reader.getStoredType()) {
- case STRING:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromStringWithExactBigDecimal(reader,
context, docIds[i]), defaultValue,
- valueBuffer);
- }
- break;
- case BYTES:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytesWithExactBigDecimal(reader,
context, docIds[i]), defaultValue, valueBuffer);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
- }
-
- public <T extends ForwardIndexReaderContext> void evaluateBlock(int[]
docIds, int length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, String[] valueBuffer) {
- if (reader.isDictionaryEncoded()) {
- reader.readDictIds(docIds, length, dictIdsBuffer, context);
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytes(dictionary, dictIdsBuffer[i]),
valueBuffer);
- }
- } else {
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromString(dictionary, dictIdsBuffer[i]),
valueBuffer);
- }
- }
- } else {
- switch (reader.getStoredType()) {
- case STRING:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromString(reader, context, docIds[i]),
valueBuffer);
- }
- break;
- case BYTES:
- for (int i = 0; i < length; i++) {
- processValue(i, extractFromBytes(reader, context, docIds[i]),
valueBuffer);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
- }
-
- public <T extends ForwardIndexReaderContext> void evaluateBlock(int[]
docIds, int length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, int[][] valueBuffer) {
- if (reader.isDictionaryEncoded()) {
- reader.readDictIds(docIds, length, dictIdsBuffer, context);
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
- for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(dictionary, dictIdsBuffer[i],
INTEGER_LIST_TYPE), valueBuffer);
- }
- } else {
- for (int i = 0; i < length; i++) {
- processList(i, extractFromString(dictionary, dictIdsBuffer[i],
INTEGER_LIST_TYPE), valueBuffer);
- }
- }
- } else {
- switch (reader.getStoredType()) {
- case STRING:
- for (int i = 0; i < length; i++) {
- processList(i, extractFromString(reader, context, docIds[i],
INTEGER_LIST_TYPE), valueBuffer);
- }
- break;
- case BYTES:
- for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(reader, context, docIds[i],
INTEGER_LIST_TYPE), valueBuffer);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
- }
-
- public <T extends ForwardIndexReaderContext> void evaluateBlock(int[]
docIds, int length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, long[][] valueBuffer) {
- if (reader.isDictionaryEncoded()) {
- reader.readDictIds(docIds, length, dictIdsBuffer, context);
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
- for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(dictionary, dictIdsBuffer[i],
LONG_LIST_TYPE), valueBuffer);
- }
- } else {
- for (int i = 0; i < length; i++) {
- processList(i, extractFromString(dictionary, dictIdsBuffer[i],
LONG_LIST_TYPE), valueBuffer);
- }
- }
- } else {
- switch (reader.getStoredType()) {
- case STRING:
- for (int i = 0; i < length; i++) {
- processList(i, extractFromString(reader, context, docIds[i],
LONG_LIST_TYPE), valueBuffer);
- }
- break;
- case BYTES:
- for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(reader, context, docIds[i],
LONG_LIST_TYPE), valueBuffer);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
- }
-
- public <T extends ForwardIndexReaderContext> void evaluateBlock(int[]
docIds, int length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, float[][] valueBuffer) {
- if (reader.isDictionaryEncoded()) {
- reader.readDictIds(docIds, length, dictIdsBuffer, context);
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
- for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(dictionary, dictIdsBuffer[i],
FLOAT_LIST_TYPE), valueBuffer);
- }
- } else {
- for (int i = 0; i < length; i++) {
- processList(i, extractFromString(dictionary, dictIdsBuffer[i],
FLOAT_LIST_TYPE), valueBuffer);
- }
- }
- } else {
- switch (reader.getStoredType()) {
- case STRING:
- for (int i = 0; i < length; i++) {
- processList(i, extractFromString(reader, context, docIds[i],
FLOAT_LIST_TYPE), valueBuffer);
- }
- break;
- case BYTES:
- for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(reader, context, docIds[i],
FLOAT_LIST_TYPE), valueBuffer);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
- }
-
- public <T extends ForwardIndexReaderContext> void evaluateBlock(int[]
docIds, int length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, double[][] valueBuffer) {
- if (reader.isDictionaryEncoded()) {
- reader.readDictIds(docIds, length, dictIdsBuffer, context);
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
- for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(dictionary, dictIdsBuffer[i],
DOUBLE_LIST_TYPE), valueBuffer);
- }
- } else {
- for (int i = 0; i < length; i++) {
- processList(i, extractFromString(dictionary, dictIdsBuffer[i],
DOUBLE_LIST_TYPE), valueBuffer);
- }
- }
- } else {
- switch (reader.getStoredType()) {
- case STRING:
- for (int i = 0; i < length; i++) {
- processList(i, extractFromString(reader, context, docIds[i],
DOUBLE_LIST_TYPE), valueBuffer);
- }
- break;
- case BYTES:
- for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(reader, context, docIds[i],
DOUBLE_LIST_TYPE), valueBuffer);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
- }
-
- public <T extends ForwardIndexReaderContext> void evaluateBlock(int[]
docIds, int length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, String[][] valueBuffer) {
- if (reader.isDictionaryEncoded()) {
- reader.readDictIds(docIds, length, dictIdsBuffer, context);
- if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
- for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(dictionary, dictIdsBuffer[i],
STRING_LIST_TYPE), valueBuffer);
- }
- } else {
- for (int i = 0; i < length; i++) {
- processList(i, extractFromString(dictionary, dictIdsBuffer[i],
STRING_LIST_TYPE), valueBuffer);
- }
- }
- } else {
- switch (reader.getStoredType()) {
- case STRING:
- for (int i = 0; i < length; i++) {
- processList(i, extractFromString(reader, context, docIds[i],
STRING_LIST_TYPE), valueBuffer);
- }
- break;
- case BYTES:
- for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(reader, context, docIds[i],
STRING_LIST_TYPE), valueBuffer);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- }
- }
-
- @Nullable
- private <T> T extractFromBytes(Dictionary dictionary, int dictId) {
- try {
- return
JSON_PARSER_CONTEXT.parseUtf8(dictionary.getBytesValue(dictId)).read(_jsonPath);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Nullable
- private <T> T extractFromBytes(Dictionary dictionary, int dictId, TypeRef<T>
ref) {
- try {
- return
JSON_PARSER_CONTEXT.parseUtf8(dictionary.getBytesValue(dictId)).read(_jsonPath,
ref);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Nullable
- private <T, R extends ForwardIndexReaderContext> T
extractFromBytes(ForwardIndexReader<R> reader, R context,
- int docId) {
- try {
- return JSON_PARSER_CONTEXT.parseUtf8(reader.getBytes(docId,
context)).read(_jsonPath);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Nullable
- private <T, R extends ForwardIndexReaderContext> T
extractFromBytes(ForwardIndexReader<R> reader, R context,
- int docId, TypeRef<T> ref) {
- try {
- return JSON_PARSER_CONTEXT.parseUtf8(reader.getBytes(docId,
context)).read(_jsonPath, ref);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Nullable
- private <T> T extractFromBytesWithExactBigDecimal(Dictionary dictionary, int
dictId) {
- try {
- return
JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parseUtf8(dictionary.getBytesValue(dictId)).read(_jsonPath);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Nullable
- private <R extends ForwardIndexReaderContext> BigDecimal
extractFromBytesWithExactBigDecimal(
- ForwardIndexReader<R> reader, R context, int docId) {
- try {
- return
JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parseUtf8(reader.getBytes(docId,
context)).read(_jsonPath);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Nullable
- private <T> T extractFromString(Dictionary dictionary, int dictId) {
- try {
- return
JSON_PARSER_CONTEXT.parse(dictionary.getStringValue(dictId)).read(_jsonPath);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Nullable
- private <T> T extractFromString(Dictionary dictionary, int dictId,
TypeRef<T> ref) {
- try {
- return
JSON_PARSER_CONTEXT.parse(dictionary.getStringValue(dictId)).read(_jsonPath,
ref);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Nullable
- private <T, R extends ForwardIndexReaderContext> T
extractFromString(ForwardIndexReader<R> reader, R context,
- int docId) {
- try {
- return JSON_PARSER_CONTEXT.parseUtf8(reader.getBytes(docId,
context)).read(_jsonPath);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Nullable
- private <T, R extends ForwardIndexReaderContext> T
extractFromString(ForwardIndexReader<R> reader, R context,
- int docId, TypeRef<T> ref) {
- try {
- return JSON_PARSER_CONTEXT.parseUtf8(reader.getBytes(docId,
context)).read(_jsonPath, ref);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Nullable
- private <T> T extractFromStringWithExactBigDecimal(Dictionary dictionary,
int dictId) {
- try {
- return
JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parse(dictionary.getStringValue(dictId)).read(_jsonPath);
- } catch (Exception e) {
- return null;
- }
- }
-
- @Nullable
- private <R extends ForwardIndexReaderContext> BigDecimal
extractFromStringWithExactBigDecimal(
- ForwardIndexReader<R> reader, R context, int docId) {
- try {
- return
JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parseUtf8(reader.getBytes(docId,
context)).read(_jsonPath);
- } catch (Exception e) {
- return null;
- }
- }
-
- private void processValue(int index, Object value, int defaultValue, int[]
valueBuffer) {
- if (value instanceof Number) {
- valueBuffer[index] = ((Number) value).intValue();
- } else if (value == null) {
- if (_defaultValue != null) {
- valueBuffer[index] = defaultValue;
- } else {
- throwPathNotFoundException();
- }
- } else {
- valueBuffer[index] = Integer.parseInt(value.toString());
- }
- }
-
- private void processValue(int index, Object value, long defaultValue, long[]
valueBuffer) {
- if (value instanceof Number) {
- valueBuffer[index] = ((Number) value).longValue();
- } else if (value == null) {
- if (_defaultValue != null) {
- valueBuffer[index] = defaultValue;
- } else {
- throwPathNotFoundException();
- }
- } else {
- // Handle scientific notation
- valueBuffer[index] = (long) Double.parseDouble(value.toString());
- }
- }
-
- private void processValue(int index, Object value, float defaultValue,
float[] valueBuffer) {
- if (value instanceof Number) {
- valueBuffer[index] = ((Number) value).floatValue();
- } else if (value == null) {
- if (_defaultValue != null) {
- valueBuffer[index] = defaultValue;
- } else {
- throwPathNotFoundException();
- }
- } else {
- valueBuffer[index] = Float.parseFloat(value.toString());
- }
- }
-
- private void processValue(int index, Object value, double defaultValue,
double[] valueBuffer) {
- if (value instanceof Number) {
- valueBuffer[index] = ((Number) value).doubleValue();
- } else if (value == null) {
- if (_defaultValue != null) {
- valueBuffer[index] = defaultValue;
- } else {
- throwPathNotFoundException();
- }
- } else {
- valueBuffer[index] = Double.parseDouble(value.toString());
- }
- }
-
- private void processValue(int index, Object value, BigDecimal defaultValue,
BigDecimal[] valueBuffer) {
- if (value instanceof BigDecimal) {
- valueBuffer[index] = (BigDecimal) value;
- } else if (value == null) {
- if (_defaultValue != null) {
- valueBuffer[index] = defaultValue;
- } else {
- throwPathNotFoundException();
- }
- } else {
- valueBuffer[index] = new BigDecimal(value.toString());
- }
- }
-
- private void processValue(int index, Object value, String[] valueBuffer) {
- if (value instanceof String) {
- valueBuffer[index] = (String) value;
- } else if (value == null) {
- if (_defaultValue != null) {
- valueBuffer[index] = _defaultValue.toString();
- } else {
- throwPathNotFoundException();
- }
- } else {
- valueBuffer[index] = JsonUtils.objectToJsonNode(value).toString();
- }
- }
-
- private void processList(int index, List<Integer> value, int[][]
valuesBuffer) {
- if (value == null) {
- valuesBuffer[index] = EMPTY_INTS;
- } else {
- int numValues = value.size();
- int[] values = new int[numValues];
- for (int j = 0; j < numValues; j++) {
- values[j] = value.get(j);
- }
- valuesBuffer[index] = values;
- }
- }
-
- private void processList(int index, List<Long> value, long[][] valuesBuffer)
{
- if (value == null) {
- valuesBuffer[index] = EMPTY_LONGS;
- } else {
- int numValues = value.size();
- long[] values = new long[numValues];
- for (int j = 0; j < numValues; j++) {
- values[j] = value.get(j);
- }
- valuesBuffer[index] = values;
- }
- }
-
- private void processList(int index, List<Float> value, float[][]
valuesBuffer) {
- if (value == null) {
- valuesBuffer[index] = EMPTY_FLOATS;
- } else {
- int numValues = value.size();
- float[] values = new float[numValues];
- for (int j = 0; j < numValues; j++) {
- values[j] = value.get(j);
- }
- valuesBuffer[index] = values;
- }
- }
-
- private void processList(int index, List<Double> value, double[][]
valuesBuffer) {
- if (value == null) {
- valuesBuffer[index] = EMPTY_DOUBLES;
- } else {
- int numValues = value.size();
- double[] values = new double[numValues];
- for (int j = 0; j < numValues; j++) {
- values[j] = value.get(j);
- }
- valuesBuffer[index] = values;
- }
- }
-
- private void processList(int index, List<String> value, String[][]
valuesBuffer) {
- if (value == null) {
- valuesBuffer[index] = EMPTY_STRINGS;
- } else {
- int numValues = value.size();
- String[] values = new String[numValues];
- for (int j = 0; j < numValues; j++) {
- values[j] = value.get(j);
- }
- valuesBuffer[index] = values;
- }
- }
-
- private void throwPathNotFoundException() {
- throw new IllegalArgumentException("Illegal Json Path: " +
_jsonPath.getPath() + " does not match document");
- }
-}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
index efef8693d0..b46d275ded 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
@@ -18,14 +18,12 @@
*/
package org.apache.pinot.core.operator.blocks;
-import java.math.BigDecimal;
import java.util.Map;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.DataBlockCache;
import org.apache.pinot.core.operator.docvalsets.ProjectionBlockValSet;
import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
/**
@@ -61,125 +59,4 @@ public class ProjectionBlock implements ValueBlock {
public BlockValSet getBlockValueSet(String column) {
return new ProjectionBlockValSet(_dataBlockCache, column,
_dataSourceMap.get(column));
}
-
- /**
- * Pushes a {@see TransformEvaluator} which will produce an int value down
- * to be evaluated against the column. This is an unstable API.
- * @param column column to evaluate against
- * @param evaluator the evaluator which produces values from the storage in
the column
- * @param buffer the buffer to write outputs into
- */
- public void fillValues(String column, TransformEvaluator evaluator, int[]
buffer) {
- _dataBlockCache.fillValues(column, evaluator, buffer);
- }
-
- /**
- * Pushes a {@see TransformEvaluator} which will produce a long value down
- * to be evaluated against the column. This is an unstable API.
- * @param column column to evaluate against
- * @param evaluator the evaluator which produces values from the storage in
the column
- * @param buffer the buffer to write outputs into
- */
- public void fillValues(String column, TransformEvaluator evaluator, long[]
buffer) {
- _dataBlockCache.fillValues(column, evaluator, buffer);
- }
-
- /**
- * Pushes a {@see TransformEvaluator} which will produce a float value down
- * to be evaluated against the column. This is an unstable API.
- * @param column column to evaluate against
- * @param evaluator the evaluator which produces values from the storage in
the column
- * @param buffer the buffer to write outputs into
- */
- public void fillValues(String column, TransformEvaluator evaluator, float[]
buffer) {
- _dataBlockCache.fillValues(column, evaluator, buffer);
- }
-
- /**
- * Pushes a {@see TransformEvaluator} which will produce a double value down
- * to be evaluated against the column. This is an unstable API.
- * @param column column to evaluate against
- * @param evaluator the evaluator which produces values from the storage in
the column
- * @param buffer the buffer to write outputs into
- */
- public void fillValues(String column, TransformEvaluator evaluator, double[]
buffer) {
- _dataBlockCache.fillValues(column, evaluator, buffer);
- }
-
- /**
- * Pushes a {@see TransformEvaluator} which will produce a BigDecimal value
down
- * to be evaluated against the column. This is an unstable API.
- * @param column column to evaluate against
- * @param evaluator the evaluator which produces values from the storage in
the column
- * @param buffer the buffer to write outputs into
- */
- public void fillValues(String column, TransformEvaluator evaluator,
BigDecimal[] buffer) {
- _dataBlockCache.fillValues(column, evaluator, buffer);
- }
-
- /**
- * Pushes a {@see TransformEvaluator} which will produce a String value down
- * to be evaluated against the column. This is an unstable API.
- * @param column column to evaluate against
- * @param evaluator the evaluator which produces values from the storage in
the column
- * @param buffer the buffer to write outputs into
- */
- public void fillValues(String column, TransformEvaluator evaluator, String[]
buffer) {
- _dataBlockCache.fillValues(column, evaluator, buffer);
- }
-
- /**
- * Pushes a {@see TransformEvaluator} which will produce an int[] array
value down
- * to be evaluated against the column. This is an unstable API.
- * @param column column to evaluate against
- * @param evaluator the evaluator which produces values from the storage in
the column
- * @param buffer the buffer to write outputs into
- */
- public void fillValues(String column, TransformEvaluator evaluator, int[][]
buffer) {
- _dataBlockCache.fillValues(column, evaluator, buffer);
- }
-
- /**
- * Pushes a {@see TransformEvaluator} which will produce a long[] value down
- * to be evaluated against the column. This is an unstable API.
- * @param column column to evaluate against
- * @param evaluator the evaluator which produces values from the storage in
the column
- * @param buffer the buffer to write outputs into
- */
- public void fillValues(String column, TransformEvaluator evaluator, long[][]
buffer) {
- _dataBlockCache.fillValues(column, evaluator, buffer);
- }
-
- /**
- * Pushes a {@see TransformEvaluator} which will produce a float[] value down
- * to be evaluated against the column. This is an unstable API.
- * @param column column to evaluate against
- * @param evaluator the evaluator which produces values from the storage in
the column
- * @param buffer the buffer to write outputs into
- */
- public void fillValues(String column, TransformEvaluator evaluator,
float[][] buffer) {
- _dataBlockCache.fillValues(column, evaluator, buffer);
- }
-
- /**
- * Pushes a {@see TransformEvaluator} which will produce a double[] value
down
- * to be evaluated against the column. This is an unstable API.
- * @param column column to evaluate against
- * @param evaluator the evaluator which produces values from the storage in
the column
- * @param buffer the buffer to write outputs into
- */
- public void fillValues(String column, TransformEvaluator evaluator,
double[][] buffer) {
- _dataBlockCache.fillValues(column, evaluator, buffer);
- }
-
- /**
- * Pushes a {@see TransformEvaluator} which will produce a String[] value
down
- * to be evaluated against the column. This is an unstable API.
- * @param column column to evaluate against
- * @param evaluator the evaluator which produces values from the storage in
the column
- * @param buffer the buffer to write outputs into
- */
- public void fillValues(String column, TransformEvaluator evaluator,
String[][] buffer) {
- _dataBlockCache.fillValues(column, evaluator, buffer);
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java
index 1d0cd84fb5..1773af53cd 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java
@@ -23,10 +23,8 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.core.operator.ColumnContext;
-import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
-import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.roaringbitmap.RoaringBitmap;
@@ -35,7 +33,7 @@ import org.roaringbitmap.RoaringBitmap;
* The <code>IdentifierTransformFunction</code> class is a special transform
function which is a wrapper on top of an
* IDENTIFIER (column), and directly return the column value without any
transformation.
*/
-public class IdentifierTransformFunction implements TransformFunction,
PushDownTransformFunction {
+public class IdentifierTransformFunction implements TransformFunction {
private final String _columnName;
private final Dictionary _dictionary;
private final TransformResultMetadata _resultMetadata;
@@ -147,67 +145,6 @@ public class IdentifierTransformFunction implements
TransformFunction, PushDownT
return valueBlock.getBlockValueSet(_columnName).getBytesValuesMV();
}
- @Override
- public void transformToIntValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, int[] buffer) {
- projectionBlock.fillValues(_columnName, evaluator, buffer);
- }
-
- @Override
- public void transformToLongValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, long[] buffer) {
- projectionBlock.fillValues(_columnName, evaluator, buffer);
- }
-
- @Override
- public void transformToFloatValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, float[] buffer) {
- projectionBlock.fillValues(_columnName, evaluator, buffer);
- }
-
- @Override
- public void transformToDoubleValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator,
- double[] buffer) {
- projectionBlock.fillValues(_columnName, evaluator, buffer);
- }
-
- @Override
- public void transformToBigDecimalValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator,
- BigDecimal[] buffer) {
- projectionBlock.fillValues(_columnName, evaluator, buffer);
- }
-
- @Override
- public void transformToStringValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator,
- String[] buffer) {
- projectionBlock.fillValues(_columnName, evaluator, buffer);
- }
-
- @Override
- public void transformToIntValuesMV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, int[][] buffer) {
- projectionBlock.fillValues(_columnName, evaluator, buffer);
- }
-
- @Override
- public void transformToLongValuesMV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, long[][] buffer) {
- projectionBlock.fillValues(_columnName, evaluator, buffer);
- }
-
- @Override
- public void transformToFloatValuesMV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator,
- float[][] buffer) {
- projectionBlock.fillValues(_columnName, evaluator, buffer);
- }
-
- @Override
- public void transformToDoubleValuesMV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator,
- double[][] buffer) {
- projectionBlock.fillValues(_columnName, evaluator, buffer);
- }
-
- @Override
- public void transformToStringValuesMV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator,
- String[][] buffer) {
- projectionBlock.fillValues(_columnName, evaluator, buffer);
- }
-
@Override
public RoaringBitmap getNullBitmap(ValueBlock valueBlock) {
return valueBlock.getBlockValueSet(_columnName).getNullBitmap();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
index 4970f3738f..35249e475b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
@@ -29,13 +29,11 @@ import
com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
+import java.util.function.IntFunction;
import org.apache.pinot.common.function.JsonPathCache;
import org.apache.pinot.core.operator.ColumnContext;
-import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
-import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator;
-import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluators;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -70,10 +68,8 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
.mappingProvider(new
JacksonMappingProvider()).options(Option.SUPPRESS_EXCEPTIONS).build());
private TransformFunction _jsonFieldTransformFunction;
- private String _jsonPathString;
private JsonPath _jsonPath;
private Object _defaultValue;
- private JsonPathEvaluator _jsonPathEvaluator;
private TransformResultMetadata _resultMetadata;
@Override
@@ -98,7 +94,8 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
+ "function");
}
_jsonFieldTransformFunction = firstArgument;
- _jsonPathString = ((LiteralTransformFunction)
arguments.get(1)).getStringLiteral();
+ String jsonPathString = ((LiteralTransformFunction)
arguments.get(1)).getStringLiteral();
+ _jsonPath = JsonPathCache.INSTANCE.getOrCompute(jsonPathString);
String resultsType = ((LiteralTransformFunction)
arguments.get(2)).getStringLiteral().toUpperCase();
boolean isSingleValue = !resultsType.endsWith("_ARRAY");
DataType dataType;
@@ -114,11 +111,6 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
_defaultValue = dataType.convert(((LiteralTransformFunction)
arguments.get(3)).getStringLiteral());
}
_resultMetadata = new TransformResultMetadata(dataType, isSingleValue,
false);
- try {
- _jsonPathEvaluator = JsonPathEvaluators.create(_jsonPathString,
_defaultValue);
- } catch (Exception e) {
- throw new IllegalArgumentException("Invalid json path: " +
_jsonPathString, e);
- }
}
@Override
@@ -129,33 +121,29 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
@Override
public int[] transformToIntValuesSV(ValueBlock valueBlock) {
initIntValuesSV(valueBlock.getNumDocs());
- if (_jsonFieldTransformFunction instanceof PushDownTransformFunction &&
valueBlock instanceof ProjectionBlock) {
- ((PushDownTransformFunction)
_jsonFieldTransformFunction).transformToIntValuesSV((ProjectionBlock)
valueBlock,
- _jsonPathEvaluator, _intValuesSV);
- return _intValuesSV;
+ IntFunction<Object> resultExtractor = getResultExtractor(valueBlock);
+ int defaultValue = 0;
+ if (_defaultValue != null) {
+ if (_defaultValue instanceof Number) {
+ defaultValue = ((Number) _defaultValue).intValue();
+ } else {
+ defaultValue = Integer.parseInt(_defaultValue.toString());
+ }
}
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- return transformTransformedValuesToIntValuesSV(valueBlock);
- }
-
- private int[] transformTransformedValuesToIntValuesSV(ValueBlock valueBlock)
{
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- ensureJsonPathCompiled();
- String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
int numDocs = valueBlock.getNumDocs();
for (int i = 0; i < numDocs; i++) {
Object result = null;
try {
- result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ result = resultExtractor.apply(i);
} catch (Exception ignored) {
}
if (result == null) {
if (_defaultValue != null) {
- _intValuesSV[i] = (int) _defaultValue;
+ _intValuesSV[i] = defaultValue;
continue;
}
- throw new RuntimeException(
- String.format("Illegal Json Path: [%s], when reading [%s]",
_jsonPathString, jsonStrings[i]));
+ throw new IllegalArgumentException(
+ "Cannot resolve JSON path on some records. Consider setting a
default value.");
}
if (result instanceof Number) {
_intValuesSV[i] = ((Number) result).intValue();
@@ -169,32 +157,29 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
@Override
public long[] transformToLongValuesSV(ValueBlock valueBlock) {
initLongValuesSV(valueBlock.getNumDocs());
- if (_jsonFieldTransformFunction instanceof PushDownTransformFunction &&
valueBlock instanceof ProjectionBlock) {
- ((PushDownTransformFunction)
_jsonFieldTransformFunction).transformToLongValuesSV((ProjectionBlock)
valueBlock,
- _jsonPathEvaluator, _longValuesSV);
- return _longValuesSV;
+ IntFunction<Object> resultExtractor = getResultExtractor(valueBlock);
+ long defaultValue = 0;
+ if (_defaultValue != null) {
+ if (_defaultValue instanceof Number) {
+ defaultValue = ((Number) _defaultValue).longValue();
+ } else {
+ defaultValue = Long.parseLong(_defaultValue.toString());
+ }
}
- return transformTransformedValuesToLongValuesSV(valueBlock);
- }
-
- private long[] transformTransformedValuesToLongValuesSV(ValueBlock
valueBlock) {
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- ensureJsonPathCompiled();
- String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
int numDocs = valueBlock.getNumDocs();
for (int i = 0; i < numDocs; i++) {
Object result = null;
try {
- result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ result = resultExtractor.apply(i);
} catch (Exception ignored) {
}
if (result == null) {
if (_defaultValue != null) {
- _longValuesSV[i] = (long) _defaultValue;
+ _longValuesSV[i] = defaultValue;
continue;
}
- throw new RuntimeException(
- String.format("Illegal Json Path: [%s], when reading [%s]",
_jsonPathString, jsonStrings[i]));
+ throw new IllegalArgumentException(
+ "Cannot resolve JSON path on some records. Consider setting a
default value.");
}
if (result instanceof Number) {
_longValuesSV[i] = ((Number) result).longValue();
@@ -209,32 +194,29 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
@Override
public float[] transformToFloatValuesSV(ValueBlock valueBlock) {
initFloatValuesSV(valueBlock.getNumDocs());
- if (_jsonFieldTransformFunction instanceof PushDownTransformFunction &&
valueBlock instanceof ProjectionBlock) {
- ((PushDownTransformFunction)
_jsonFieldTransformFunction).transformToFloatValuesSV((ProjectionBlock)
valueBlock,
- _jsonPathEvaluator, _floatValuesSV);
- return _floatValuesSV;
+ IntFunction<Object> resultExtractor = getResultExtractor(valueBlock);
+ float defaultValue = 0;
+ if (_defaultValue != null) {
+ if (_defaultValue instanceof Number) {
+ defaultValue = ((Number) _defaultValue).floatValue();
+ } else {
+ defaultValue = Float.parseFloat(_defaultValue.toString());
+ }
}
- return transformTransformedValuesToFloatValuesSV(valueBlock);
- }
-
- private float[] transformTransformedValuesToFloatValuesSV(ValueBlock
valueBlock) {
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- ensureJsonPathCompiled();
- String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
int numDocs = valueBlock.getNumDocs();
for (int i = 0; i < numDocs; i++) {
Object result = null;
try {
- result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ result = resultExtractor.apply(i);
} catch (Exception ignored) {
}
if (result == null) {
if (_defaultValue != null) {
- _floatValuesSV[i] = (float) _defaultValue;
+ _floatValuesSV[i] = defaultValue;
continue;
}
- throw new RuntimeException(
- String.format("Illegal Json Path: [%s], when reading [%s]",
_jsonPathString, jsonStrings[i]));
+ throw new IllegalArgumentException(
+ "Cannot resolve JSON path on some records. Consider setting a
default value.");
}
if (result instanceof Number) {
_floatValuesSV[i] = ((Number) result).floatValue();
@@ -248,32 +230,29 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
@Override
public double[] transformToDoubleValuesSV(ValueBlock valueBlock) {
initDoubleValuesSV(valueBlock.getNumDocs());
- if (_jsonFieldTransformFunction instanceof PushDownTransformFunction &&
valueBlock instanceof ProjectionBlock) {
- ((PushDownTransformFunction)
_jsonFieldTransformFunction).transformToDoubleValuesSV((ProjectionBlock)
valueBlock,
- _jsonPathEvaluator, _doubleValuesSV);
- return _doubleValuesSV;
+ IntFunction<Object> resultExtractor = getResultExtractor(valueBlock);
+ double defaultValue = 0;
+ if (_defaultValue != null) {
+ if (_defaultValue instanceof Number) {
+ defaultValue = ((Number) _defaultValue).doubleValue();
+ } else {
+ defaultValue = Double.parseDouble(_defaultValue.toString());
+ }
}
- return transformTransformedValuesToDoubleValuesSV(valueBlock);
- }
-
- private double[] transformTransformedValuesToDoubleValuesSV(ValueBlock
valueBlock) {
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- ensureJsonPathCompiled();
- String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
int numDocs = valueBlock.getNumDocs();
for (int i = 0; i < numDocs; i++) {
Object result = null;
try {
- result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ result = resultExtractor.apply(i);
} catch (Exception ignored) {
}
if (result == null) {
if (_defaultValue != null) {
- _doubleValuesSV[i] = (double) _defaultValue;
+ _doubleValuesSV[i] = defaultValue;
continue;
}
- throw new RuntimeException(
- String.format("Illegal Json Path: [%s], when reading [%s]",
_jsonPathString, jsonStrings[i]));
+ throw new IllegalArgumentException(
+ "Cannot resolve JSON path on some records. Consider setting a
default value.");
}
if (result instanceof Number) {
_doubleValuesSV[i] = ((Number) result).doubleValue();
@@ -287,34 +266,31 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
@Override
public BigDecimal[] transformToBigDecimalValuesSV(ValueBlock valueBlock) {
initBigDecimalValuesSV(valueBlock.getNumDocs());
- if (_jsonFieldTransformFunction instanceof PushDownTransformFunction &&
valueBlock instanceof ProjectionBlock) {
- ((PushDownTransformFunction)
_jsonFieldTransformFunction).transformToBigDecimalValuesSV(
- (ProjectionBlock) valueBlock, _jsonPathEvaluator,
_bigDecimalValuesSV);
- return _bigDecimalValuesSV;
+ IntFunction<Object> resultExtractor = getResultExtractor(valueBlock,
JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL);
+ BigDecimal defaultValue = null;
+ if (_defaultValue != null) {
+ if (_defaultValue instanceof BigDecimal) {
+ defaultValue = (BigDecimal) _defaultValue;
+ } else {
+ defaultValue = new BigDecimal(_defaultValue.toString());
+ }
}
- return transformTransformedValuesToBigDecimalValuesSV(valueBlock);
- }
-
- private BigDecimal[]
transformTransformedValuesToBigDecimalValuesSV(ValueBlock valueBlock) {
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- ensureJsonPathCompiled();
- String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
int numDocs = valueBlock.getNumDocs();
for (int i = 0; i < numDocs; i++) {
Object result = null;
try {
- result =
JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parse(jsonStrings[i]).read(_jsonPath);
+ result = resultExtractor.apply(i);
} catch (Exception ignored) {
}
if (result == null) {
if (_defaultValue != null) {
- _bigDecimalValuesSV[i] = (BigDecimal) _defaultValue;
+ _bigDecimalValuesSV[i] = defaultValue;
continue;
}
- throw new RuntimeException(
- String.format("Illegal Json Path: [%s], when reading [%s]",
_jsonPathString, jsonStrings[i]));
+ throw new IllegalArgumentException(
+ "Cannot resolve JSON path on some records. Consider setting a
default value.");
}
- if (result instanceof Number) {
+ if (result instanceof BigDecimal) {
_bigDecimalValuesSV[i] = (BigDecimal) result;
} else {
_bigDecimalValuesSV[i] = new BigDecimal(result.toString());
@@ -326,32 +302,25 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
@Override
public String[] transformToStringValuesSV(ValueBlock valueBlock) {
initStringValuesSV(valueBlock.getNumDocs());
- if (_jsonFieldTransformFunction instanceof PushDownTransformFunction &&
valueBlock instanceof ProjectionBlock) {
- ((PushDownTransformFunction)
_jsonFieldTransformFunction).transformToStringValuesSV((ProjectionBlock)
valueBlock,
- _jsonPathEvaluator, _stringValuesSV);
- return _stringValuesSV;
+ IntFunction<Object> resultExtractor = getResultExtractor(valueBlock,
JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL);
+ String defaultValue = null;
+ if (_defaultValue != null) {
+ defaultValue = _defaultValue.toString();
}
- return transformTransformedValuesToStringValuesSV(valueBlock);
- }
-
- private String[] transformTransformedValuesToStringValuesSV(ValueBlock
valueBlock) {
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- ensureJsonPathCompiled();
- String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
int numDocs = valueBlock.getNumDocs();
for (int i = 0; i < numDocs; i++) {
Object result = null;
try {
- result =
JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parse(jsonStrings[i]).read(_jsonPath);
+ result = resultExtractor.apply(i);
} catch (Exception ignored) {
}
if (result == null) {
if (_defaultValue != null) {
- _stringValuesSV[i] = (String) _defaultValue;
+ _stringValuesSV[i] = defaultValue;
continue;
}
- throw new RuntimeException(
- String.format("Illegal Json Path: [%s], when reading [%s]",
_jsonPathString, jsonStrings[i]));
+ throw new IllegalArgumentException(
+ "Cannot resolve JSON path on some records. Consider setting a
default value.");
}
if (result instanceof String) {
_stringValuesSV[i] = (String) result;
@@ -365,23 +334,12 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
@Override
public int[][] transformToIntValuesMV(ValueBlock valueBlock) {
initIntValuesMV(valueBlock.getNumDocs());
- if (_jsonFieldTransformFunction instanceof PushDownTransformFunction &&
valueBlock instanceof ProjectionBlock) {
- ((PushDownTransformFunction)
_jsonFieldTransformFunction).transformToIntValuesMV((ProjectionBlock)
valueBlock,
- _jsonPathEvaluator, _intValuesMV);
- return _intValuesMV;
- }
- return transformTransformedValuesToIntValuesMV(valueBlock);
- }
-
- private int[][] transformTransformedValuesToIntValuesMV(ValueBlock
valueBlock) {
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- ensureJsonPathCompiled();
- String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
+ IntFunction<List<Integer>> resultExtractor =
getResultExtractor(valueBlock);
int numDocs = valueBlock.getNumDocs();
for (int i = 0; i < numDocs; i++) {
List<Integer> result = null;
try {
- result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ result = resultExtractor.apply(i);
} catch (Exception ignored) {
}
if (result == null) {
@@ -401,23 +359,12 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
@Override
public long[][] transformToLongValuesMV(ValueBlock valueBlock) {
initLongValuesMV(valueBlock.getNumDocs());
- if (_jsonFieldTransformFunction instanceof PushDownTransformFunction &&
valueBlock instanceof ProjectionBlock) {
- ((PushDownTransformFunction)
_jsonFieldTransformFunction).transformToLongValuesMV((ProjectionBlock)
valueBlock,
- _jsonPathEvaluator, _longValuesMV);
- return _longValuesMV;
- }
- return transformTransformedValuesToLongValuesMV(valueBlock);
- }
-
- private long[][] transformTransformedValuesToLongValuesMV(ValueBlock
valueBlock) {
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- ensureJsonPathCompiled();
- String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
+ IntFunction<List<Long>> resultExtractor = getResultExtractor(valueBlock);
int length = valueBlock.getNumDocs();
for (int i = 0; i < length; i++) {
List<Long> result = null;
try {
- result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ result = resultExtractor.apply(i);
} catch (Exception ignored) {
}
if (result == null) {
@@ -437,23 +384,12 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
@Override
public float[][] transformToFloatValuesMV(ValueBlock valueBlock) {
initFloatValuesMV(valueBlock.getNumDocs());
- if (_jsonFieldTransformFunction instanceof PushDownTransformFunction &&
valueBlock instanceof ProjectionBlock) {
- ((PushDownTransformFunction)
_jsonFieldTransformFunction).transformToFloatValuesMV((ProjectionBlock)
valueBlock,
- _jsonPathEvaluator, _floatValuesMV);
- return _floatValuesMV;
- }
- return transformTransformedValuesToFloatValuesMV(valueBlock);
- }
-
- private float[][] transformTransformedValuesToFloatValuesMV(ValueBlock
valueBlock) {
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- ensureJsonPathCompiled();
- String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
+ IntFunction<List<Float>> resultExtractor = getResultExtractor(valueBlock);
int length = valueBlock.getNumDocs();
for (int i = 0; i < length; i++) {
List<Float> result = null;
try {
- result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ result = resultExtractor.apply(i);
} catch (Exception ignored) {
}
if (result == null) {
@@ -473,23 +409,12 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
@Override
public double[][] transformToDoubleValuesMV(ValueBlock valueBlock) {
initDoubleValuesMV(valueBlock.getNumDocs());
- if (_jsonFieldTransformFunction instanceof PushDownTransformFunction &&
valueBlock instanceof ProjectionBlock) {
- ((PushDownTransformFunction)
_jsonFieldTransformFunction).transformToDoubleValuesMV((ProjectionBlock)
valueBlock,
- _jsonPathEvaluator, _doubleValuesMV);
- return _doubleValuesMV;
- }
- return transformTransformedToDoubleValuesMV(valueBlock);
- }
-
- private double[][] transformTransformedToDoubleValuesMV(ValueBlock
valueBlock) {
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- ensureJsonPathCompiled();
- String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
+ IntFunction<List<Double>> resultExtractor = getResultExtractor(valueBlock);
int length = valueBlock.getNumDocs();
for (int i = 0; i < length; i++) {
List<Double> result = null;
try {
- result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ result = resultExtractor.apply(i);
} catch (Exception ignored) {
}
if (result == null) {
@@ -509,23 +434,12 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
@Override
public String[][] transformToStringValuesMV(ValueBlock valueBlock) {
initStringValuesMV(valueBlock.getNumDocs());
- if (_jsonFieldTransformFunction instanceof PushDownTransformFunction &&
valueBlock instanceof ProjectionBlock) {
- ((PushDownTransformFunction)
_jsonFieldTransformFunction).transformToStringValuesMV((ProjectionBlock)
valueBlock,
- _jsonPathEvaluator, _stringValuesMV);
- return _stringValuesMV;
- }
- return transformTransformedValuesToStringValuesMV(valueBlock);
- }
-
- private String[][] transformTransformedValuesToStringValuesMV(ValueBlock
valueBlock) {
- // operating on the output of another transform so can't pass the
evaluation down to the storage
- ensureJsonPathCompiled();
- String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
+ IntFunction<List<String>> resultExtractor = getResultExtractor(valueBlock);
int length = valueBlock.getNumDocs();
for (int i = 0; i < length; i++) {
List<String> result = null;
try {
- result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ result = resultExtractor.apply(i);
} catch (Exception ignored) {
}
if (result == null) {
@@ -542,9 +456,17 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
return _stringValuesMV;
}
- private void ensureJsonPathCompiled() {
- if (_jsonPath == null) {
- _jsonPath = JsonPathCache.INSTANCE.getOrCompute(_jsonPathString);
+ private <T> IntFunction<T> getResultExtractor(ValueBlock valueBlock,
ParseContext parseContext) {
+ if (_jsonFieldTransformFunction.getResultMetadata().getDataType() ==
DataType.BYTES) {
+ byte[][] jsonBytes =
_jsonFieldTransformFunction.transformToBytesValuesSV(valueBlock);
+ return i -> parseContext.parseUtf8(jsonBytes[i]).read(_jsonPath);
+ } else {
+ String[] jsonStrings =
_jsonFieldTransformFunction.transformToStringValuesSV(valueBlock);
+ return i -> parseContext.parse(jsonStrings[i]).read(_jsonPath);
}
}
+
+ private <T> IntFunction<T> getResultExtractor(ValueBlock valueBlock) {
+ return getResultExtractor(valueBlock, JSON_PARSER_CONTEXT);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java
deleted file mode 100644
index 4af9cfd232..0000000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.transform.function;
-
-import java.math.BigDecimal;
-import org.apache.pinot.core.operator.blocks.ProjectionBlock;
-import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
-
-
-public interface PushDownTransformFunction {
-
- /**
- * SINGLE-VALUED APIs
- */
-
- /**
- * Transforms the data from the given projection block to single-valued int
values.
- *
- * @param projectionBlock Projection result
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- void transformToIntValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, int[] buffer);
-
- /**
- * Transforms the data from the given projection block to single-valued long
values.
- *
- * @param projectionBlock Projection result
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- void transformToLongValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, long[] buffer);
-
- /**
- * Transforms the data from the given projection block to single-valued
float values.
- *
- * @param projectionBlock Projection result
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- void transformToFloatValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, float[] buffer);
-
- /**
- * Transforms the data from the given projection block to single-valued
double values.
- *
- * @param projectionBlock Projection result
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- void transformToDoubleValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, double[] buffer);
-
- /**
- * Transforms the data from the given projection block to single-valued
BigDecimal values.
- *
- * @param projectionBlock Projection result
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- void transformToBigDecimalValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator,
- BigDecimal[] buffer);
-
- /**
- * Transforms the data from the given projection block to single-valued
string values.
- *
- * @param projectionBlock Projection result
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- void transformToStringValuesSV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, String[] buffer);
-
- /**
- * MULTI-VALUED APIs
- */
-
- /**
- * Transforms the data from the given projection block to multi-valued int
values.
- *
- * @param projectionBlock Projection result
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- void transformToIntValuesMV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, int[][] buffer);
-
- /**
- * Transforms the data from the given projection block to multi-valued long
values.
- *
- * @param projectionBlock Projection result
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- void transformToLongValuesMV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, long[][] buffer);
-
- /**
- * Transforms the data from the given projection block to multi-valued float
values.
- *
- * @param projectionBlock Projection result
- * @param jsonPath transform
- * @param buffer values to fill
- */
- void transformToFloatValuesMV(ProjectionBlock projectionBlock,
TransformEvaluator jsonPath, float[][] buffer);
-
- /**
- * Transforms the data from the given projection block to multi-valued
double values.
- *
- * @param projectionBlock Projection result
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- void transformToDoubleValuesMV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, double[][] buffer);
-
- /**
- * Transforms the data from the given projection block to multi-valued
string values.
- *
- * @param projectionBlock Projection result
- * @param evaluator transform evaluator
- * @param buffer values to fill
- */
- void transformToStringValuesMV(ProjectionBlock projectionBlock,
TransformEvaluator evaluator, String[][] buffer);
-}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluatorTest.java
deleted file mode 100644
index c7b304667c..0000000000
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluatorTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.common.evaluators;
-
-import java.nio.charset.StandardCharsets;
-import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.testng.annotations.Test;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
-
-
-public class DefaultJsonPathEvaluatorTest {
- @Test
- public void testNonDictIntegerArray() {
- String json = "{\"values\": [1, 2, 3, 4, 5]}";
- String path = "$.values[0:3]";
- JsonPathEvaluator evaluator = DefaultJsonPathEvaluator.create(path, new
int[]{});
- ForwardIndexReader<ForwardIndexReaderContext> reader =
mock(ForwardIndexReader.class);
- when(reader.isDictionaryEncoded()).thenReturn(false);
- when(reader.getBytes(eq(0),
any())).thenReturn(json.getBytes(StandardCharsets.UTF_8));
- when(reader.getStoredType()).thenReturn(FieldSpec.DataType.STRING);
- when(reader.createContext()).thenReturn(null);
-
- // Read as ints
- int[][] buffer = new int[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, buffer);
- assertArrayEquals(buffer, new int[][]{{1, 2, 3}});
-
- // Read as longs
- long[][] longBuffer = new long[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
longBuffer);
- assertArrayEquals(longBuffer, new long[][]{{1, 2, 3}});
-
- // Read as floats
- float[][] floatBuffer = new float[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
floatBuffer);
- assertArrayEquals(floatBuffer, new float[][]{{1.0f, 2.0f, 3.0f}});
-
- // Read as doubles
- double[][] doubleBuffer = new double[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
doubleBuffer);
- assertArrayEquals(doubleBuffer, new double[][]{{1.0, 2.0, 3.0}});
-
- // Read as strings
- String[][] stringBuffer = new String[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
stringBuffer);
- assertArrayEquals(stringBuffer, new String[][]{{"1", "2", "3"}});
- }
-
- @Test
- public void testNonDictStringArray() {
- String json = "{\"values\": [\"1\", \"2\", \"3\", \"4\", \"5\"]}";
- String path = "$.values[0:3]";
- JsonPathEvaluator evaluator = DefaultJsonPathEvaluator.create(path, new
int[]{});
- ForwardIndexReader<ForwardIndexReaderContext> reader =
mock(ForwardIndexReader.class);
- when(reader.isDictionaryEncoded()).thenReturn(false);
- when(reader.getBytes(eq(0),
any())).thenReturn(json.getBytes(StandardCharsets.UTF_8));
- when(reader.getStoredType()).thenReturn(FieldSpec.DataType.STRING);
- when(reader.createContext()).thenReturn(null);
-
- // Read as ints
- int[][] buffer = new int[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, buffer);
- assertArrayEquals(buffer, new int[][]{{1, 2, 3}});
-
- // Read as longs
- long[][] longBuffer = new long[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
longBuffer);
- assertArrayEquals(longBuffer, new long[][]{{1, 2, 3}});
-
- // Read as floats
- float[][] floatBuffer = new float[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
floatBuffer);
- assertArrayEquals(floatBuffer, new float[][]{{1.0f, 2.0f, 3.0f}});
-
- // Read as doubles
- double[][] doubleBuffer = new double[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
doubleBuffer);
- assertArrayEquals(doubleBuffer, new double[][]{{1.0, 2.0, 3.0}});
-
- // Read as strings
- String[][] stringBuffer = new String[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
stringBuffer);
- assertArrayEquals(stringBuffer, new String[][]{{"1", "2", "3"}});
- }
-
- @Test
- public void testNonDictDoubleArray() {
- String json = "{\"values\": [1.0, 2.0, 3.0, 4.0, 5.0]}";
- String path = "$.values[0:3]";
- JsonPathEvaluator evaluator = DefaultJsonPathEvaluator.create(path, new
int[]{});
- ForwardIndexReader<ForwardIndexReaderContext> reader =
mock(ForwardIndexReader.class);
- when(reader.isDictionaryEncoded()).thenReturn(false);
- when(reader.getBytes(eq(0),
any())).thenReturn(json.getBytes(StandardCharsets.UTF_8));
- when(reader.getStoredType()).thenReturn(FieldSpec.DataType.STRING);
- when(reader.createContext()).thenReturn(null);
-
- // Read as ints
- int[][] buffer = new int[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, buffer);
- assertArrayEquals(buffer, new int[][]{{1, 2, 3}});
-
- // Read as longs
- long[][] longBuffer = new long[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
longBuffer);
- assertArrayEquals(longBuffer, new long[][]{{1, 2, 3}});
-
- // Read as floats
- float[][] floatBuffer = new float[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
floatBuffer);
- assertArrayEquals(floatBuffer, new float[][]{{1.0f, 2.0f, 3.0f}});
-
- // Read as doubles
- double[][] doubleBuffer = new double[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
doubleBuffer);
- assertArrayEquals(doubleBuffer, new double[][]{{1.0, 2.0, 3.0}});
-
- // Read as strings
- String[][] stringBuffer = new String[1][3];
- evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null,
stringBuffer);
- assertArrayEquals(stringBuffer, new String[][]{{"1.0", "2.0", "3.0"}});
- }
-}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index bf09de1a96..67e22a66d0 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -299,7 +299,7 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
// - PlaceholderScalarFunction registered will throw on intermediate
stage, but works on leaf stage.
// - checked "Illegal Json Path" as col1 is not actually a json
string, but the call is correctly triggered.
- new Object[]{"SELECT CAST(jsonExtractScalar(col1, 'path', 'INT') AS
INT) FROM a", "Illegal Json Path"},
+ new Object[]{"SELECT CAST(jsonExtractScalar(col1, 'path', 'INT') AS
INT) FROM a", "Cannot resolve JSON path"},
// - checked function cannot be found b/c there's no intermediate
stage impl for json_extract_scalar
new Object[]{"SELECT CAST(json_extract_scalar(a.col1, b.col2, 'INT')
AS INT)"
+ "FROM a JOIN b ON a.col1 = b.col1", "Cannot find function with
name: JSON_EXTRACT_SCALAR"},
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/TransformEvaluator.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/TransformEvaluator.java
deleted file mode 100644
index 6bf04ca7d6..0000000000
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/TransformEvaluator.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.spi.evaluator;
-
-import java.math.BigDecimal;
-import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-
-
-/**
- * This is an evolving SPI and subject to change.
- */
-public interface TransformEvaluator {
- /**
- * Evaluate the JSON path and fill the value buffer
- * @param docIds the doc ids to evaluate the JSON path for
- * @param length the number of doc ids to evaluate for
- * @param reader the ForwardIndexReader
- * @param context the reader context
- * @param dictionary the dictionary
- * @param valueBuffer the values to fill
- * @param <T> type of the reader context
- */
- <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int
length, ForwardIndexReader<T> reader,
- T context, Dictionary dictionary, int[] dictIdBuffer, int[] valueBuffer);
-
- /**
- * Evaluate the JSON path and fill the value buffer
- * @param docIds the doc ids to evaluate the JSON path for
- * @param length the number of doc ids to evaluate for
- * @param reader the ForwardIndexReader
- * @param context the reader context
- * @param valueBuffer the values to fill
- * @param <T> type of the reader context
- */
- <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int
length, ForwardIndexReader<T> reader,
- T context, Dictionary dictionary, int[] dictIdBuffer, long[]
valueBuffer);
-
- /**
- * Evaluate the JSON path and fill the value buffer
- * @param docIds the doc ids to evaluate the JSON path for
- * @param length the number of doc ids to evaluate for
- * @param reader the ForwardIndexReader
- * @param context the reader context
- * @param valueBuffer the values to fill
- * @param <T> type of the reader context
- */
- <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int
length, ForwardIndexReader<T> reader,
- T context, Dictionary dictionary, int[] dictIdBuffer, float[]
valueBuffer);
-
- /**
- * Evaluate the JSON path and fill the value buffer
- * @param docIds the doc ids to evaluate the JSON path for
- * @param length the number of doc ids to evaluate for
- * @param reader the ForwardIndexReader
- * @param context the reader context
- * @param valueBuffer the values to fill
- * @param <T> type of the reader context
- */
- <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int
length, ForwardIndexReader<T> reader,
- T context, Dictionary dictionary, int[] dictIdBuffer, double[]
valueBuffer);
-
- /**
- * Evaluate the JSON path and fill the value buffer
- * @param docIds the doc ids to evaluate the JSON path for
- * @param length the number of doc ids to evaluate for
- * @param reader the ForwardIndexReader
- * @param context the reader context
- * @param valueBuffer the values to fill
- * @param <T> type of the reader context
- */
- <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int
length, ForwardIndexReader<T> reader,
- T context, Dictionary dictionary, int[] dictIdBuffer, BigDecimal[]
valueBuffer);
-
- /**
- * Evaluate the JSON path and fill the value buffer
- * @param docIds the doc ids to evaluate the JSON path for
- * @param length the number of doc ids to evaluate for
- * @param reader the ForwardIndexReader
- * @param context the reader context
- * @param valueBuffer the values to fill
- * @param <T> type of the reader context
- */
- <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int
length, ForwardIndexReader<T> reader,
- T context, Dictionary dictionary, int[] dictIdBuffer, String[]
valueBuffer);
-
- /**
- * Evaluate the JSON path and fill the value buffer
- * @param docIds the doc ids to evaluate the JSON path for
- * @param length the number of doc ids to evaluate for
- * @param reader the ForwardIndexReader
- * @param context the reader context
- * @param dictIdsBuffer a buffer for dictionary ids if required
- * @param valueBuffer the values to fill
- * @param <T> type of the reader context
- */
- <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int
length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, int[][] valueBuffer);
-
- /**
- * Evaluate the JSON path and fill the value buffer
- * @param docIds the doc ids to evaluate the JSON path for
- * @param length the number of doc ids to evaluate for
- * @param reader the ForwardIndexReader
- * @param context the reader context
- * @param dictIdsBuffer a buffer for dictionary ids if required
- * @param valueBuffer the values to fill
- * @param <T> type of the reader context
- */
- <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int
length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, long[][] valueBuffer);
-
- /**
- * Evaluate the JSON path and fill the value buffer
- * @param docIds the doc ids to evaluate the JSON path for
- * @param length the number of doc ids to evaluate for
- * @param reader the ForwardIndexReader
- * @param context the reader context
- * @param dictIdsBuffer a buffer for dictionary ids if required
- * @param valueBuffer the values to fill
- * @param <T> type of the reader context
- */
- <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int
length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, float[][] valueBuffer);
-
- /**
- * Evaluate the JSON path and fill the value buffer
- * @param docIds the doc ids to evaluate the JSON path for
- * @param length the number of doc ids to evaluate for
- * @param reader the ForwardIndexReader
- * @param context the reader context
- * @param dictIdsBuffer a buffer for dictionary ids if required
- * @param valueBuffer the values to fill
- * @param <T> type of the reader context
- */
- <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int
length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, double[][] valueBuffer);
-
- /**
- * Evaluate the JSON path and fill the value buffer
- * @param docIds the doc ids to evaluate the JSON path for
- * @param length the number of doc ids to evaluate for
- * @param reader the ForwardIndexReader
- * @param context the reader context
- * @param dictIdsBuffer a buffer for dictionary ids if required
- * @param valueBuffer the values to fill
- * @param <T> type of the reader context
- */
- <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int
length,
- ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[]
dictIdsBuffer, String[][] valueBuffer);
-}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluator.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluator.java
deleted file mode 100644
index 034151ea88..0000000000
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluator.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.spi.evaluator.json;
-
-import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
-
-/**
- * Introduce an empty interface to allow it to be extended without
- * affecting {@see TransformEvaluator}.
- *
- * This is an evolving SPI and subject to change.
- */
-public interface JsonPathEvaluator extends TransformEvaluator {
-}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluatorProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluatorProvider.java
deleted file mode 100644
index 4de602526e..0000000000
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluatorProvider.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.spi.evaluator.json;
-
-/**
- * This is an evolving SPI and subject to change.
- */
-public interface JsonPathEvaluatorProvider {
- /**
- * Create a {@see JsonPathEvaluator}
- * @param delegate to be delegated to for evaluation
- * @param jsonPath the json path as a string
- * @param defaultValue the default value
- * @return a {@see JsonPathEvaluator}
- */
- JsonPathEvaluator create(JsonPathEvaluator delegate, String jsonPath, Object
defaultValue);
-}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluators.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluators.java
deleted file mode 100644
index 0106d3b98c..0000000000
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluators.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.spi.evaluator.json;
-
-import com.google.common.base.Preconditions;
-import java.lang.invoke.MethodHandle;
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.MethodType;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Allows registration of a custom {@see JsonPathEvaluator} which can handle
custom storage
- * functionality also present in a plugin. A default evaluator which can
handle all default
- * storage types will be provided to delegate to when standard storage types
are encountered.
- *
- * This is an evolving SPI and subject to change.
- */
-public final class JsonPathEvaluators {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(JsonPathEvaluators.class);
-
- private static final AtomicReferenceFieldUpdater<JsonPathEvaluators,
JsonPathEvaluatorProvider> UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(JsonPathEvaluators.class,
JsonPathEvaluatorProvider.class, "_provider");
- private static final JsonPathEvaluators INSTANCE = new JsonPathEvaluators();
- private static final DefaultProvider DEFAULT_PROVIDER = new
DefaultProvider();
- private volatile JsonPathEvaluatorProvider _provider;
-
- /**
- * Registration point to override how JSON paths are evaluated. This should
be used
- * when a Pinot plugin has special storage capabilities. For instance,
imagine a
- * plugin with a raw forward index which stores JSON in a binary format which
- * pinot-core is unaware of and cannot evaluate JSON paths against
(pinot-core only
- * understands true JSON documents). Whenever JSON paths are evaluated
against this
- * custom storage, different storage access operations may be required, and
the provided
- * {@see JsonPathEvaluator} can inspect the provided {@see
ForwardIndexReader} to
- * determine whether it is the custom implementation and evaluate the JSON
path against
- * the binary JSON managed by the custom reader. If it is not the custom
implementation,
- * then the evaluation should be delegated to the provided delegate.
- *
- * This prevents the interface {@see ForwardIndexReader} from needing to be
able to model
- * any plugin storage format, which creates flexibility for the kinds of
data structure
- * plugins can employ.
- *
- * @param provider provides {@see JsonPathEvaluator}
- * @return true if registration is successful, false otherwise
- */
- public static boolean registerProvider(JsonPathEvaluatorProvider provider) {
- Preconditions.checkArgument(provider != null, "");
- if (!UPDATER.compareAndSet(INSTANCE, null, provider)) {
- LOGGER.warn("failed to register {} - {} already registered", provider,
INSTANCE._provider);
- return false;
- }
- return true;
- }
-
- /**
- * pinot-core must construct {@see JsonPathEvaluator} via this method to
ensure it uses
- * the registered implementation. Using the registered implementation allows
pinot-core
- * to evaluate JSON paths against data structures it doesn't understand or
model.
- * @param jsonPath the JSON path
- * @param defaultValue the default value
- * @return a JSON path evaluator which must understand all possible storage
representations of JSON.
- */
- public static JsonPathEvaluator create(String jsonPath, Object defaultValue)
{
- // plugins compose and delegate to the default implementation.
- JsonPathEvaluator defaultEvaluator = DEFAULT_PROVIDER.create(jsonPath,
defaultValue);
- return Holder.PROVIDER.create(defaultEvaluator, jsonPath, defaultValue);
- }
-
- /**
- * Storing the registered evaluator in this holder and initialising it during
- * the class load gives the best of both worlds: plugins have until the first
- * JSON path evaluation to register an evaluator via
- * {@see JsonPathEvaluators#registerProvider}, but once this class is loaded,
- * the provider is constant and calls may be optimise aggressively by the JVM
- * in ways which are impossible with a volatile reference.
- */
- private static final class Holder {
- static final JsonPathEvaluatorProvider PROVIDER;
-
- static {
- JsonPathEvaluatorProvider provider =
JsonPathEvaluators.INSTANCE._provider;
- if (provider == null) {
- provider = DEFAULT_PROVIDER;
- if (!UPDATER.compareAndSet(INSTANCE, null, provider)) {
- provider = JsonPathEvaluators.INSTANCE._provider;
- }
- }
- PROVIDER = provider;
- }
- }
-
- private static class DefaultProvider implements JsonPathEvaluatorProvider {
-
- // default implementation uses MethodHandles to avoid pulling lots of
implementation details into the SPI layer
-
- private static final MethodHandle FACTORY;
-
- static {
- String className =
"org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator";
- MethodHandle factory = null;
- try {
- Class<?> clazz = Class.forName(className, false,
JsonPathEvaluators.class.getClassLoader());
- factory = MethodHandles.publicLookup()
- .findStatic(clazz, "create",
MethodType.methodType(JsonPathEvaluator.class, String.class, Object.class));
- } catch (Throwable implausible) {
- LOGGER.error("could not construct MethodHandle for {}", className,
- implausible);
- }
- FACTORY = factory;
- }
-
- public JsonPathEvaluator create(String jsonPath, Object defaultValue) {
- return create(null, jsonPath, defaultValue);
- }
-
- @Override
- public JsonPathEvaluator create(JsonPathEvaluator delegate, String
jsonPath, Object defaultValue) {
- try {
- return (JsonPathEvaluator) FACTORY.invokeExact(jsonPath, defaultValue);
- } catch (IllegalArgumentException e) {
- throw e;
- } catch (Throwable t) {
- throw new RuntimeException(t);
- }
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]