This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1df8ec25eea Make all built-in scalar UDF into batch processing
1df8ec25eea is described below
commit 1df8ec25eeadf757e31a30b96c2f6523958b45aa
Author: Zhihao Shen <[email protected]>
AuthorDate: Sun Nov 5 09:27:32 2023 +0800
Make all built-in scalar UDF into batch processing
---
.../commons/udf/builtin/String/UDTFConcat.java | 63 +++++++++-
.../commons/udf/builtin/String/UDTFEndsWith.java | 20 ++++
.../commons/udf/builtin/String/UDTFLower.java | 21 ++++
.../commons/udf/builtin/String/UDTFStartsWith.java | 36 +++++-
.../commons/udf/builtin/String/UDTFStrCompare.java | 40 ++++++-
.../commons/udf/builtin/String/UDTFStrLength.java | 36 +++++-
.../commons/udf/builtin/String/UDTFStrLocate.java | 37 +++++-
.../iotdb/commons/udf/builtin/String/UDTFTrim.java | 36 +++++-
.../commons/udf/builtin/String/UDTFUpper.java | 21 ++++
.../apache/iotdb/commons/udf/builtin/UDTFAbs.java | 85 ++++++++++++++
.../iotdb/commons/udf/builtin/UDTFConst.java | 127 ++++++++++++++++++++-
.../iotdb/commons/udf/builtin/UDTFConstE.java | 34 +++++-
.../iotdb/commons/udf/builtin/UDTFConstPi.java | 34 +++++-
.../iotdb/commons/udf/builtin/UDTFContains.java | 36 +++++-
.../iotdb/commons/udf/builtin/UDTFInRange.java | 121 +++++++++++++++++++-
.../iotdb/commons/udf/builtin/UDTFMatches.java | 35 +++++-
.../apache/iotdb/commons/udf/builtin/UDTFMath.java | 85 ++++++++++++++
.../iotdb/commons/udf/builtin/UDTFOnOff.java | 85 ++++++++++++++
library-udf/pom.xml | 5 +
.../apache/iotdb/library/anomaly/UDTFRange.java | 9 +-
.../apache/iotdb/library/dprofile/UDTFSample.java | 8 +-
.../iotdb/library/string/UDTFRegexMatch.java | 44 ++++++-
.../iotdb/library/string/UDTFRegexReplace.java | 46 +++++++-
.../iotdb/library/string/UDTFRegexSplit.java | 64 ++++++++++-
.../iotdb/library/string/UDTFStrReplace.java | 45 +++++++-
25 files changed, 1130 insertions(+), 43 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFConcat.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFConcat.java
index d3fda2a3a54..1ad1a7451b0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFConcat.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFConcat.java
@@ -18,15 +18,22 @@
*/
package org.apache.iotdb.commons.udf.builtin.String;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
+
/*This function Returns the concat string by input series and targets.
startsEnd: Indicates whether series behind targets. The default value is
false.*/
public class UDTFConcat implements UDTF {
@@ -52,7 +59,9 @@ public class UDTFConcat implements UDTF {
if (key.startsWith("target") && value != null)
concatTargets.append(value);
});
seriesBehind = parameters.getBooleanOrDefault("series_behind", false);
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.TEXT);
+ configurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.TEXT);
}
@Override
@@ -71,4 +80,54 @@ public class UDTFConcat implements UDTF {
? concatSeries.insert(0, concatTargets).toString()
: concatSeries.append(concatTargets).toString());
}
+
+  /**
+   * Concatenates the non-null values of a single row with the configured targets.
+   *
+   * @param row the input row; null cells are skipped
+   * @return the concatenated text wrapped by {@code BytesUtils.valueOf}; the targets are placed
+   *     in front of the series values when {@code seriesBehind} is set, and after them otherwise
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    StringBuilder joined = new StringBuilder();
+    for (int idx = 0; idx < row.size(); idx++) {
+      if (!row.isNull(idx)) {
+        joined.append(row.getString(idx));
+      }
+    }
+
+    if (seriesBehind) {
+      joined.insert(0, concatTargets);
+    } else {
+      joined.append(concatTargets);
+    }
+    return BytesUtils.valueOf(joined.toString());
+  }
+
+  /**
+   * Batch variant: concatenates, row by row, the non-null values of every input column with the
+   * configured targets and writes one text value per row to {@code builder}.
+   *
+   * <p>Null cells are skipped instead of producing a null result, so every row yields a value.
+   *
+   * @param columns input columns; the row count is taken from {@code columns[0]}
+   * @param builder receives exactly one binary value per row
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    int colCount = columns.length;
+    int rowCount = columns[0].getPositionCount();
+
+    // Only the outer arrays are allocated: each slot is filled with the array handed back by
+    // the column, so pre-allocating colCount * rowCount cells would be thrown away immediately.
+    Binary[][] inputFrame = new Binary[colCount][];
+    boolean[][] isNullFrame = new boolean[colCount][];
+    for (int i = 0; i < colCount; i++) {
+      inputFrame[i] = columns[i].getBinaries();
+      isNullFrame[i] = columns[i].isNull();
+    }
+
+    for (int row = 0; row < rowCount; row++) {
+      StringBuilder concatSeries = new StringBuilder();
+      for (int col = 0; col < colCount; col++) {
+        if (isNullFrame[col][row]) {
+          continue;
+        }
+        concatSeries.append(inputFrame[col][row].getStringValue(TSFileConfig.STRING_CHARSET));
+      }
+
+      // Same ordering rule as the row-based transform: targets first when seriesBehind is set.
+      String res =
+          seriesBehind
+              ? concatSeries.insert(0, concatTargets).toString()
+              : concatSeries.append(concatTargets).toString();
+      builder.writeBinary(BytesUtils.valueOf(res));
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFEndsWith.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFEndsWith.java
index bd6643d9338..6da4971cdd2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFEndsWith.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFEndsWith.java
@@ -18,6 +18,10 @@
*/
package org.apache.iotdb.commons.udf.builtin.String;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
@@ -60,4 +64,20 @@ public class UDTFEndsWith implements UDTF {
}
return row.getString(0).endsWith(target);
}
+
+  /**
+   * Batch variant: for every position of the first input column, writes whether its text ends
+   * with {@code target}; a null position produces a null.
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Column source = columns[0];
+    Binary[] values = source.getBinaries();
+    boolean[] nullFlags = source.isNull();
+
+    int rowCount = source.getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (!nullFlags[pos]) {
+        String text = values[pos].getStringValue(TSFileConfig.STRING_CHARSET);
+        builder.writeBoolean(text.endsWith(target));
+        continue;
+      }
+      builder.appendNull();
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFLower.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFLower.java
index cc937e3787f..563d807aa27 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFLower.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFLower.java
@@ -18,6 +18,10 @@
*/
package org.apache.iotdb.commons.udf.builtin.String;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
@@ -58,4 +62,21 @@ public class UDTFLower implements UDTF {
}
return BytesUtils.valueOf(row.getString(0).toLowerCase());
}
+
+  /**
+   * Batch variant: writes the lower-cased text of every non-null position of the first input
+   * column and a null for every null position.
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Column source = columns[0];
+    Binary[] values = source.getBinaries();
+    boolean[] nullFlags = source.isNull();
+
+    int rowCount = source.getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (nullFlags[pos]) {
+        builder.appendNull();
+        continue;
+      }
+      // No-argument toLowerCase(): casing follows the JVM default locale.
+      String lowered = values[pos].getStringValue(TSFileConfig.STRING_CHARSET).toLowerCase();
+      builder.writeBinary(BytesUtils.valueOf(lowered));
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStartsWith.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStartsWith.java
index 05eb8713cca..783f3ef7aee 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStartsWith.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStartsWith.java
@@ -18,15 +18,21 @@
*/
package org.apache.iotdb.commons.udf.builtin.String;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
+
/*This function returns if input series starts with the specified prefix.*/
public class UDTFStartsWith implements UDTF {
private String target;
@@ -40,11 +46,37 @@ public class UDTFStartsWith implements UDTF {
public void beforeStart(UDFParameters parameters, UDTFConfigurations
configurations)
throws Exception {
target = parameters.getString("target");
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.BOOLEAN);
+ configurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.BOOLEAN);
}
@Override
public void transform(Row row, PointCollector collector) throws Exception {
collector.putBoolean(row.getTime(), row.getString(0).startsWith(target));
}
+
+  /**
+   * Tells whether the text in the first cell of the row starts with {@code target}.
+   *
+   * @param row the input row
+   * @return the boolean result, or {@code null} when the first cell is null
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    Object result = null;
+    if (!row.isNull(0)) {
+      result = row.getString(0).startsWith(target);
+    }
+    return result;
+  }
+
+  /**
+   * Batch variant: for every position of the first input column, writes whether its text starts
+   * with {@code target}; a null position produces a null.
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Column source = columns[0];
+    Binary[] values = source.getBinaries();
+    boolean[] nullFlags = source.isNull();
+
+    int rowCount = source.getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (nullFlags[pos]) {
+        builder.appendNull();
+        continue;
+      }
+      String text = values[pos].getStringValue(TSFileConfig.STRING_CHARSET);
+      builder.writeBoolean(text.startsWith(target));
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrCompare.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrCompare.java
index b0d1d32df71..d07a63fb09f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrCompare.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrCompare.java
@@ -18,15 +18,21 @@
*/
package org.apache.iotdb.commons.udf.builtin.String;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
+
/*This function returns 0 if targets are the same, -1 if targtet1 is smaller
than targtet2,
and NULL if either argument is NULL. It returns 1 otherwise.*/
public class UDTFStrCompare implements UDTF {
@@ -42,7 +48,9 @@ public class UDTFStrCompare implements UDTF {
@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations
configurations)
throws Exception {
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.INT32);
+ configurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.INT32);
}
@Override
@@ -52,4 +60,32 @@ public class UDTFStrCompare implements UDTF {
}
collector.putInt(row.getTime(),
row.getString(0).compareTo(row.getString(1)));
}
+
+  /**
+   * Compares the text in the first two cells of the row with {@code String.compareTo}.
+   *
+   * @param row the input row
+   * @return the comparison result, or {@code null} when either cell is null
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    boolean bothPresent = !(row.isNull(0) || row.isNull(1));
+    if (bothPresent) {
+      String left = row.getString(0);
+      return left.compareTo(row.getString(1));
+    }
+    return null;
+  }
+
+  /**
+   * Batch variant: compares, position by position, the text of the first two input columns with
+   * {@code String.compareTo}. A null is written when either side is null.
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Binary[] leftValues = columns[0].getBinaries();
+    Binary[] rightValues = columns[1].getBinaries();
+    boolean[] leftNulls = columns[0].isNull();
+    boolean[] rightNulls = columns[1].isNull();
+
+    // The row count is taken from the first column.
+    int rowCount = columns[0].getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (leftNulls[pos] || rightNulls[pos]) {
+        builder.appendNull();
+        continue;
+      }
+      String left = leftValues[pos].getStringValue(TSFileConfig.STRING_CHARSET);
+      String right = rightValues[pos].getStringValue(TSFileConfig.STRING_CHARSET);
+      builder.writeInt(left.compareTo(right));
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrLength.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrLength.java
index 87240325e45..8f9e8c5f0b8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrLength.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrLength.java
@@ -18,15 +18,21 @@
*/
package org.apache.iotdb.commons.udf.builtin.String;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
+
/*This function returns length of string from an input series.*/
public class UDTFStrLength implements UDTF {
@@ -38,11 +44,37 @@ public class UDTFStrLength implements UDTF {
@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations
configurations)
throws Exception {
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.INT32);
+ configurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.INT32);
}
@Override
public void transform(Row row, PointCollector collector) throws Exception {
collector.putInt(row.getTime(), row.getString(0).length());
}
+
+  /**
+   * Returns the length of the text in the first cell of the row.
+   *
+   * @param row the input row
+   * @return the {@code String.length()} of the cell, or {@code null} when the cell is null
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    Object length = null;
+    if (!row.isNull(0)) {
+      length = row.getString(0).length();
+    }
+    return length;
+  }
+
+  /**
+   * Batch variant: writes the text length of every non-null position of the first input column
+   * and a null for every null position.
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Column source = columns[0];
+    Binary[] values = source.getBinaries();
+    boolean[] nullFlags = source.isNull();
+
+    int rowCount = source.getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (!nullFlags[pos]) {
+        builder.writeInt(values[pos].getStringValue(TSFileConfig.STRING_CHARSET).length());
+        continue;
+      }
+      builder.appendNull();
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrLocate.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrLocate.java
index f5dcdf892ac..a31b9bb6f1f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrLocate.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFStrLocate.java
@@ -18,15 +18,21 @@
*/
package org.apache.iotdb.commons.udf.builtin.String;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
+
/*This function returns position of target in an input series.*/
public class UDTFStrLocate implements UDTF {
private String target;
@@ -48,7 +54,9 @@ public class UDTFStrLocate implements UDTF {
throws Exception {
target = parameters.getString("target");
reverse = parameters.getBooleanOrDefault("reverse", false);
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.INT32);
+ configurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.INT32);
}
@Override
@@ -57,4 +65,29 @@ public class UDTFStrLocate implements UDTF {
row.getTime(),
reverse ? row.getString(0).lastIndexOf(target) :
row.getString(0).indexOf(target));
}
+
+  /**
+   * Locates {@code target} inside the text of the first cell of the row.
+   *
+   * @param row the input row
+   * @return the last index of {@code target} when {@code reverse} is set, otherwise its first
+   *     index; {@code null} when the cell is null
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    if (row.isNull(0)) {
+      return null;
+    }
+    String text = row.getString(0);
+    if (reverse) {
+      return text.lastIndexOf(target);
+    }
+    return text.indexOf(target);
+  }
+
+  /**
+   * Batch variant: for every non-null position of the first input column, writes the index of
+   * {@code target} in its text (last index when {@code reverse} is set, first index otherwise).
+   * A null position produces a null.
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Column source = columns[0];
+    Binary[] values = source.getBinaries();
+    boolean[] nullFlags = source.isNull();
+
+    int rowCount = source.getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (nullFlags[pos]) {
+        builder.appendNull();
+        continue;
+      }
+      String text = values[pos].getStringValue(TSFileConfig.STRING_CHARSET);
+      if (reverse) {
+        builder.writeInt(text.lastIndexOf(target));
+      } else {
+        builder.writeInt(text.indexOf(target));
+      }
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFTrim.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFTrim.java
index fe5ff8079f7..e45bbf9e40a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFTrim.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFTrim.java
@@ -18,13 +18,18 @@
*/
package org.apache.iotdb.commons.udf.builtin.String;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
/*This function returns the string whose value is target, with all leading and
trailing space removed.*/
@@ -38,11 +43,38 @@ public class UDTFTrim implements UDTF {
@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations
configurations)
throws Exception {
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.TEXT);
+ configurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.TEXT);
}
@Override
public void transform(Row row, PointCollector collector) throws Exception {
collector.putString(row.getTime(), row.getString(0).trim());
}
+
+  /**
+   * Trims the text in the first cell of the row with {@code String.trim()}.
+   *
+   * @param row the input row
+   * @return the trimmed text wrapped by {@code BytesUtils.valueOf}, or {@code null} when the
+   *     cell is null
+   */
+  @Override
+  public Object transform(Row row) throws Exception {
+    Object trimmed = null;
+    if (!row.isNull(0)) {
+      trimmed = BytesUtils.valueOf(row.getString(0).trim());
+    }
+    return trimmed;
+  }
+
+  /**
+   * Batch variant: writes the trimmed text of every non-null position of the first input column
+   * and a null for every null position.
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Column source = columns[0];
+    Binary[] values = source.getBinaries();
+    boolean[] nullFlags = source.isNull();
+
+    int rowCount = source.getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (nullFlags[pos]) {
+        builder.appendNull();
+        continue;
+      }
+      String trimmed = values[pos].getStringValue(TSFileConfig.STRING_CHARSET).trim();
+      builder.writeBinary(BytesUtils.valueOf(trimmed));
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFUpper.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFUpper.java
index 2a0b1c09441..9d4624f4c0b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFUpper.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/String/UDTFUpper.java
@@ -18,6 +18,10 @@
*/
package org.apache.iotdb.commons.udf.builtin.String;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
@@ -58,4 +62,21 @@ public class UDTFUpper implements UDTF {
}
return BytesUtils.valueOf(row.getString(0).toUpperCase());
}
+
+  /**
+   * Batch variant: writes the upper-cased text of every non-null position of the first input
+   * column and a null for every null position.
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Column source = columns[0];
+    Binary[] values = source.getBinaries();
+    boolean[] nullFlags = source.isNull();
+
+    int rowCount = source.getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (nullFlags[pos]) {
+        builder.appendNull();
+        continue;
+      }
+      // No-argument toUpperCase(): casing follows the JVM default locale.
+      String raised = values[pos].getStringValue(TSFileConfig.STRING_CHARSET).toUpperCase();
+      builder.writeBinary(BytesUtils.valueOf(raised));
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFAbs.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFAbs.java
index 5e1a0cf96a5..9fad9e2017b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFAbs.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFAbs.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.commons.udf.builtin;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
@@ -97,8 +99,91 @@ public class UDTFAbs extends UDTFMath {
}
}
+  /**
+   * Batch variant: dispatches to the type-specific helper that matches {@code dataType}.
+   *
+   * @throws UDFInputSeriesDataTypeNotValidException when {@code dataType} is not one of INT32,
+   *     INT64, FLOAT or DOUBLE
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    switch (dataType) {
+      case INT32:
+        transformInt(columns, builder);
+        break;
+      case INT64:
+        transformLong(columns, builder);
+        break;
+      case FLOAT:
+        transformFloat(columns, builder);
+        break;
+      case DOUBLE:
+        transformDouble(columns, builder);
+        break;
+      default:
+        // Not expected to be reached; kept as a guard for unsupported input types.
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0,
+            UDFDataTypeTransformer.transformToUDFDataType(dataType),
+            Type.INT32,
+            Type.INT64,
+            Type.FLOAT,
+            Type.DOUBLE);
+    }
+  }
+
@Override
protected void setTransformer() {
throw new UnsupportedOperationException("UDTFAbs#setTransformer()");
}
+
+  /**
+   * Writes the absolute value of every non-null int of the first input column, and a null for
+   * every null position.
+   */
+  protected void transformInt(Column[] columns, ColumnBuilder builder) {
+    Column source = columns[0];
+    int[] values = source.getInts();
+    boolean[] nullFlags = source.isNull();
+
+    int rowCount = source.getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (nullFlags[pos]) {
+        builder.appendNull();
+        continue;
+      }
+      // Math.abs(int) returns Integer.MIN_VALUE unchanged.
+      builder.writeInt(Math.abs(values[pos]));
+    }
+  }
+
+  /**
+   * Writes the absolute value of every non-null long of the first input column, and a null for
+   * every null position.
+   */
+  protected void transformLong(Column[] columns, ColumnBuilder builder) {
+    Column source = columns[0];
+    long[] values = source.getLongs();
+    boolean[] nullFlags = source.isNull();
+
+    int rowCount = source.getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (nullFlags[pos]) {
+        builder.appendNull();
+        continue;
+      }
+      // Math.abs(long) returns Long.MIN_VALUE unchanged.
+      builder.writeLong(Math.abs(values[pos]));
+    }
+  }
+
+  /**
+   * Writes the absolute value of every non-null float of the first input column, and a null for
+   * every null position.
+   */
+  protected void transformFloat(Column[] columns, ColumnBuilder builder) {
+    Column source = columns[0];
+    float[] values = source.getFloats();
+    boolean[] nullFlags = source.isNull();
+
+    int rowCount = source.getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (nullFlags[pos]) {
+        builder.appendNull();
+        continue;
+      }
+      builder.writeFloat(Math.abs(values[pos]));
+    }
+  }
+
+  /**
+   * Writes the absolute value of every non-null double of the first input column, and a null
+   * for every null position.
+   */
+  protected void transformDouble(Column[] columns, ColumnBuilder builder) {
+    Column source = columns[0];
+    double[] values = source.getDoubles();
+    boolean[] nullFlags = source.isNull();
+
+    int rowCount = source.getPositionCount();
+    for (int pos = 0; pos < rowCount; pos++) {
+      if (nullFlags[pos]) {
+        builder.appendNull();
+        continue;
+      }
+      builder.writeDouble(Math.abs(values[pos]));
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConst.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConst.java
index 585cd428c18..837ab9538b9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConst.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConst.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.commons.udf.builtin;
import org.apache.iotdb.commons.udf.utils.UDFBinaryTransformer;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
import org.apache.iotdb.tsfile.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BytesUtils;
@@ -30,9 +32,10 @@ import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException;
+import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
@@ -96,7 +99,7 @@ public class UDTFConst implements UDTF {
}
configurations
- .setAccessStrategy(new RowByRowAccessStrategy())
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
.setOutputDataType(UDFDataTypeTransformer.transformToUDFDataType(dataType));
}
@@ -125,4 +128,124 @@ public class UDTFConst implements UDTF {
throw new UnsupportedOperationException();
}
}
+
+  /**
+   * Returns the configured constant in the representation that matches {@code dataType}; the
+   * content of {@code row} is not inspected.
+   *
+   * @throws UnsupportedOperationException when {@code dataType} is none of the handled types
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    final Object constant;
+    switch (dataType) {
+      case INT32:
+        constant = intValue;
+        break;
+      case INT64:
+        constant = longValue;
+        break;
+      case FLOAT:
+        constant = floatValue;
+        break;
+      case DOUBLE:
+        constant = doubleValue;
+        break;
+      case BOOLEAN:
+        constant = booleanValue;
+        break;
+      case TEXT:
+        constant = UDFBinaryTransformer.transformToUDFBinary(binaryValue);
+        break;
+      default:
+        throw new UnsupportedOperationException();
+    }
+    return constant;
+  }
+
+  /**
+   * Batch variant: writes the configured constant once per position.
+   *
+   * <p>A position receives the constant when at least one input column is non-null there;
+   * otherwise a null is appended. The position count is taken from {@code columns[0]}.
+   *
+   * @param columns input columns, used only for their null flags and position count
+   * @param builder receives one value (or null) per position
+   * @throws UnsupportedOperationException when {@code dataType} is none of the handled types;
+   *     nothing is written to {@code builder} in that case
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    int count = columns[0].getPositionCount();
+
+    // The switch stays outside the loops so an unsupported type fails before any write.
+    switch (dataType) {
+      case INT32:
+        for (int i = 0; i < count; i++) {
+          if (hasNonNullValue(columns, i)) {
+            builder.writeInt(intValue);
+          } else {
+            builder.appendNull();
+          }
+        }
+        return;
+      case INT64:
+        for (int i = 0; i < count; i++) {
+          if (hasNonNullValue(columns, i)) {
+            builder.writeLong(longValue);
+          } else {
+            builder.appendNull();
+          }
+        }
+        return;
+      case FLOAT:
+        for (int i = 0; i < count; i++) {
+          if (hasNonNullValue(columns, i)) {
+            builder.writeFloat(floatValue);
+          } else {
+            builder.appendNull();
+          }
+        }
+        return;
+      case DOUBLE:
+        for (int i = 0; i < count; i++) {
+          if (hasNonNullValue(columns, i)) {
+            builder.writeDouble(doubleValue);
+          } else {
+            builder.appendNull();
+          }
+        }
+        return;
+      case BOOLEAN:
+        for (int i = 0; i < count; i++) {
+          if (hasNonNullValue(columns, i)) {
+            builder.writeBoolean(booleanValue);
+          } else {
+            builder.appendNull();
+          }
+        }
+        return;
+      case TEXT:
+        for (int i = 0; i < count; i++) {
+          if (hasNonNullValue(columns, i)) {
+            builder.writeBinary(binaryValue);
+          } else {
+            builder.appendNull();
+          }
+        }
+        return;
+      default:
+        throw new UnsupportedOperationException();
+    }
+  }
+
+  /** Returns true when at least one of the columns holds a non-null value at the position. */
+  private static boolean hasNonNullValue(Column[] columns, int position) {
+    for (Column column : columns) {
+      if (!column.isNull(position)) {
+        return true;
+      }
+    }
+    return false;
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConstE.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConstE.java
index b4468fe39b3..1fe18f0751e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConstE.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConstE.java
@@ -19,23 +19,53 @@
package org.apache.iotdb.commons.udf.builtin;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
+
public class UDTFConstE implements UDTF {
@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations
configurations) {
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+ configurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.DOUBLE);
}
@Override
public void transform(Row row, PointCollector collector) throws Exception {
collector.putDouble(row.getTime(), Math.E);
}
+
+ // Always returns Math.E (boxed as the Object result); the content of `row` is not read.
+ @Override
+ public Object transform(Row row) throws IOException {
+ return Math.E;
+ }
+
+  /**
+   * Batch variant: emits {@code Math.E} for every position at which at least one input column
+   * holds a non-null value, and a null for positions where all input columns are null.
+   *
+   * @param columns input columns; the position count is taken from the first one
+   * @param builder receives exactly one entry per position
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    final int rowCount = columns[0].getPositionCount();
+
+    for (int row = 0; row < rowCount; row++) {
+      // Advance past the columns that are null at this position, stopping at the first value.
+      int idx = 0;
+      while (idx < columns.length && columns[idx].isNull(row)) {
+        idx++;
+      }
+      if (idx < columns.length) {
+        builder.writeDouble(Math.E);
+      } else {
+        builder.appendNull();
+      }
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConstPi.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConstPi.java
index 8e999343b29..b1e53c9ae51 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConstPi.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFConstPi.java
@@ -19,23 +19,53 @@
package org.apache.iotdb.commons.udf.builtin;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
+
public class UDTFConstPi implements UDTF {
@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations
configurations) {
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+ configurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.DOUBLE);
}
@Override
public void transform(Row row, PointCollector collector) throws Exception {
collector.putDouble(row.getTime(), Math.PI);
}
+
+ // Always returns Math.PI (boxed as the Object result); the content of `row` is not read.
+ @Override
+ public Object transform(Row row) throws IOException {
+ return Math.PI;
+ }
+
+  /**
+   * Batch variant: emits {@code Math.PI} for every position at which at least one input column
+   * holds a non-null value, and a null for positions where all input columns are null.
+   *
+   * @param columns input columns; the position count is taken from the first one
+   * @param builder receives exactly one entry per position
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    final int rowCount = columns[0].getPositionCount();
+
+    for (int row = 0; row < rowCount; row++) {
+      // Advance past the columns that are null at this position, stopping at the first value.
+      int idx = 0;
+      while (idx < columns.length && columns[idx].isNull(row)) {
+        idx++;
+      }
+      if (idx < columns.length) {
+        builder.writeDouble(Math.PI);
+      } else {
+        builder.appendNull();
+      }
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFContains.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFContains.java
index 9463acded69..075aa0c6ec1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFContains.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFContains.java
@@ -19,16 +19,22 @@
package org.apache.iotdb.commons.udf.builtin;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
+
public class UDTFContains implements UDTF {
private String s;
@@ -44,11 +50,37 @@ public class UDTFContains implements UDTF {
@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations
configurations) {
s = parameters.getString("s");
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.BOOLEAN);
+ configurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.BOOLEAN);
}
@Override
public void transform(Row row, PointCollector collector) throws Exception {
collector.putBoolean(row.getTime(), row.getString(0).contains(s));
}
+
+  /**
+   * Row variant: reports whether the string in column 0 contains the configured substring.
+   *
+   * @return {@code null} when column 0 is null, otherwise the boxed result of the containment test
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    if (!row.isNull(0)) {
+      final String text = row.getString(0);
+      return text.contains(s);
+    }
+    return null;
+  }
+
+  /**
+   * Batch variant: for each position of the first input column writes whether the decoded text
+   * contains the configured substring; null positions are forwarded as nulls.
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    final Column source = columns[0];
+    final Binary[] values = source.getBinaries();
+    final boolean[] nullFlags = source.isNull();
+    final int rowCount = source.getPositionCount();
+
+    for (int row = 0; row < rowCount; row++) {
+      if (nullFlags[row]) {
+        builder.appendNull();
+        continue;
+      }
+      final String text = values[row].getStringValue(TSFileConfig.STRING_CHARSET);
+      builder.writeBoolean(text.contains(s));
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFInRange.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFInRange.java
index 3691e464765..d5fecb061fe 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFInRange.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFInRange.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.commons.udf.builtin;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
import org.apache.iotdb.tsfile.enums.TSDataType;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
@@ -28,7 +30,7 @@ import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.exception.UDFException;
import
org.apache.iotdb.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
import org.apache.iotdb.udf.api.type.Type;
@@ -60,7 +62,9 @@ public class UDTFInRange implements UDTF {
upper = parameters.getDouble("upper");
lower = parameters.getDouble("lower");
dataType =
UDFDataTypeTransformer.transformToTsDataType(parameters.getDataType(0));
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.BOOLEAN);
+ configurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.BOOLEAN);
}
@Override
@@ -91,4 +95,117 @@ public class UDTFInRange implements UDTF {
Type.DOUBLE);
}
}
+
+  /**
+   * Row variant: tests whether the numeric value in column 0 lies in the closed interval
+   * [lower, upper].
+   *
+   * @return {@code null} when column 0 is null, otherwise the boxed range-check result
+   * @throws UDFInputSeriesDataTypeNotValidException if the configured data type is not numeric
+   */
+  @Override
+  public Object transform(Row row) throws Exception {
+    if (row.isNull(0)) {
+      return null;
+    }
+    switch (dataType) {
+      case INT32:
+        {
+          final int value = row.getInt(0);
+          return value >= lower && upper >= value;
+        }
+      case INT64:
+        {
+          final long value = row.getLong(0);
+          return value >= lower && upper >= value;
+        }
+      case FLOAT:
+        {
+          final float value = row.getFloat(0);
+          return value >= lower && upper >= value;
+        }
+      case DOUBLE:
+        {
+          final double value = row.getDouble(0);
+          return value >= lower && upper >= value;
+        }
+      default:
+        // This will not happen.
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0,
+            UDFDataTypeTransformer.transformToUDFDataType(dataType),
+            Type.INT32,
+            Type.INT64,
+            Type.FLOAT,
+            Type.DOUBLE);
+    }
+  }
+
+  /**
+   * Batch variant: dispatches to the type-specific range check that matches the data type
+   * recorded for the input series.
+   *
+   * @throws UDFInputSeriesDataTypeNotValidException if the configured data type is not numeric
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    switch (dataType) {
+      case INT32:
+        transformInt(columns, builder);
+        break;
+      case INT64:
+        transformLong(columns, builder);
+        break;
+      case FLOAT:
+        transformFloat(columns, builder);
+        break;
+      case DOUBLE:
+        transformDouble(columns, builder);
+        break;
+      default:
+        // This will not happen.
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0,
+            UDFDataTypeTransformer.transformToUDFDataType(dataType),
+            Type.INT32,
+            Type.INT64,
+            Type.FLOAT,
+            Type.DOUBLE);
+    }
+  }
+
+ private void transformInt(Column[] columns, ColumnBuilder builder) {
+ int[] inputs = columns[0].getInts();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ boolean res = inputs[i] >= lower && upper >= inputs[i];
+ builder.writeBoolean(res);
+ }
+ }
+ }
+
+ private void transformLong(Column[] columns, ColumnBuilder builder) {
+ long[] inputs = columns[0].getLongs();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ boolean res = inputs[i] >= lower && upper >= inputs[i];
+ builder.writeBoolean(res);
+ }
+ }
+ }
+
+ private void transformFloat(Column[] columns, ColumnBuilder builder) {
+ float[] inputs = columns[0].getFloats();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ boolean res = inputs[i] >= lower && upper >= inputs[i];
+ builder.writeBoolean(res);
+ }
+ }
+ }
+
+ private void transformDouble(Column[] columns, ColumnBuilder builder) {
+ double[] inputs = columns[0].getDoubles();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ boolean res = inputs[i] >= lower && upper >= inputs[i];
+ builder.writeBoolean(res);
+ }
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMatches.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMatches.java
index 0b0ed53a721..eaba0e50308 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMatches.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMatches.java
@@ -19,16 +19,21 @@
package org.apache.iotdb.commons.udf.builtin;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
import java.util.regex.Pattern;
public class UDTFMatches implements UDTF {
@@ -46,11 +51,37 @@ public class UDTFMatches implements UDTF {
@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations
configurations) {
pattern = Pattern.compile(parameters.getString("regex"));
- configurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.BOOLEAN);
+ configurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.BOOLEAN);
}
@Override
public void transform(Row row, PointCollector collector) throws Exception {
collector.putBoolean(row.getTime(),
pattern.matcher(row.getString(0)).matches());
}
+
+  /**
+   * Row variant: reports whether the whole string in column 0 matches the compiled pattern.
+   *
+   * @return {@code null} when column 0 is null, otherwise the boxed match result
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    if (!row.isNull(0)) {
+      final String text = row.getString(0);
+      return pattern.matcher(text).matches();
+    }
+    return null;
+  }
+
+  /**
+   * Batch variant: for each position of the first input column writes whether the decoded text
+   * fully matches the compiled pattern; null positions are forwarded as nulls.
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    final Column source = columns[0];
+    final Binary[] values = source.getBinaries();
+    final boolean[] nullFlags = source.isNull();
+    final int rowCount = source.getPositionCount();
+
+    for (int row = 0; row < rowCount; row++) {
+      if (nullFlags[row]) {
+        builder.appendNull();
+        continue;
+      }
+      final String text = values[row].getStringValue(TSFileConfig.STRING_CHARSET);
+      builder.writeBoolean(pattern.matcher(text).matches());
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMath.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMath.java
index 5e621115654..08f073973f9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMath.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMath.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.commons.udf.builtin;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
import org.apache.iotdb.tsfile.enums.TSDataType;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
@@ -119,4 +121,87 @@ public abstract class UDTFMath implements UDTF {
Type.DOUBLE);
}
}
+
+  /**
+   * Batch variant: dispatches to the type-specific helper that matches the data type recorded
+   * for the input series.
+   *
+   * @throws UDFInputSeriesDataTypeNotValidException if the configured data type is not numeric
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    switch (dataType) {
+      case INT32:
+        transformInt(columns, builder);
+        break;
+      case INT64:
+        transformLong(columns, builder);
+        break;
+      case FLOAT:
+        transformFloat(columns, builder);
+        break;
+      case DOUBLE:
+        transformDouble(columns, builder);
+        break;
+      default:
+        // This will not happen.
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0,
+            UDFDataTypeTransformer.transformToUDFDataType(dataType),
+            Type.INT32,
+            Type.INT64,
+            Type.FLOAT,
+            Type.DOUBLE);
+    }
+  }
+
+ private void transformInt(Column[] columns, ColumnBuilder builder) {
+ int[] inputs = columns[0].getInts();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ builder.writeDouble(transformer.transform(inputs[i]));
+ }
+ }
+ }
+
+ private void transformLong(Column[] columns, ColumnBuilder builder) {
+ long[] inputs = columns[0].getLongs();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ builder.writeDouble(transformer.transform(inputs[i]));
+ }
+ }
+ }
+
+ private void transformFloat(Column[] columns, ColumnBuilder builder) {
+ float[] inputs = columns[0].getFloats();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ builder.writeDouble(transformer.transform(inputs[i]));
+ }
+ }
+ }
+
+ private void transformDouble(Column[] columns, ColumnBuilder builder) {
+ double[] inputs = columns[0].getDoubles();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ builder.writeDouble(transformer.transform(inputs[i]));
+ }
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFOnOff.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFOnOff.java
index b8bec803ac6..657caba6bec 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFOnOff.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFOnOff.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.commons.udf.builtin;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
import org.apache.iotdb.tsfile.enums.TSDataType;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
@@ -113,4 +115,87 @@ public class UDTFOnOff implements UDTF {
Type.DOUBLE);
}
}
+
+  /**
+   * Batch variant: dispatches to the type-specific threshold check that matches the data type
+   * recorded for the input series.
+   *
+   * @throws UDFInputSeriesDataTypeNotValidException if the configured data type is not numeric
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    switch (dataType) {
+      case INT32:
+        transformInt(columns, builder);
+        break;
+      case INT64:
+        transformLong(columns, builder);
+        break;
+      case FLOAT:
+        transformFloat(columns, builder);
+        break;
+      case DOUBLE:
+        transformDouble(columns, builder);
+        break;
+      default:
+        // This will not happen.
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0,
+            UDFDataTypeTransformer.transformToUDFDataType(dataType),
+            Type.INT32,
+            Type.INT64,
+            Type.FLOAT,
+            Type.DOUBLE);
+    }
+  }
+
+ private void transformInt(Column[] columns, ColumnBuilder builder) {
+ int[] inputs = columns[0].getInts();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ builder.writeBoolean(inputs[i] >= threshold);
+ }
+ }
+ }
+
+ private void transformLong(Column[] columns, ColumnBuilder builder) {
+ long[] inputs = columns[0].getLongs();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ builder.writeBoolean(inputs[i] >= threshold);
+ }
+ }
+ }
+
+ private void transformFloat(Column[] columns, ColumnBuilder builder) {
+ float[] inputs = columns[0].getFloats();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ builder.writeBoolean(inputs[i] >= threshold);
+ }
+ }
+ }
+
+ private void transformDouble(Column[] columns, ColumnBuilder builder) {
+ double[] inputs = columns[0].getDoubles();
+ boolean[] isNulls = columns[0].isNull();
+
+ int count = columns[0].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (isNulls[i]) {
+ builder.appendNull();
+ } else {
+ builder.writeBoolean(inputs[i] >= threshold);
+ }
+ }
+ }
}
diff --git a/library-udf/pom.xml b/library-udf/pom.xml
index 322df6a0c6b..c6e28fad4cf 100644
--- a/library-udf/pom.xml
+++ b/library-udf/pom.xml
@@ -33,6 +33,11 @@
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>common-api</artifactId>
+ <version>1.3.1-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>udf-api</artifactId>
diff --git
a/library-udf/src/main/java/org/apache/iotdb/library/anomaly/UDTFRange.java
b/library-udf/src/main/java/org/apache/iotdb/library/anomaly/UDTFRange.java
index fefbbbf7c24..e559c82e939 100644
--- a/library-udf/src/main/java/org/apache/iotdb/library/anomaly/UDTFRange.java
+++ b/library-udf/src/main/java/org/apache/iotdb/library/anomaly/UDTFRange.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.library.anomaly;
-import org.apache.iotdb.library.util.Util;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
@@ -66,25 +65,25 @@ public class UDTFRange implements UDTF {
case INT32:
intValue = row.getInt(0);
if (intValue > upperBound || intValue < lowerBound) {
- Util.putValue(collector, dataType, timestamp, intValue);
+ collector.putInt(timestamp, intValue);
}
break;
case INT64:
longValue = row.getLong(0);
if (longValue > upperBound || longValue < lowerBound) {
- Util.putValue(collector, dataType, timestamp, longValue);
+ collector.putLong(timestamp, longValue);
}
break;
case FLOAT:
floatValue = row.getFloat(0);
if (floatValue > upperBound || floatValue < lowerBound) {
- Util.putValue(collector, dataType, timestamp, floatValue);
+ collector.putFloat(timestamp, floatValue);
}
break;
case DOUBLE:
doubleValue = row.getDouble(0);
if (doubleValue > upperBound || doubleValue < lowerBound) {
- Util.putValue(collector, dataType, timestamp, doubleValue);
+ collector.putDouble(timestamp, doubleValue);
}
break;
default:
diff --git
a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java
b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java
index 96dc975d086..64209bc568c 100644
---
a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java
+++
b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDTFSample.java
@@ -141,16 +141,16 @@ public class UDTFSample implements UDTF {
for (Point p : output) {
switch (dataType) {
case INT32:
- Util.putValue(collector, dataType, p.getX().longValue(),
p.getY().intValue());
+ collector.putInt(p.getX().longValue(), p.getY().intValue());
break;
case INT64:
- Util.putValue(collector, dataType, p.getX().longValue(),
p.getY().longValue());
+ collector.putLong(p.getX().longValue(), p.getY().longValue());
break;
case FLOAT:
- Util.putValue(collector, dataType, p.getX().longValue(),
p.getY().floatValue());
+ collector.putFloat(p.getX().longValue(),
p.getY().floatValue());
break;
case DOUBLE:
- Util.putValue(collector, dataType, p.getX().longValue(),
p.getY().doubleValue());
+ collector.putDouble(p.getX().longValue(),
p.getY().doubleValue());
break;
default:
throw new NoNumberException();
diff --git
a/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexMatch.java
b/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexMatch.java
index 22e8128c566..af2a8de3dea 100644
---
a/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexMatch.java
+++
b/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexMatch.java
@@ -19,15 +19,19 @@
package org.apache.iotdb.library.string;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -56,7 +60,9 @@ public class UDTFRegexMatch implements UDTF {
throws Exception {
pattern = Pattern.compile(udfParameters.getString("regex"));
group = udfParameters.getIntOrDefault("group", 0);
- udtfConfigurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.TEXT);
+ udtfConfigurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.TEXT);
}
@Override
@@ -66,4 +72,38 @@ public class UDTFRegexMatch implements UDTF {
collector.putString(row.getTime(), matcher.group(group));
}
}
+
+  /**
+   * Row variant: searches the string in column 0 for the compiled pattern and returns the
+   * requested capture group of the first match.
+   *
+   * @return {@code null} when column 0 is null, when nothing matches, or when the pattern has
+   *     fewer capture groups than {@code group}; otherwise the captured text
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    if (row.isNull(0)) {
+      return null;
+    }
+    final Matcher m = pattern.matcher(row.getString(0));
+    final boolean usable = m.find() && m.groupCount() >= group;
+    return usable ? m.group(group) : null;
+  }
+
+  /**
+   * Batch variant: for each position of the first input column, searches the text for the
+   * compiled pattern and writes the requested capture group of the first match.
+   *
+   * <p>A null is written when the input is null, when nothing matches, when the pattern has
+   * fewer capture groups than {@code group}, or when the requested group did not take part in
+   * the match.
+   *
+   * @param columns input columns; only the first one is read
+   * @param builder receives exactly one entry per position
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Binary[] inputs = columns[0].getBinaries();
+    boolean[] isNulls = columns[0].isNull();
+
+    int count = columns[0].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (isNulls[i]) {
+        builder.appendNull();
+        continue;
+      }
+      Matcher matcher = pattern.matcher(inputs[i].toString());
+      // Matcher#group returns null for an optional group that did not participate in the match;
+      // treat that like "no result" instead of dereferencing it.
+      String matched = null;
+      if (matcher.find() && matcher.groupCount() >= group) {
+        matched = matcher.group(group);
+      }
+      if (matched == null) {
+        builder.appendNull();
+      } else {
+        // Encode explicitly instead of relying on the platform default charset.
+        // NOTE(review): assumes Binary#toString decodes as UTF-8 as well -- confirm.
+        builder.writeBinary(
+            new Binary(matched.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+      }
+    }
+  }
}
diff --git
a/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexReplace.java
b/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexReplace.java
index 6e44f5a02c5..b0c0a213f67 100644
---
a/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexReplace.java
+++
b/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexReplace.java
@@ -19,17 +19,21 @@
package org.apache.iotdb.library.string;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList;
+import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -72,13 +76,48 @@ public class UDTFRegexReplace implements UDTF {
limit = udfParameters.getIntOrDefault("limit", -1);
offset = udfParameters.getIntOrDefault("offset", 0);
reverse = udfParameters.getBooleanOrDefault("reverse", false);
- udtfConfigurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.TEXT);
+ udtfConfigurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.TEXT);
}
@Override
public void transform(Row row, PointCollector collector) throws Exception {
String origin = row.getString(0);
Matcher matcher = pattern.matcher(origin);
+ String result = getResult(origin, matcher);
+ collector.putString(row.getTime(), result);
+ }
+
+  /**
+   * Row variant: applies the configured regex replacement to the string in column 0.
+   *
+   * @return {@code null} when column 0 is null, otherwise the replaced text
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    if (!row.isNull(0)) {
+      final String input = row.getString(0);
+      return getResult(input, pattern.matcher(input));
+    }
+    return null;
+  }
+
+  /**
+   * Batch variant: applies the configured regex replacement to every non-null text value of
+   * the first input column; null positions are forwarded as nulls.
+   *
+   * @param columns input columns; only the first one is read
+   * @param builder receives exactly one entry per position
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Binary[] inputs = columns[0].getBinaries();
+    boolean[] isNulls = columns[0].isNull();
+
+    int count = columns[0].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (isNulls[i]) {
+        builder.appendNull();
+        continue;
+      }
+      String origin = inputs[i].toString();
+      String replaced = getResult(origin, pattern.matcher(origin));
+      // Encode explicitly instead of relying on the platform default charset.
+      // NOTE(review): assumes Binary#toString decodes as UTF-8 as well -- confirm.
+      builder.writeBinary(
+          new Binary(replaced.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+    }
+  }
+
+ private String getResult(String origin, Matcher matcher) {
String result;
if (reverse) {
IntArrayList endIndexList = new IntArrayList();
@@ -123,6 +162,7 @@ public class UDTFRegexReplace implements UDTF {
.replaceAll(regex, replace))
.concat(suffix);
}
- collector.putString(row.getTime(), result);
+
+ return result;
}
}
diff --git
a/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexSplit.java
b/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexSplit.java
index 368541dc9eb..f12feeb18ce 100644
---
a/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexSplit.java
+++
b/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFRegexSplit.java
@@ -19,15 +19,20 @@
package org.apache.iotdb.library.string;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
+
/** This function splits string from an input series according to given regex.
*/
public class UDTFRegexSplit implements UDTF {
@@ -39,7 +44,7 @@ public class UDTFRegexSplit implements UDTF {
throws Exception {
regex = udfParameters.getString("regex");
index = udfParameters.getIntOrDefault("index", -1);
- udtfConfigurations.setAccessStrategy(new RowByRowAccessStrategy());
+ udtfConfigurations.setAccessStrategy(new MappableRowByRowAccessStrategy());
if (index == -1) {
udtfConfigurations.setOutputDataType(Type.INT32);
} else {
@@ -59,6 +64,61 @@ public class UDTFRegexSplit implements UDTF {
}
}
+  /**
+   * Row variant: splits the string in column 0 by the configured regex.
+   *
+   * @return {@code null} when column 0 is null; the number of pieces when {@code index} is -1;
+   *     otherwise the piece at {@code index}, or {@code null} if there are not that many pieces
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    if (row.isNull(0)) {
+      return null;
+    }
+    final String[] pieces = row.getString(0).split(regex);
+
+    if (index == -1) {
+      return pieces.length;
+    }
+    return index < pieces.length ? pieces[index] : null;
+  }
+
+  /**
+   * Batch variant: splits every non-null text value of the first input column by the
+   * configured regex; null positions are forwarded as nulls.
+   *
+   * <p>When {@code index} is -1 the number of pieces is written as an int. Otherwise the piece
+   * at {@code index} is written as text, or a null if the split produced too few pieces.
+   *
+   * @param columns input columns; only the first one is read
+   * @param builder receives exactly one entry per position
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Binary[] inputs = columns[0].getBinaries();
+    boolean[] isNulls = columns[0].isNull();
+
+    int count = columns[0].getPositionCount();
+    // The output mode is fixed for the whole batch, so decide it once.
+    final boolean countOnly = index == -1;
+
+    for (int i = 0; i < count; i++) {
+      if (isNulls[i]) {
+        builder.appendNull();
+        continue;
+      }
+      String[] splitResult = inputs[i].toString().split(regex);
+      if (countOnly) {
+        builder.writeInt(splitResult.length);
+      } else if (index < splitResult.length) {
+        // Encode explicitly instead of relying on the platform default charset.
+        // NOTE(review): assumes Binary#toString decodes as UTF-8 as well -- confirm.
+        builder.writeBinary(
+            new Binary(splitResult[index].getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+      } else {
+        builder.appendNull();
+      }
+    }
+  }
+
+
@Override
public void validate(UDFParameterValidator validator) throws Exception {
validator
diff --git
a/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFStrReplace.java
b/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFStrReplace.java
index 013ed081684..4339438ab58 100644
---
a/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFStrReplace.java
+++
b/library-udf/src/main/java/org/apache/iotdb/library/string/UDTFStrReplace.java
@@ -19,15 +19,20 @@
package org.apache.iotdb.library.string;
+import org.apache.iotdb.tsfile.access.Column;
+import org.apache.iotdb.tsfile.access.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
-import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
+import java.io.IOException;
+
/** This function does limited times of replacement of substring from an input
series. */
public class UDTFStrReplace implements UDTF {
@@ -65,12 +70,45 @@ public class UDTFStrReplace implements UDTF {
limit = udfParameters.getIntOrDefault("limit", -1);
offset = udfParameters.getIntOrDefault("offset", 0);
reverse = udfParameters.getBooleanOrDefault("reverse", false);
- udtfConfigurations.setAccessStrategy(new
RowByRowAccessStrategy()).setOutputDataType(Type.TEXT);
+ udtfConfigurations
+ .setAccessStrategy(new MappableRowByRowAccessStrategy())
+ .setOutputDataType(Type.TEXT);
}
@Override
public void transform(Row row, PointCollector collector) throws Exception {
String origin = row.getString(0);
+ String result = getResult(origin);
+ collector.putString(row.getTime(), result);
+ }
+
+  /**
+   * Row variant: applies the configured substring replacement to the string in column 0.
+   *
+   * @return {@code null} when column 0 is null, otherwise the replaced text
+   */
+  @Override
+  public Object transform(Row row) throws IOException {
+    if (!row.isNull(0)) {
+      final String input = row.getString(0);
+      return getResult(input);
+    }
+    return null;
+  }
+
+  /**
+   * Batch variant: applies the configured substring replacement to every non-null text value
+   * of the first input column; null positions are forwarded as nulls.
+   *
+   * @param columns input columns; only the first one is read
+   * @param builder receives exactly one entry per position
+   */
+  @Override
+  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
+    Binary[] inputs = columns[0].getBinaries();
+    boolean[] isNulls = columns[0].isNull();
+
+    int count = columns[0].getPositionCount();
+    for (int i = 0; i < count; i++) {
+      if (isNulls[i]) {
+        builder.appendNull();
+        continue;
+      }
+      String replaced = getResult(inputs[i].toString());
+      // Encode explicitly instead of relying on the platform default charset.
+      // NOTE(review): assumes Binary#toString decodes as UTF-8 as well -- confirm.
+      builder.writeBinary(
+          new Binary(replaced.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+    }
+  }
+
+ private String getResult(String origin) {
String result;
if (reverse) {
int endIndex = origin.length();
@@ -146,6 +184,7 @@ public class UDTFStrReplace implements UDTF {
.concat(origin.substring(prefix.length(),
fromIndex).replace(target, replace))
.concat(suffix);
}
- collector.putString(row.getTime(), result);
+
+ return result;
}
}