Jackie-Jiang commented on a change in pull request #7820:
URL: https://github.com/apache/pinot/pull/7820#discussion_r762322462
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
##########
@@ -559,12 +744,21 @@ void readStringValuesMV(int[] docIds, int length,
String[][] valuesBuffer) {
}
}
+ void readStringValuesMV(TransformEvaluator evaluator, int[] docIds, int
length, String[][] valuesBuffer) {
+ evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(),
_dictionary, getSVDictIdsBuffer(),
+ valuesBuffer);
+ }
+
public void readNumValuesMV(int[] docIds, int length, int[]
numValuesBuffer) {
for (int i = 0; i < length; i++) {
- numValuesBuffer[i] = _reader.getDictIdMV(docIds[i],
_reusableMVDictIds, getReaderContext());
+ numValuesBuffer[i] = _reader.getDictIdMV(docIds[i],
getSVDictIdsBuffer(), getReaderContext());
Review comment:
This should not be changed
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java
##########
@@ -0,0 +1,588 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common.evaluators;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.ParseContext;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.pinot.common.function.JsonPathCache;
+import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
+
+ private static final ParseContext JSON_PARSER_CONTEXT = JsonPath.using(
+ new Configuration.ConfigurationBuilder().jsonProvider(new
JacksonJsonProvider())
+ .mappingProvider(new
JacksonMappingProvider()).options(Option.SUPPRESS_EXCEPTIONS).build());
+
+ private static final int[] EMPTY_INTS = new int[0];
+ private static final long[] EMPTY_LONGS = new long[0];
+ private static final float[] EMPTY_FLOATS = new float[0];
+ private static final double[] EMPTY_DOUBLES = new double[0];
+ private static final String[] EMPTY_STRINGS = new String[0];
+
+ public static JsonPathEvaluator create(String jsonPath, Object defaultValue)
{
Review comment:
(minor) Let's annotate `defaultValue` as `nullable` so that IDE can warn
on potential null value access. Same for other methods with it as argument
##########
File path:
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluator.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.evaluator.json;
+
+import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
+
+/**
+ * Introduce an empty interface to allow it to be extended without
+ * affecting {@see TransformEvaluator}.
+ *
+ * This is an evolving SPI and subject to change.
+ */
+public interface JsonPathEvaluator extends TransformEvaluator {
Review comment:
These interfaces do not belong to the `segment-spi`, but `query-spi`
which is not available yet. For now we can put them into the `pinot-core` which
contains all the query related interfaces
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
##########
@@ -69,4 +70,114 @@ public BlockDocIdValueSet getBlockDocIdValueSet() {
public BlockMetadata getMetadata() {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Pushes a {@see TransformEvaluator} which will produce an int value down
+ * to be evaluated against the column. This is an unstable API.
+ * @param column column to evaluate against
+ * @param evaluator the evaluator which produces values from the storage in
the column
+ * @param buffer the buffer to write outputs into
+ */
+ public void pushDown(String column, TransformEvaluator evaluator, int[]
buffer) {
Review comment:
Suggest renaming it to `fillValues` which is more general, and can be
used in the future for all value reads
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
##########
@@ -94,6 +95,7 @@ public void init(List<TransformFunction> arguments,
Map<String, DataSource> data
_defaultValue = dataType.convert(((LiteralTransformFunction)
arguments.get(3)).getLiteral());
}
_resultMetadata = new TransformResultMetadata(dataType, isSingleValue,
false);
+ _jsonPathEvaluator = JsonPathEvaluators.create(_jsonPathString,
_defaultValue);
Review comment:
(optional) Consider directly passing the `JsonPath` into the evaluator
so that we don't need to compile the json path in 2 places
##########
File path:
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluators.java
##########
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.evaluator.json;
+
+import com.google.common.base.Preconditions;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Allows registration of a custom {@see JsonPathEvaluator} which can handle
custom storage
+ * functionality also present in a plugin. A default evaluator which can
handle all default
+ * storage types will be provided to delegate to when standard storage types
are encountered.
+ *
+ * This is an evolving SPI and subject to change.
+ */
+public final class JsonPathEvaluators {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JsonPathEvaluators.class);
+
+ private static final AtomicReferenceFieldUpdater<JsonPathEvaluators,
JsonPathEvaluatorProvider> UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(JsonPathEvaluators.class,
JsonPathEvaluatorProvider.class, "_provider");
+ private static final JsonPathEvaluators INSTANCE = new JsonPathEvaluators();
+ private static final DefaultProvider DEFAULT_PROVIDER = new
DefaultProvider();
+ private volatile JsonPathEvaluatorProvider _provider;
+
+ /**
+ * Registration point to override how JSON paths are evaluated. This should
be used
+ * when a Pinot plugin has special storage capabilities. For instance,
imagine a
+ * plugin with a raw forward index which stores JSON in a binary format which
+ * pinot-core is unaware of and cannot evaluate JSON paths against
(pinot-core only
+ * understands true JSON documents). Whenever JSON paths are evaluated
against this
+ * custom storage, different storage access operations may be required, and
the provided
+ * {@see JsonPathEvaluator} can inspect the provided {@see
ForwardIndexReader} to
+ * determine whether it is the custom implementation and evaluate the JSON
path against
+ * the binary JSON managed by the custom reader. If it is not the custom
implementation,
+ * then the evaluation should be delegated to the provided delegate.
+ *
+ * This prevents the interface {@see ForwardIndexReader} from needing to be
able to model
+ * any plugin storage format, which creates flexibility for the kinds of
data structure
+ * plugins can employ.
+ *
+ * @param provider provides {@see JsonPathEvaluator}
+ * @return true if registration is successful, false otherwise
+ */
+ public static boolean registerProvider(JsonPathEvaluatorProvider provider) {
+ Preconditions.checkArgument(provider != null, "");
+ if (!UPDATER.compareAndSet(INSTANCE, null, provider)) {
+ LOGGER.warn("failed to register {} - {} already registered", provider,
INSTANCE._provider);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * pinot-core must construct {@see JsonPathEvaluator} via this method to
ensure it uses
+ * the registered implementation. Using the registered implementation allows
pinot-core
+ * to evaluate JSON paths against data structures it doesn't understand or
model.
+ * @param jsonPath the JSON path
+ * @param defaultValue the default value
+ * @return a JSON path evaluator which must understand all possible storage
representations of JSON.
+ */
+ public static JsonPathEvaluator create(String jsonPath, Object defaultValue)
{
+ // plugins compose and delegate to the default implementation.
+ JsonPathEvaluator defaultEvaluator = DEFAULT_PROVIDER.create(jsonPath,
defaultValue);
+ return Holder.PROVIDER.create(defaultEvaluator, jsonPath, defaultValue);
+ }
+
+ /**
+ * Storing the registered evaluator in this holder and initialising it during
+ * the class load gives the best of both worlds: plugins have until the first
+ * JSON path evaluation to register an evaluator via
+ * {@see JsonPathEvaluators#registerProvider}, but once this class is loaded,
+ * the provider is constant and calls may be optimise aggressively by the JVM
+ * in ways which are impossible with a volatile reference.
+ */
+ private static final class Holder {
+ static final JsonPathEvaluatorProvider PROVIDER;
+
+ static {
+ JsonPathEvaluatorProvider provider =
JsonPathEvaluators.INSTANCE._provider;
+ if (provider == null) {
+ provider = DEFAULT_PROVIDER;
+ if (!UPDATER.compareAndSet(INSTANCE, null, provider)) {
+ provider = JsonPathEvaluators.INSTANCE._provider;
+ }
+ }
+ PROVIDER = provider;
+ }
+ }
+
+ private static class DefaultProvider implements JsonPathEvaluatorProvider {
+
+ // default implementation uses MethodHandles to avoid pulling lots of
implementation details into the SPI layer
+
+ private static final MethodHandle FACTORY;
+
+ static {
+ String className =
"org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator";
+ MethodHandle factory = null;
+ try {
+ Class<?> clazz = Class.forName(className, false,
JsonPathEvaluators.class.getClassLoader());
+ factory = MethodHandles.publicLookup()
+ .findStatic(clazz, "create",
MethodType.methodType(JsonPathEvaluator.class, String.class, Object.class));
+ } catch (Throwable implausible) {
+ LOGGER.error("could not construct MethodHandle for {}", className,
+ implausible);
+ }
+ FACTORY = factory;
+ }
+
+ public JsonPathEvaluator create(String jsonPath, Object defaultValue) {
+ return create(null, jsonPath, defaultValue);
+ }
+
+ @Override
+ public JsonPathEvaluator create(JsonPathEvaluator delegate, String
jsonPath, Object defaultValue) {
Review comment:
Don't fully understand the logic here. What is the delegate evaluator
used for?
##########
File path:
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluators.java
##########
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.evaluator.json;
+
+import com.google.common.base.Preconditions;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Allows registration of a custom {@see JsonPathEvaluator} which can handle
custom storage
+ * functionality also present in a plugin. A default evaluator which can
handle all default
+ * storage types will be provided to delegate to when standard storage types
are encountered.
+ *
+ * This is an evolving SPI and subject to change.
+ */
+public final class JsonPathEvaluators {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JsonPathEvaluators.class);
+
+ private static final AtomicReferenceFieldUpdater<JsonPathEvaluators,
JsonPathEvaluatorProvider> UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(JsonPathEvaluators.class,
JsonPathEvaluatorProvider.class, "_provider");
+ private static final JsonPathEvaluators INSTANCE = new JsonPathEvaluators();
+ private static final DefaultProvider DEFAULT_PROVIDER = new
DefaultProvider();
+ private volatile JsonPathEvaluatorProvider _provider;
+
+ /**
+ * Registration point to override how JSON paths are evaluated. This should
be used
+ * when a Pinot plugin has special storage capabilities. For instance,
imagine a
+ * plugin with a raw forward index which stores JSON in a binary format which
+ * pinot-core is unaware of and cannot evaluate JSON paths against
(pinot-core only
+ * understands true JSON documents). Whenever JSON paths are evaluated
against this
+ * custom storage, different storage access operations may be required, and
the provided
+ * {@see JsonPathEvaluator} can inspect the provided {@see
ForwardIndexReader} to
+ * determine whether it is the custom implementation and evaluate the JSON
path against
+ * the binary JSON managed by the custom reader. If it is not the custom
implementation,
+ * then the evaluation should be delegated to the provided delegate.
+ *
+ * This prevents the interface {@see ForwardIndexReader} from needing to be
able to model
+ * any plugin storage format, which creates flexibility for the kinds of
data structure
+ * plugins can employ.
+ *
+ * @param provider provides {@see JsonPathEvaluator}
+ * @return true if registration is successful, false otherwise
+ */
+ public static boolean registerProvider(JsonPathEvaluatorProvider provider) {
+ Preconditions.checkArgument(provider != null, "");
Review comment:
Does it work if we simply put the `provider` as a `private static` field
and set it during the instance startup? Several classes handle the plugin that
way
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java
##########
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
+
+
+public interface PushDownTransformFunction {
Review comment:
(optional) Should we make this extend the `TransformFunction` interface,
or do you see this independent of the regular `TransformFunction`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]