This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new e23e9c5adf8 [To rel/1.1] [IOTDB-6096] Make M4 Function do the nullable judgement
e23e9c5adf8 is described below

commit e23e9c5adf84720db7d2d0c87646756db4257a6a
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Aug 4 16:48:49 2023 +0800

    [To rel/1.1] [IOTDB-6096] Make M4 Function do the nullable judgement
---
 .../org/apache/iotdb/db/it/udf/IoTDBUDFM4IT.java   |  58 ++++
 .../db/it/udf/IoTDBUDTFBuiltinFunctionIT.java      |   1 -
 .../apache/iotdb/commons/udf/builtin/UDTFM4.java   | 338 +++++++++++++--------
 ...ticSerializableTVListBackedSingleColumnRow.java |   6 +-
 .../LayerPointReaderBackedSingleColumnRow.java     |   6 +-
 5 files changed, 273 insertions(+), 136 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFM4IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFM4IT.java
index 15025684525..f0e47ca2abd 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFM4IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFM4IT.java
@@ -127,6 +127,64 @@ public class IoTDBUDFM4IT {
     }
   }
 
+  @Test
+  public void test_M4_slidingTimeWindow_2() {
+    String[] res =
+        new String[] {
+          "0,null,1",
+          "1,5.0,null",
+          "10,30.0,null",
+          "20,20.0,null",
+          "24,null,1",
+          "25,8.0,1",
+          "30,40.0,null",
+          "45,30.0,null",
+          "49,null,1",
+          "50,null,1",
+          "52,8.0,null",
+          "54,18.0,null",
+          "74,null,1",
+          "75,null,1",
+          "99,null,1",
+          "120,8.0,null"
+        };
+
+    String sql =
+        String.format(
+            "select M4(s1, '%s'='%s','%s'='%s','%s'='%s','%s'='%s'), M4(s2, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from root.vehicle.d1",
+            TIME_INTERVAL_KEY,
+            25,
+            SLIDING_STEP_KEY,
+            25,
+            DISPLAY_WINDOW_BEGIN_KEY,
+            0,
+            DISPLAY_WINDOW_END_KEY,
+            150,
+            TIME_INTERVAL_KEY,
+            25,
+            SLIDING_STEP_KEY,
+            25,
+            DISPLAY_WINDOW_BEGIN_KEY,
+            0,
+            DISPLAY_WINDOW_END_KEY,
+            150);
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String str =
+            resultSet.getString(1) + "," + resultSet.getString(2) + "," + resultSet.getString(3);
+        Assert.assertEquals(res[count], str);
+        count++;
+      }
+      Assert.assertEquals(res.length, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
   @Test
   public void test_M4_slidingSizeWindow() {
     String[] res = new String[] {"1,5.0", "30,40.0", "33,9.0", "35,10.0", 
"45,30.0", "120,8.0"};
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
index 86411918a39..925b13b2370 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
@@ -38,7 +38,6 @@ import java.sql.Statement;
 import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
 import static org.apache.iotdb.itbase.constant.TestConstant.DEVICE;
 import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
-import static org.apache.iotdb.itbase.constant.TestConstant.count;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java
index 134d5a26490..9aee8121b91 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.udf.api.UDTF;
-import org.apache.iotdb.udf.api.access.Row;
 import org.apache.iotdb.udf.api.access.RowWindow;
 import org.apache.iotdb.udf.api.collector.PointCollector;
 import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
@@ -151,168 +150,241 @@ public class UDTFM4 implements UDTF {
 
   public void transformInt(RowWindow rowWindow, PointCollector collector) 
throws IOException {
     if (rowWindow.windowSize() > 0) { // else empty window do nothing
-      int firstValue = rowWindow.getRow(0).getInt(0);
-      int lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getInt(0);
-
-      int minValue = Math.min(firstValue, lastValue);
-      int maxValue = Math.max(firstValue, lastValue);
-      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
-      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
-
-      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
-        int value = rowWindow.getRow(i).getInt(0);
-        if (value < minValue) {
-          minValue = value;
-          minIndex = i;
-        }
-        if (value > maxValue) {
-          maxValue = value;
-          maxIndex = i;
+      int index = 0;
+      int size = rowWindow.windowSize();
+      int firstValueIndex = -1;
+      int firstValue = 0;
+
+      for (; index < size; index++) {
+        if (!rowWindow.getRow(index).isNull(0)) {
+          firstValueIndex = index;
+          firstValue = rowWindow.getRow(index).getInt(0);
+          break;
         }
       }
 
-      Row row = rowWindow.getRow(0);
-      collector.putInt(row.getTime(), row.getInt(0));
-
-      int smallerIndex = Math.min(minIndex, maxIndex);
-      int largerIndex = Math.max(minIndex, maxIndex);
-      if (smallerIndex > 0) {
-        row = rowWindow.getRow(smallerIndex);
-        collector.putInt(row.getTime(), row.getInt(0));
-      }
-      if (largerIndex > smallerIndex) {
-        row = rowWindow.getRow(largerIndex);
-        collector.putInt(row.getTime(), row.getInt(0));
-      }
-      if (largerIndex < rowWindow.windowSize() - 1) {
-        row = rowWindow.getRow(rowWindow.windowSize() - 1);
-        collector.putInt(row.getTime(), row.getInt(0));
+      if (firstValueIndex != -1) { // else empty window do nothing
+        int lastValueIndex = firstValueIndex;
+        int lastValue = firstValue;
+        int minValueIndex = firstValueIndex;
+        int minValue = firstValue;
+        int maxValueIndex = firstValueIndex;
+        int maxValue = firstValue;
+
+        for (; index < size; index++) {
+          if (!rowWindow.getRow(index).isNull(0)) {
+            lastValueIndex = index;
+            lastValue = rowWindow.getRow(index).getInt(0);
+            if (lastValue < minValue) {
+              minValue = lastValue;
+              minValueIndex = index;
+            }
+            if (lastValue > maxValue) {
+              maxValue = lastValue;
+              maxValueIndex = index;
+            }
+          }
+        }
+        // first value
+        collector.putInt(rowWindow.getRow(firstValueIndex).getTime(), 
firstValue);
+        // min and max value if not duplicate
+        // if min/max value is equal to first/last value, we keep first/last 
value
+        int smallerIndex = Math.min(minValueIndex, maxValueIndex);
+        int largerIndex = Math.max(minValueIndex, maxValueIndex);
+        if (smallerIndex > firstValueIndex
+            && rowWindow.getRow(smallerIndex).getInt(0) != lastValue) {
+          collector.putInt(
+              rowWindow.getRow(smallerIndex).getTime(), 
rowWindow.getRow(smallerIndex).getInt(0));
+        }
+        if (largerIndex > smallerIndex && 
rowWindow.getRow(largerIndex).getInt(0) != lastValue) {
+          collector.putInt(
+              rowWindow.getRow(largerIndex).getTime(), 
rowWindow.getRow(largerIndex).getInt(0));
+        }
+        // last value
+        if (lastValueIndex > firstValueIndex) {
+          collector.putInt(rowWindow.getRow(lastValueIndex).getTime(), 
lastValue);
+        }
       }
     }
   }
 
   public void transformLong(RowWindow rowWindow, PointCollector collector) 
throws IOException {
     if (rowWindow.windowSize() > 0) { // else empty window do nothing
-      long firstValue = rowWindow.getRow(0).getLong(0);
-      long lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getLong(0);
-
-      long minValue = Math.min(firstValue, lastValue);
-      long maxValue = Math.max(firstValue, lastValue);
-      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
-      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
-
-      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
-        long value = rowWindow.getRow(i).getLong(0);
-        if (value < minValue) {
-          minValue = value;
-          minIndex = i;
-        }
-        if (value > maxValue) {
-          maxValue = value;
-          maxIndex = i;
+      int index = 0;
+      int size = rowWindow.windowSize();
+      int firstValueIndex = -1;
+      long firstValue = 0;
+
+      for (; index < size; index++) {
+        if (!rowWindow.getRow(index).isNull(0)) {
+          firstValueIndex = index;
+          firstValue = rowWindow.getRow(index).getLong(0);
+          break;
         }
       }
 
-      Row row = rowWindow.getRow(0);
-      collector.putLong(row.getTime(), row.getLong(0));
-
-      int smallerIndex = Math.min(minIndex, maxIndex);
-      int largerIndex = Math.max(minIndex, maxIndex);
-      if (smallerIndex > 0) {
-        row = rowWindow.getRow(smallerIndex);
-        collector.putLong(row.getTime(), row.getLong(0));
-      }
-      if (largerIndex > smallerIndex) {
-        row = rowWindow.getRow(largerIndex);
-        collector.putLong(row.getTime(), row.getLong(0));
-      }
-      if (largerIndex < rowWindow.windowSize() - 1) {
-        row = rowWindow.getRow(rowWindow.windowSize() - 1);
-        collector.putLong(row.getTime(), row.getLong(0));
+      if (firstValueIndex != -1) { // else empty window do nothing
+        int lastValueIndex = firstValueIndex;
+        long lastValue = firstValue;
+        int minValueIndex = firstValueIndex;
+        long minValue = firstValue;
+        int maxValueIndex = firstValueIndex;
+        long maxValue = firstValue;
+
+        for (; index < size; index++) {
+          if (!rowWindow.getRow(index).isNull(0)) {
+            lastValueIndex = index;
+            lastValue = rowWindow.getRow(index).getLong(0);
+            if (lastValue < minValue) {
+              minValue = lastValue;
+              minValueIndex = index;
+            }
+            if (lastValue > maxValue) {
+              maxValue = lastValue;
+              maxValueIndex = index;
+            }
+          }
+        }
+        // first value
+        collector.putLong(rowWindow.getRow(firstValueIndex).getTime(), 
firstValue);
+        // min and max value if not duplicate
+        // if min/max value is equal to first/last value, we keep first/last 
value
+        int smallerIndex = Math.min(minValueIndex, maxValueIndex);
+        int largerIndex = Math.max(minValueIndex, maxValueIndex);
+        if (smallerIndex > firstValueIndex
+            && rowWindow.getRow(smallerIndex).getLong(0) != lastValue) {
+          collector.putLong(
+              rowWindow.getRow(smallerIndex).getTime(), 
rowWindow.getRow(smallerIndex).getLong(0));
+        }
+        if (largerIndex > smallerIndex && 
rowWindow.getRow(largerIndex).getLong(0) != lastValue) {
+          collector.putLong(
+              rowWindow.getRow(largerIndex).getTime(), 
rowWindow.getRow(largerIndex).getLong(0));
+        }
+        // last value
+        if (lastValueIndex > firstValueIndex) {
+          collector.putLong(rowWindow.getRow(lastValueIndex).getTime(), 
lastValue);
+        }
       }
     }
   }
 
   public void transformFloat(RowWindow rowWindow, PointCollector collector) 
throws IOException {
     if (rowWindow.windowSize() > 0) { // else empty window do nothing
-      float firstValue = rowWindow.getRow(0).getFloat(0);
-      float lastValue = rowWindow.getRow(rowWindow.windowSize() - 
1).getFloat(0);
-
-      float minValue = Math.min(firstValue, lastValue);
-      float maxValue = Math.max(firstValue, lastValue);
-      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
-      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
-
-      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
-        float value = rowWindow.getRow(i).getFloat(0);
-        if (value < minValue) {
-          minValue = value;
-          minIndex = i;
-        }
-        if (value > maxValue) {
-          maxValue = value;
-          maxIndex = i;
+      int index = 0;
+      int size = rowWindow.windowSize();
+      int firstValueIndex = -1;
+      float firstValue = 0;
+
+      for (; index < size; index++) {
+        if (!rowWindow.getRow(index).isNull(0)) {
+          firstValueIndex = index;
+          firstValue = rowWindow.getRow(index).getFloat(0);
+          break;
         }
       }
 
-      Row row = rowWindow.getRow(0);
-      collector.putFloat(row.getTime(), row.getFloat(0));
-
-      int smallerIndex = Math.min(minIndex, maxIndex);
-      int largerIndex = Math.max(minIndex, maxIndex);
-      if (smallerIndex > 0) {
-        row = rowWindow.getRow(smallerIndex);
-        collector.putFloat(row.getTime(), row.getFloat(0));
-      }
-      if (largerIndex > smallerIndex) {
-        row = rowWindow.getRow(largerIndex);
-        collector.putFloat(row.getTime(), row.getFloat(0));
-      }
-      if (largerIndex < rowWindow.windowSize() - 1) {
-        row = rowWindow.getRow(rowWindow.windowSize() - 1);
-        collector.putFloat(row.getTime(), row.getFloat(0));
+      if (firstValueIndex != -1) { // else empty window do nothing
+        int lastValueIndex = firstValueIndex;
+        float lastValue = firstValue;
+        int minValueIndex = firstValueIndex;
+        float minValue = firstValue;
+        int maxValueIndex = firstValueIndex;
+        float maxValue = firstValue;
+
+        for (; index < size; index++) {
+          if (!rowWindow.getRow(index).isNull(0)) {
+            lastValueIndex = index;
+            lastValue = rowWindow.getRow(index).getFloat(0);
+            if (lastValue < minValue) {
+              minValue = lastValue;
+              minValueIndex = index;
+            }
+            if (lastValue > maxValue) {
+              maxValue = lastValue;
+              maxValueIndex = index;
+            }
+          }
+        }
+        // first value
+        collector.putFloat(rowWindow.getRow(firstValueIndex).getTime(), 
firstValue);
+        // min and max value if not duplicate
+        // if min/max value is equal to first/last value, we keep first/last 
value
+        int smallerIndex = Math.min(minValueIndex, maxValueIndex);
+        int largerIndex = Math.max(minValueIndex, maxValueIndex);
+        if (smallerIndex > firstValueIndex
+            && rowWindow.getRow(smallerIndex).getFloat(0) != lastValue) {
+          collector.putFloat(
+              rowWindow.getRow(smallerIndex).getTime(), 
rowWindow.getRow(smallerIndex).getFloat(0));
+        }
+        if (largerIndex > smallerIndex && 
rowWindow.getRow(largerIndex).getFloat(0) != lastValue) {
+          collector.putFloat(
+              rowWindow.getRow(largerIndex).getTime(), 
rowWindow.getRow(largerIndex).getFloat(0));
+        }
+        // last value
+        if (lastValueIndex > firstValueIndex) {
+          collector.putFloat(rowWindow.getRow(lastValueIndex).getTime(), 
lastValue);
+        }
       }
     }
   }
 
   public void transformDouble(RowWindow rowWindow, PointCollector collector) 
throws IOException {
     if (rowWindow.windowSize() > 0) { // else empty window do nothing
-      double firstValue = rowWindow.getRow(0).getDouble(0);
-      double lastValue = rowWindow.getRow(rowWindow.windowSize() - 
1).getDouble(0);
-
-      double minValue = Math.min(firstValue, lastValue);
-      double maxValue = Math.max(firstValue, lastValue);
-      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
-      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
-
-      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
-        double value = rowWindow.getRow(i).getDouble(0);
-        if (value < minValue) {
-          minValue = value;
-          minIndex = i;
-        }
-        if (value > maxValue) {
-          maxValue = value;
-          maxIndex = i;
+      int index = 0;
+      int size = rowWindow.windowSize();
+      int firstValueIndex = -1;
+      double firstValue = 0;
+
+      for (; index < size; index++) {
+        if (!rowWindow.getRow(index).isNull(0)) {
+          firstValueIndex = index;
+          firstValue = rowWindow.getRow(index).getDouble(0);
+          break;
         }
       }
 
-      Row row = rowWindow.getRow(0);
-      collector.putDouble(row.getTime(), row.getDouble(0));
-
-      int smallerIndex = Math.min(minIndex, maxIndex);
-      int largerIndex = Math.max(minIndex, maxIndex);
-      if (smallerIndex > 0) {
-        row = rowWindow.getRow(smallerIndex);
-        collector.putDouble(row.getTime(), row.getDouble(0));
-      }
-      if (largerIndex > smallerIndex) {
-        row = rowWindow.getRow(largerIndex);
-        collector.putDouble(row.getTime(), row.getDouble(0));
-      }
-      if (largerIndex < rowWindow.windowSize() - 1) {
-        row = rowWindow.getRow(rowWindow.windowSize() - 1);
-        collector.putDouble(row.getTime(), row.getDouble(0));
+      if (firstValueIndex != -1) { // else empty window do nothing
+        int lastValueIndex = firstValueIndex;
+        double lastValue = firstValue;
+        int minValueIndex = firstValueIndex;
+        double minValue = firstValue;
+        int maxValueIndex = firstValueIndex;
+        double maxValue = firstValue;
+
+        for (; index < size; index++) {
+          if (!rowWindow.getRow(index).isNull(0)) {
+            lastValueIndex = index;
+            lastValue = rowWindow.getRow(index).getDouble(0);
+            if (lastValue < minValue) {
+              minValue = lastValue;
+              minValueIndex = index;
+            }
+            if (lastValue > maxValue) {
+              maxValue = lastValue;
+              maxValueIndex = index;
+            }
+          }
+        }
+        // first value
+        collector.putDouble(rowWindow.getRow(firstValueIndex).getTime(), 
firstValue);
+        // min and max value if not duplicate
+        // if min/max value is equal to first/last value, we keep first/last 
value
+        int smallerIndex = Math.min(minValueIndex, maxValueIndex);
+        int largerIndex = Math.max(minValueIndex, maxValueIndex);
+        if (smallerIndex > firstValueIndex
+            && rowWindow.getRow(smallerIndex).getDouble(0) != lastValue) {
+          collector.putDouble(
+              rowWindow.getRow(smallerIndex).getTime(),
+              rowWindow.getRow(smallerIndex).getDouble(0));
+        }
+        if (largerIndex > smallerIndex && 
rowWindow.getRow(largerIndex).getDouble(0) != lastValue) {
+          collector.putDouble(
+              rowWindow.getRow(largerIndex).getTime(), 
rowWindow.getRow(largerIndex).getDouble(0));
+        }
+        // last value
+        if (lastValueIndex > firstValueIndex) {
+          collector.putDouble(rowWindow.getRow(lastValueIndex).getTime(), 
lastValue);
+        }
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnRow.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnRow.java
index 18c490555f8..ccbd5e3e9c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnRow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnRow.java
@@ -86,7 +86,11 @@ public class ElasticSerializableTVListBackedSingleColumnRow 
implements Row {
 
   @Override
   public boolean isNull(int columnIndex) {
-    return false;
+    try {
+      return tvList.isNull(currentRowIndex);
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/LayerPointReaderBackedSingleColumnRow.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/LayerPointReaderBackedSingleColumnRow.java
index 325a9a2f8b7..a7ef0fff662 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/LayerPointReaderBackedSingleColumnRow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/LayerPointReaderBackedSingleColumnRow.java
@@ -83,7 +83,11 @@ public class LayerPointReaderBackedSingleColumnRow 
implements Row {
 
   @Override
   public boolean isNull(int columnIndex) {
-    return false;
+    try {
+      return layerPointReader.isCurrentNull();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
   }
 
   @Override

Reply via email to