This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 96f2164fe03 Revert "Remove UDTFJexl from built-in function (#11365)"
96f2164fe03 is described below
commit 96f2164fe03c07055a42d92049f61adb1cecd3cb
Author: Liao Lanyu <[email protected]>
AuthorDate: Thu Oct 26 11:22:59 2023 +0800
Revert "Remove UDTFJexl from built-in function (#11365)"
---
.../BuiltinTimeSeriesGeneratingFunctionEnum.java | 1 +
.../fragment/FragmentInstanceContext.java | 1 -
.../schedule/DriverTaskTimeoutSentinelThread.java | 2 +-
iotdb-core/node-commons/pom.xml | 4 +
.../BuiltinTimeSeriesGeneratingFunction.java | 1 +
.../apache/iotdb/commons/udf/builtin/UDTFJexl.java | 348 +++++++++++++++++++++
6 files changed, 355 insertions(+), 2 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
index e269f1af701..fa0c5b5b148 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
@@ -72,6 +72,7 @@ public enum BuiltinTimeSeriesGeneratingFunctionEnum {
EQUAL_SIZE_BUCKET_AGG_SAMPLE("EQUAL_SIZE_BUCKET_AGG_SAMPLE"),
EQUAL_SIZE_BUCKET_M4_SAMPLE("EQUAL_SIZE_BUCKET_M4_SAMPLE"),
EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE("EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE"),
+ JEXL("JEXL"),
MASTER_REPAIR("MASTER_REPAIR"),
M4("M4");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 9969e97f2d8..9da3e51c1b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -409,7 +409,6 @@ public class FragmentInstanceContext extends QueryContext {
allDriversClosed.await();
break;
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
LOGGER.warn(
"Interrupted when await on allDriversClosed, FragmentInstance Id
is {}", this.getId());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
index 9fb4e59dda2..f7bbfab725d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -31,7 +31,7 @@ public class DriverTaskTimeoutSentinelThread extends
AbstractDriverThread {
private static final Logger LOGGER =
LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
- private static final long SLEEP_BOUND = 5 * 1000L;
+ private final long SLEEP_BOUND = 5 * 1000L;
public DriverTaskTimeoutSentinelThread(
String workerId,
diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml
index 9c807ad4d1b..5f1850291c2 100644
--- a/iotdb-core/node-commons/pom.xml
+++ b/iotdb-core/node-commons/pom.xml
@@ -141,6 +141,10 @@
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-jexl3</artifactId>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
index 25bf548c6c4..ece0e3104a6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
@@ -88,6 +88,7 @@ public enum BuiltinTimeSeriesGeneratingFunction {
EQUAL_SIZE_BUCKET_M4_SAMPLE("EQUAL_SIZE_BUCKET_M4_SAMPLE",
UDTFEqualSizeBucketM4Sample.class),
EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE(
"EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE",
UDTFEqualSizeBucketOutlierSample.class),
+ JEXL("JEXL", UDTFJexl.class),
MASTER_REPAIR("MASTER_REPAIR", UDTFMasterRepair.class),
M4("M4", UDTFM4.class);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFJexl.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFJexl.java
new file mode 100644
index 00000000000..042aa92b977
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFJexl.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import
org.apache.iotdb.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import
org.apache.iotdb.udf.api.exception.UDFOutputSeriesDataTypeNotValidException;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.commons.jexl3.JexlBuilder;
+import org.apache.commons.jexl3.JexlEngine;
+import org.apache.commons.jexl3.JexlScript;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class UDTFJexl implements UDTF {
+
+ private int inputSeriesNumber;
+ private TSDataType[] inputDataType;
+ private TSDataType outputDataType;
+ private JexlScript script;
+ private Evaluator evaluator;
+
+ @Override
+ public void validate(UDFParameterValidator validator) throws UDFException {
+ inputSeriesNumber = validator.getParameters().getChildExpressionsSize();
+ for (int i = 0; i < inputSeriesNumber; i++) {
+ validator.validateInputSeriesDataType(
+ i, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE, Type.TEXT,
Type.BOOLEAN);
+ }
+ validator.validateRequiredAttribute("expr");
+ }
+
+ @Override
+ public void beforeStart(UDFParameters parameters, UDTFConfigurations
configurations)
+ throws UDFInputSeriesDataTypeNotValidException,
UDFOutputSeriesDataTypeNotValidException,
+ MetadataException {
+ String expr = parameters.getString("expr");
+ JexlEngine jexl = new JexlBuilder().create();
+ script = jexl.createScript(expr);
+
+ inputDataType = new TSDataType[inputSeriesNumber];
+ for (int i = 0; i < inputSeriesNumber; i++) {
+ inputDataType[i] =
UDFDataTypeTransformer.transformToTsDataType(parameters.getDataType(i));
+ }
+ outputDataType = probeOutputDataType();
+
+ if (inputSeriesNumber == 1) {
+ switch (inputDataType[0]) {
+ case INT32:
+ evaluator = new EvaluatorIntInput();
+ break;
+ case INT64:
+ evaluator = new EvaluatorLongInput();
+ break;
+ case FLOAT:
+ evaluator = new EvaluatorFloatInput();
+ break;
+ case DOUBLE:
+ evaluator = new EvaluatorDoubleInput();
+ break;
+ case TEXT:
+ evaluator = new EvaluatorStringInput();
+ break;
+ case BOOLEAN:
+ evaluator = new EvaluatorBooleanInput();
+ break;
+ default:
+ throw new UDFInputSeriesDataTypeNotValidException(
+ 0,
+ UDFDataTypeTransformer.transformToUDFDataType(inputDataType[0]),
+ Type.INT32,
+ Type.INT64,
+ Type.FLOAT,
+ Type.DOUBLE,
+ Type.TEXT,
+ Type.BOOLEAN);
+ }
+ } else {
+ evaluator = new EvaluatorMulInput();
+ }
+
+ configurations
+ .setAccessStrategy(new RowByRowAccessStrategy())
+
.setOutputDataType(UDFDataTypeTransformer.transformToUDFDataType(outputDataType));
+ }
+
+ // 23, 23L, 23f, 23d, "string", true are hard codes for probing
+ private HashMap<TSDataType, Object> initialMap() {
+ HashMap<TSDataType, Object> map = new HashMap<>();
+ map.put(TSDataType.INT32, 23);
+ map.put(TSDataType.INT64, 23L);
+ map.put(TSDataType.FLOAT, 23f);
+ map.put(TSDataType.DOUBLE, 23d);
+ map.put(TSDataType.TEXT, "string");
+ map.put(TSDataType.BOOLEAN, true);
+ return map;
+ }
+
+ private TSDataType probeOutputDataType() throws
UDFOutputSeriesDataTypeNotValidException {
+ // initial inputHardCodes to probe OutputDataType
+ HashMap<TSDataType, Object> map = initialMap();
+ Object[] inputHardCodes = new Object[inputSeriesNumber];
+ for (int i = 0; i < inputSeriesNumber; i++) {
+ inputHardCodes[i] = map.get(inputDataType[i]);
+ }
+
+ Object o = script.execute(null, inputHardCodes);
+
+ if (o instanceof Number) {
+ return TSDataType.DOUBLE;
+ } else if (o instanceof String) {
+ return TSDataType.TEXT;
+ } else if (o instanceof Boolean) {
+ return TSDataType.BOOLEAN;
+ } else {
+ throw new UDFOutputSeriesDataTypeNotValidException(0, "[Number, String,
Boolean]");
+ }
+ }
+
+ @Override
+ public void transform(Row row, PointCollector collector)
+ throws IOException, UDFOutputSeriesDataTypeNotValidException,
+ UDFInputSeriesDataTypeNotValidException {
+ switch (outputDataType) {
+ case DOUBLE:
+ evaluator.evaluateDouble(row, collector);
+ break;
+ case TEXT:
+ evaluator.evaluateText(row, collector);
+ break;
+ case BOOLEAN:
+ evaluator.evaluateBoolean(row, collector);
+ break;
+ default:
+ // This will not happen.
+ throw new UDFOutputSeriesDataTypeNotValidException(0, "[Number,
String, Boolean]");
+ }
+ }
+
+ private interface Evaluator {
+ void evaluateDouble(Row row, PointCollector collector)
+ throws IOException, UDFInputSeriesDataTypeNotValidException;
+
+ void evaluateText(Row row, PointCollector collector)
+ throws IOException, UDFInputSeriesDataTypeNotValidException;
+
+ void evaluateBoolean(Row row, PointCollector collector)
+ throws IOException, UDFInputSeriesDataTypeNotValidException;
+ }
+
+ private class EvaluatorIntInput implements Evaluator {
+ @Override
+ public void evaluateDouble(Row row, PointCollector collector) throws
IOException {
+ collector.putDouble(
+ row.getTime(), ((Number) script.execute(null,
row.getInt(0))).doubleValue());
+ }
+
+ @Override
+ public void evaluateText(Row row, PointCollector collector) throws
IOException {
+ collector.putString(row.getTime(), (String) script.execute(null,
row.getInt(0)));
+ }
+
+ @Override
+ public void evaluateBoolean(Row row, PointCollector collector) throws
IOException {
+ collector.putBoolean(row.getTime(), (Boolean) script.execute(null,
row.getInt(0)));
+ }
+ }
+
+ private class EvaluatorLongInput implements Evaluator {
+ @Override
+ public void evaluateDouble(Row row, PointCollector collector) throws
IOException {
+ collector.putDouble(
+ row.getTime(), ((Number) script.execute(null,
row.getLong(0))).doubleValue());
+ }
+
+ @Override
+ public void evaluateText(Row row, PointCollector collector) throws
IOException {
+ collector.putString(row.getTime(), (String) script.execute(null,
row.getLong(0)));
+ }
+
+ @Override
+ public void evaluateBoolean(Row row, PointCollector collector) throws
IOException {
+ collector.putBoolean(row.getTime(), (Boolean) script.execute(null,
row.getLong(0)));
+ }
+ }
+
+ private class EvaluatorFloatInput implements Evaluator {
+ @Override
+ public void evaluateDouble(Row row, PointCollector collector) throws
IOException {
+ collector.putDouble(
+ row.getTime(), ((Number) script.execute(null,
row.getFloat(0))).doubleValue());
+ }
+
+ @Override
+ public void evaluateText(Row row, PointCollector collector) throws
IOException {
+ collector.putString(row.getTime(), (String) script.execute(null,
row.getFloat(0)));
+ }
+
+ @Override
+ public void evaluateBoolean(Row row, PointCollector collector) throws
IOException {
+ collector.putBoolean(row.getTime(), (Boolean) script.execute(null,
row.getFloat(0)));
+ }
+ }
+
+ private class EvaluatorDoubleInput implements Evaluator {
+ @Override
+ public void evaluateDouble(Row row, PointCollector collector) throws
IOException {
+ collector.putDouble(
+ row.getTime(), ((Number) script.execute(null,
row.getDouble(0))).doubleValue());
+ }
+
+ @Override
+ public void evaluateText(Row row, PointCollector collector) throws
IOException {
+ collector.putString(row.getTime(), (String) script.execute(null,
row.getDouble(0)));
+ }
+
+ @Override
+ public void evaluateBoolean(Row row, PointCollector collector) throws
IOException {
+ collector.putBoolean(row.getTime(), (Boolean) script.execute(null,
row.getDouble(0)));
+ }
+ }
+
+ private class EvaluatorStringInput implements Evaluator {
+ @Override
+ public void evaluateDouble(Row row, PointCollector collector) throws
IOException {
+ collector.putDouble(
+ row.getTime(), ((Number) script.execute(null,
row.getString(0))).doubleValue());
+ }
+
+ @Override
+ public void evaluateText(Row row, PointCollector collector) throws
IOException {
+ collector.putString(row.getTime(), (String) script.execute(null,
row.getString(0)));
+ }
+
+ @Override
+ public void evaluateBoolean(Row row, PointCollector collector) throws
IOException {
+ collector.putBoolean(row.getTime(), (Boolean) script.execute(null,
row.getString(0)));
+ }
+ }
+
+ private class EvaluatorBooleanInput implements Evaluator {
+ @Override
+ public void evaluateDouble(Row row, PointCollector collector) throws
IOException {
+ collector.putDouble(
+ row.getTime(), ((Number) script.execute(null,
row.getBoolean(0))).doubleValue());
+ }
+
+ @Override
+ public void evaluateText(Row row, PointCollector collector) throws
IOException {
+ collector.putString(row.getTime(), (String) script.execute(null,
row.getBoolean(0)));
+ }
+
+ @Override
+ public void evaluateBoolean(Row row, PointCollector collector) throws
IOException {
+ collector.putBoolean(row.getTime(), (Boolean) script.execute(null,
row.getBoolean(0)));
+ }
+ }
+
+ private class EvaluatorMulInput implements Evaluator {
+
+ Object[] values = new Object[inputSeriesNumber];
+
+ @Override
+ public void evaluateDouble(Row row, PointCollector collector)
+ throws IOException, UDFInputSeriesDataTypeNotValidException {
+ getValues(row);
+ collector.putDouble(row.getTime(), ((Number) script.execute(null,
values)).doubleValue());
+ }
+
+ @Override
+ public void evaluateText(Row row, PointCollector collector)
+ throws IOException, UDFInputSeriesDataTypeNotValidException {
+ getValues(row);
+ collector.putString(row.getTime(), (String) script.execute(null,
values));
+ }
+
+ @Override
+ public void evaluateBoolean(Row row, PointCollector collector)
+ throws IOException, UDFInputSeriesDataTypeNotValidException {
+ getValues(row);
+ collector.putBoolean(row.getTime(), (Boolean) script.execute(null,
values));
+ }
+
+ public void getValues(Row row) throws IOException,
UDFInputSeriesDataTypeNotValidException {
+ for (int i = 0; i < inputSeriesNumber; i++) {
+ switch (inputDataType[i]) {
+ case INT32:
+ values[i] = row.getInt(i);
+ break;
+ case INT64:
+ values[i] = row.getLong(i);
+ break;
+ case FLOAT:
+ values[i] = row.getFloat(i);
+ break;
+ case DOUBLE:
+ values[i] = row.getDouble(i);
+ break;
+ case TEXT:
+ values[i] = row.getString(i);
+ break;
+ case BOOLEAN:
+ values[i] = row.getBoolean(i);
+ break;
+ default:
+ throw new UDFInputSeriesDataTypeNotValidException(
+ i,
+
UDFDataTypeTransformer.transformToUDFDataType(inputDataType[i]),
+ Type.INT32,
+ Type.INT64,
+ Type.FLOAT,
+ Type.DOUBLE,
+ Type.TEXT,
+ Type.BOOLEAN);
+ }
+ }
+ }
+ }
+}