This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new fa3def7e42e [To rel/1.2] [IOTDB-6096] Make M4 Function do the nullable
judgement
fa3def7e42e is described below
commit fa3def7e42e1453f583a050ac6a45ca969f941a0
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Aug 4 16:48:37 2023 +0800
[To rel/1.2] [IOTDB-6096] Make M4 Function do the nullable judgement
---
.../db/it/udf/IoTDBUDTFBuiltinFunctionIT.java | 58 ++++
...ticSerializableTVListBackedSingleColumnRow.java | 2 +-
.../LayerPointReaderBackedSingleColumnRow.java | 6 +-
.../apache/iotdb/commons/udf/builtin/UDTFM4.java | 338 +++++++++++++--------
4 files changed, 269 insertions(+), 135 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
index 3aa8e8ee23e..5c6a82efe8b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
@@ -1237,6 +1237,7 @@ public class IoTDBUDTFBuiltinFunctionIT {
// query tests
test_M4_firstWindowEmpty();
test_M4_slidingTimeWindow();
+ test_M4_slidingTimeWindow_2();
test_M4_slidingSizeWindow();
test_M4_constantTimeSeries();
}
@@ -1305,6 +1306,63 @@ public class IoTDBUDTFBuiltinFunctionIT {
}
}
+ private void test_M4_slidingTimeWindow_2() {
+ String[] res =
+ new String[] {
+ "0,null,1",
+ "1,5.0,null",
+ "10,30.0,null",
+ "20,20.0,null",
+ "24,null,1",
+ "25,8.0,1",
+ "30,40.0,null",
+ "45,30.0,null",
+ "49,null,1",
+ "50,null,1",
+ "52,8.0,null",
+ "54,18.0,null",
+ "74,null,1",
+ "75,null,1",
+ "99,null,1",
+ "120,8.0,null"
+ };
+
+ String sql =
+ String.format(
+ "select M4(s1, '%s'='%s','%s'='%s','%s'='%s','%s'='%s'), M4(s2,
'%s'='%s','%s'='%s','%s'='%s','%s'='%s') from root.m4.d1",
+ TIME_INTERVAL_KEY,
+ 25,
+ SLIDING_STEP_KEY,
+ 25,
+ DISPLAY_WINDOW_BEGIN_KEY,
+ 0,
+ DISPLAY_WINDOW_END_KEY,
+ 150,
+ TIME_INTERVAL_KEY,
+ 25,
+ SLIDING_STEP_KEY,
+ 25,
+ DISPLAY_WINDOW_BEGIN_KEY,
+ 0,
+ DISPLAY_WINDOW_END_KEY,
+ 150);
+
+ try (Connection conn = EnvFactory.getEnv().getConnection();
+ Statement statement = conn.createStatement()) {
+ ResultSet resultSet = statement.executeQuery(sql);
+ int count = 0;
+ while (resultSet.next()) {
+ String str =
+ resultSet.getString(1) + "," + resultSet.getString(2) + "," +
resultSet.getString(3);
+ Assert.assertEquals(res[count], str);
+ count++;
+ }
+ Assert.assertEquals(res.length, count);
+ } catch (SQLException throwable) {
+ fail(throwable.getMessage());
+ }
+ }
+
private void test_M4_slidingSizeWindow() {
String[] res = new String[] {"1,5.0", "30,40.0", "33,9.0", "35,10.0",
"45,30.0", "120,8.0"};
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnRow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnRow.java
index ecebafffe21..92a257182b5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnRow.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnRow.java
@@ -86,7 +86,7 @@ public class ElasticSerializableTVListBackedSingleColumnRow implements Row {
@Override
public boolean isNull(int columnIndex) {
- return false;
+ return tvList.isNull(currentRowIndex);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/adapter/LayerPointReaderBackedSingleColumnRow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/adapter/LayerPointReaderBackedSingleColumnRow.java
index 6dd778ebcd4..fd11b828345 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/adapter/LayerPointReaderBackedSingleColumnRow.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/adapter/LayerPointReaderBackedSingleColumnRow.java
@@ -83,7 +83,11 @@ public class LayerPointReaderBackedSingleColumnRow implements Row {
@Override
public boolean isNull(int columnIndex) {
- return false;
+ try {
+ return layerPointReader.isCurrentNull();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
}
@Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java
index 134d5a26490..9aee8121b91 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.udf.api.UDTF;
-import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.access.RowWindow;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
@@ -151,168 +150,241 @@ public class UDTFM4 implements UDTF {
public void transformInt(RowWindow rowWindow, PointCollector collector)
throws IOException {
if (rowWindow.windowSize() > 0) { // else empty window do nothing
- int firstValue = rowWindow.getRow(0).getInt(0);
- int lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getInt(0);
-
- int minValue = Math.min(firstValue, lastValue);
- int maxValue = Math.max(firstValue, lastValue);
- int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
- int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
-
- for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
- int value = rowWindow.getRow(i).getInt(0);
- if (value < minValue) {
- minValue = value;
- minIndex = i;
- }
- if (value > maxValue) {
- maxValue = value;
- maxIndex = i;
+ int index = 0;
+ int size = rowWindow.windowSize();
+ int firstValueIndex = -1;
+ int firstValue = 0;
+
+ for (; index < size; index++) {
+ if (!rowWindow.getRow(index).isNull(0)) {
+ firstValueIndex = index;
+ firstValue = rowWindow.getRow(index).getInt(0);
+ break;
}
}
- Row row = rowWindow.getRow(0);
- collector.putInt(row.getTime(), row.getInt(0));
-
- int smallerIndex = Math.min(minIndex, maxIndex);
- int largerIndex = Math.max(minIndex, maxIndex);
- if (smallerIndex > 0) {
- row = rowWindow.getRow(smallerIndex);
- collector.putInt(row.getTime(), row.getInt(0));
- }
- if (largerIndex > smallerIndex) {
- row = rowWindow.getRow(largerIndex);
- collector.putInt(row.getTime(), row.getInt(0));
- }
- if (largerIndex < rowWindow.windowSize() - 1) {
- row = rowWindow.getRow(rowWindow.windowSize() - 1);
- collector.putInt(row.getTime(), row.getInt(0));
+ if (firstValueIndex != -1) { // else empty window do nothing
+ int lastValueIndex = firstValueIndex;
+ int lastValue = firstValue;
+ int minValueIndex = firstValueIndex;
+ int minValue = firstValue;
+ int maxValueIndex = firstValueIndex;
+ int maxValue = firstValue;
+
+ for (; index < size; index++) {
+ if (!rowWindow.getRow(index).isNull(0)) {
+ lastValueIndex = index;
+ lastValue = rowWindow.getRow(index).getInt(0);
+ if (lastValue < minValue) {
+ minValue = lastValue;
+ minValueIndex = index;
+ }
+ if (lastValue > maxValue) {
+ maxValue = lastValue;
+ maxValueIndex = index;
+ }
+ }
+ }
+ // first value
+ collector.putInt(rowWindow.getRow(firstValueIndex).getTime(),
firstValue);
+ // min and max value if not duplicate
+ // if min/max value is equal to first/last value, we keep first/last value
+ int smallerIndex = Math.min(minValueIndex, maxValueIndex);
+ int largerIndex = Math.max(minValueIndex, maxValueIndex);
+ if (smallerIndex > firstValueIndex
+ && rowWindow.getRow(smallerIndex).getInt(0) != lastValue) {
+ collector.putInt(
+ rowWindow.getRow(smallerIndex).getTime(),
rowWindow.getRow(smallerIndex).getInt(0));
+ }
+ if (largerIndex > smallerIndex &&
rowWindow.getRow(largerIndex).getInt(0) != lastValue) {
+ collector.putInt(
+ rowWindow.getRow(largerIndex).getTime(),
rowWindow.getRow(largerIndex).getInt(0));
+ }
+ // last value
+ if (lastValueIndex > firstValueIndex) {
+ collector.putInt(rowWindow.getRow(lastValueIndex).getTime(),
lastValue);
+ }
}
}
}
public void transformLong(RowWindow rowWindow, PointCollector collector)
throws IOException {
if (rowWindow.windowSize() > 0) { // else empty window do nothing
- long firstValue = rowWindow.getRow(0).getLong(0);
- long lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getLong(0);
-
- long minValue = Math.min(firstValue, lastValue);
- long maxValue = Math.max(firstValue, lastValue);
- int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
- int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
-
- for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
- long value = rowWindow.getRow(i).getLong(0);
- if (value < minValue) {
- minValue = value;
- minIndex = i;
- }
- if (value > maxValue) {
- maxValue = value;
- maxIndex = i;
+ int index = 0;
+ int size = rowWindow.windowSize();
+ int firstValueIndex = -1;
+ long firstValue = 0;
+
+ for (; index < size; index++) {
+ if (!rowWindow.getRow(index).isNull(0)) {
+ firstValueIndex = index;
+ firstValue = rowWindow.getRow(index).getLong(0);
+ break;
}
}
- Row row = rowWindow.getRow(0);
- collector.putLong(row.getTime(), row.getLong(0));
-
- int smallerIndex = Math.min(minIndex, maxIndex);
- int largerIndex = Math.max(minIndex, maxIndex);
- if (smallerIndex > 0) {
- row = rowWindow.getRow(smallerIndex);
- collector.putLong(row.getTime(), row.getLong(0));
- }
- if (largerIndex > smallerIndex) {
- row = rowWindow.getRow(largerIndex);
- collector.putLong(row.getTime(), row.getLong(0));
- }
- if (largerIndex < rowWindow.windowSize() - 1) {
- row = rowWindow.getRow(rowWindow.windowSize() - 1);
- collector.putLong(row.getTime(), row.getLong(0));
+ if (firstValueIndex != -1) { // else empty window do nothing
+ int lastValueIndex = firstValueIndex;
+ long lastValue = firstValue;
+ int minValueIndex = firstValueIndex;
+ long minValue = firstValue;
+ int maxValueIndex = firstValueIndex;
+ long maxValue = firstValue;
+
+ for (; index < size; index++) {
+ if (!rowWindow.getRow(index).isNull(0)) {
+ lastValueIndex = index;
+ lastValue = rowWindow.getRow(index).getLong(0);
+ if (lastValue < minValue) {
+ minValue = lastValue;
+ minValueIndex = index;
+ }
+ if (lastValue > maxValue) {
+ maxValue = lastValue;
+ maxValueIndex = index;
+ }
+ }
+ }
+ // first value
+ collector.putLong(rowWindow.getRow(firstValueIndex).getTime(),
firstValue);
+ // min and max value if not duplicate
+ // if min/max value is equal to first/last value, we keep first/last value
+ int smallerIndex = Math.min(minValueIndex, maxValueIndex);
+ int largerIndex = Math.max(minValueIndex, maxValueIndex);
+ if (smallerIndex > firstValueIndex
+ && rowWindow.getRow(smallerIndex).getLong(0) != lastValue) {
+ collector.putLong(
+ rowWindow.getRow(smallerIndex).getTime(),
rowWindow.getRow(smallerIndex).getLong(0));
+ }
+ if (largerIndex > smallerIndex &&
rowWindow.getRow(largerIndex).getLong(0) != lastValue) {
+ collector.putLong(
+ rowWindow.getRow(largerIndex).getTime(),
rowWindow.getRow(largerIndex).getLong(0));
+ }
+ // last value
+ if (lastValueIndex > firstValueIndex) {
+ collector.putLong(rowWindow.getRow(lastValueIndex).getTime(),
lastValue);
+ }
}
}
}
public void transformFloat(RowWindow rowWindow, PointCollector collector)
throws IOException {
if (rowWindow.windowSize() > 0) { // else empty window do nothing
- float firstValue = rowWindow.getRow(0).getFloat(0);
- float lastValue = rowWindow.getRow(rowWindow.windowSize() -
1).getFloat(0);
-
- float minValue = Math.min(firstValue, lastValue);
- float maxValue = Math.max(firstValue, lastValue);
- int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
- int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
-
- for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
- float value = rowWindow.getRow(i).getFloat(0);
- if (value < minValue) {
- minValue = value;
- minIndex = i;
- }
- if (value > maxValue) {
- maxValue = value;
- maxIndex = i;
+ int index = 0;
+ int size = rowWindow.windowSize();
+ int firstValueIndex = -1;
+ float firstValue = 0;
+
+ for (; index < size; index++) {
+ if (!rowWindow.getRow(index).isNull(0)) {
+ firstValueIndex = index;
+ firstValue = rowWindow.getRow(index).getFloat(0);
+ break;
}
}
- Row row = rowWindow.getRow(0);
- collector.putFloat(row.getTime(), row.getFloat(0));
-
- int smallerIndex = Math.min(minIndex, maxIndex);
- int largerIndex = Math.max(minIndex, maxIndex);
- if (smallerIndex > 0) {
- row = rowWindow.getRow(smallerIndex);
- collector.putFloat(row.getTime(), row.getFloat(0));
- }
- if (largerIndex > smallerIndex) {
- row = rowWindow.getRow(largerIndex);
- collector.putFloat(row.getTime(), row.getFloat(0));
- }
- if (largerIndex < rowWindow.windowSize() - 1) {
- row = rowWindow.getRow(rowWindow.windowSize() - 1);
- collector.putFloat(row.getTime(), row.getFloat(0));
+ if (firstValueIndex != -1) { // else empty window do nothing
+ int lastValueIndex = firstValueIndex;
+ float lastValue = firstValue;
+ int minValueIndex = firstValueIndex;
+ float minValue = firstValue;
+ int maxValueIndex = firstValueIndex;
+ float maxValue = firstValue;
+
+ for (; index < size; index++) {
+ if (!rowWindow.getRow(index).isNull(0)) {
+ lastValueIndex = index;
+ lastValue = rowWindow.getRow(index).getFloat(0);
+ if (lastValue < minValue) {
+ minValue = lastValue;
+ minValueIndex = index;
+ }
+ if (lastValue > maxValue) {
+ maxValue = lastValue;
+ maxValueIndex = index;
+ }
+ }
+ }
+ // first value
+ collector.putFloat(rowWindow.getRow(firstValueIndex).getTime(),
firstValue);
+ // min and max value if not duplicate
+ // if min/max value is equal to first/last value, we keep first/last value
+ int smallerIndex = Math.min(minValueIndex, maxValueIndex);
+ int largerIndex = Math.max(minValueIndex, maxValueIndex);
+ if (smallerIndex > firstValueIndex
+ && rowWindow.getRow(smallerIndex).getFloat(0) != lastValue) {
+ collector.putFloat(
+ rowWindow.getRow(smallerIndex).getTime(),
rowWindow.getRow(smallerIndex).getFloat(0));
+ }
+ if (largerIndex > smallerIndex &&
rowWindow.getRow(largerIndex).getFloat(0) != lastValue) {
+ collector.putFloat(
+ rowWindow.getRow(largerIndex).getTime(),
rowWindow.getRow(largerIndex).getFloat(0));
+ }
+ // last value
+ if (lastValueIndex > firstValueIndex) {
+ collector.putFloat(rowWindow.getRow(lastValueIndex).getTime(),
lastValue);
+ }
}
}
}
public void transformDouble(RowWindow rowWindow, PointCollector collector)
throws IOException {
if (rowWindow.windowSize() > 0) { // else empty window do nothing
- double firstValue = rowWindow.getRow(0).getDouble(0);
- double lastValue = rowWindow.getRow(rowWindow.windowSize() -
1).getDouble(0);
-
- double minValue = Math.min(firstValue, lastValue);
- double maxValue = Math.max(firstValue, lastValue);
- int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
- int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
-
- for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
- double value = rowWindow.getRow(i).getDouble(0);
- if (value < minValue) {
- minValue = value;
- minIndex = i;
- }
- if (value > maxValue) {
- maxValue = value;
- maxIndex = i;
+ int index = 0;
+ int size = rowWindow.windowSize();
+ int firstValueIndex = -1;
+ double firstValue = 0;
+
+ for (; index < size; index++) {
+ if (!rowWindow.getRow(index).isNull(0)) {
+ firstValueIndex = index;
+ firstValue = rowWindow.getRow(index).getDouble(0);
+ break;
}
}
- Row row = rowWindow.getRow(0);
- collector.putDouble(row.getTime(), row.getDouble(0));
-
- int smallerIndex = Math.min(minIndex, maxIndex);
- int largerIndex = Math.max(minIndex, maxIndex);
- if (smallerIndex > 0) {
- row = rowWindow.getRow(smallerIndex);
- collector.putDouble(row.getTime(), row.getDouble(0));
- }
- if (largerIndex > smallerIndex) {
- row = rowWindow.getRow(largerIndex);
- collector.putDouble(row.getTime(), row.getDouble(0));
- }
- if (largerIndex < rowWindow.windowSize() - 1) {
- row = rowWindow.getRow(rowWindow.windowSize() - 1);
- collector.putDouble(row.getTime(), row.getDouble(0));
+ if (firstValueIndex != -1) { // else empty window do nothing
+ int lastValueIndex = firstValueIndex;
+ double lastValue = firstValue;
+ int minValueIndex = firstValueIndex;
+ double minValue = firstValue;
+ int maxValueIndex = firstValueIndex;
+ double maxValue = firstValue;
+
+ for (; index < size; index++) {
+ if (!rowWindow.getRow(index).isNull(0)) {
+ lastValueIndex = index;
+ lastValue = rowWindow.getRow(index).getDouble(0);
+ if (lastValue < minValue) {
+ minValue = lastValue;
+ minValueIndex = index;
+ }
+ if (lastValue > maxValue) {
+ maxValue = lastValue;
+ maxValueIndex = index;
+ }
+ }
+ }
+ // first value
+ collector.putDouble(rowWindow.getRow(firstValueIndex).getTime(),
firstValue);
+ // min and max value if not duplicate
+ // if min/max value is equal to first/last value, we keep first/last value
+ int smallerIndex = Math.min(minValueIndex, maxValueIndex);
+ int largerIndex = Math.max(minValueIndex, maxValueIndex);
+ if (smallerIndex > firstValueIndex
+ && rowWindow.getRow(smallerIndex).getDouble(0) != lastValue) {
+ collector.putDouble(
+ rowWindow.getRow(smallerIndex).getTime(),
+ rowWindow.getRow(smallerIndex).getDouble(0));
+ }
+ if (largerIndex > smallerIndex &&
rowWindow.getRow(largerIndex).getDouble(0) != lastValue) {
+ collector.putDouble(
+ rowWindow.getRow(largerIndex).getTime(),
rowWindow.getRow(largerIndex).getDouble(0));
+ }
+ // last value
+ if (lastValueIndex > firstValueIndex) {
+ collector.putDouble(rowWindow.getRow(lastValueIndex).getTime(),
lastValue);
+ }
}
}
}