This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a725ded2471 Fix overflow edge cases in query utilities (#17875)
a725ded2471 is described below

commit a725ded24717b54383401e11aa5e3680459a152c
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 10 19:36:59 2026 +0800

    Fix overflow edge cases in query utilities (#17875)
---
 iotdb-core/calc-commons/pom.xml                    |   5 +
 .../fill/filter/FixedIntervalFillFilter.java       |  10 +-
 .../relational/aggregation/ExtremeAccumulator.java |  32 ++--
 .../grouped/GroupedExtremeAccumulator.java         |  70 +++++---
 .../fill/filter/FixedIntervalFillFilterTest.java}  |  24 +--
 .../aggregation/ExtremeAccumulatorTest.java        | 150 ++++++++++++++++
 .../balancer/router/leader/HashLeaderBalancer.java |   2 +-
 .../router/leader/HashLeaderBalancerTest.java      |  65 +++++++
 .../iot/client/AsyncIoTConsensusServiceClient.java |   2 +-
 .../downsampling/changing/ChangingValueFilter.java |  20 ++-
 .../sdt/SwingingDoorTrendingFilter.java            |  19 +-
 .../tumbling/TumblingTimeSamplingProcessor.java    |   7 +-
 .../exchange/sender/TwoStageAggregateSender.java   |   2 +-
 .../tsfile/PipeTsFileResourceSegmentLock.java      |  10 +-
 .../execution/aggregation/ExtremeAccumulator.java  |  32 ++--
 .../SlidingWindowAggregatorFactory.java            |  36 ++--
 .../execution/operator/window/SessionWindow.java   |  24 ++-
 .../operator/window/SessionWindowManager.java      |   2 +-
 .../plan/AbstractFragmentParallelPlanner.java      |   3 +-
 .../SubscriptionPipeEventBatchSegmentLock.java     |   4 +-
 .../changing/ChangingValueFilterTest.java          |  55 ++++++
 .../sdt/SwingingDoorTrendingFilterTest.java        |  56 ++++++
 .../TumblingTimeSamplingProcessorTest.java         | 192 +++++++++++++++++++++
 .../tsfile/PipeTsFileResourceSegmentLockTest.java  |  71 ++++++++
 .../execution/aggregation/AccumulatorTest.java     |  46 +++++
 .../SlidingWindowAggregatorFactoryTest.java        |  41 +++++
 .../operator/window/SessionWindowTest.java         |  70 ++++++++
 .../SubscriptionPipeEventBatchSegmentLockTest.java |  50 ++++++
 .../async/AsyncAINodeInternalServiceClient.java    |   2 +-
 .../AsyncConfigNodeInternalServiceClient.java      |   2 +-
 .../async/AsyncDataNodeExternalServiceClient.java  |   2 +-
 .../async/AsyncDataNodeInternalServiceClient.java  |   2 +-
 .../AsyncDataNodeMPPDataExchangeServiceClient.java |   2 +-
 .../async/AsyncIoTConsensusV2ServiceClient.java    |   2 +-
 .../async/AsyncPipeDataTransferServiceClient.java  |   2 +-
 35 files changed, 1019 insertions(+), 95 deletions(-)

diff --git a/iotdb-core/calc-commons/pom.xml b/iotdb-core/calc-commons/pom.xml
index d854f29cce7..fbe69d074c8 100644
--- a/iotdb-core/calc-commons/pom.xml
+++ b/iotdb-core/calc-commons/pom.xml
@@ -99,6 +99,11 @@
             <groupId>at.yawk.lz4</groupId>
             <artifactId>lz4-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java
index 8fbd4ce379d..e7dd783b387 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java
@@ -34,6 +34,14 @@ public class FixedIntervalFillFilter implements IFillFilter {
   public boolean needFill(long time, long previousTime) {
     // the reason that we use Math.abs is that we may use order by time desc which will cause
     // previousTime is larger than time
-    return Math.abs(time - previousTime) <= timeInterval;
+    return isTimeDistanceLessThanOrEqual(time, previousTime, timeInterval);
+  }
+
+  private boolean isTimeDistanceLessThanOrEqual(long left, long right, long maxDistance) {
+    if (maxDistance < 0) {
+      return false;
+    }
+    long distance = left >= right ? left - right : right - left;
+    return Long.compareUnsigned(distance, maxDistance) <= 0;
   }
 }
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java
index 85fbee4bd5e..f546282d31b 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java
@@ -247,13 +247,9 @@ public class ExtremeAccumulator implements TableAccumulator {
   }
 
   private void updateIntResult(int val) {
-    int absExtVal = Math.abs(val);
     int candidateResult = extremeResult.getInt();
-    int absCandidateResult = Math.abs(extremeResult.getInt());
 
-    if (!initResult
-        || (absExtVal > absCandidateResult)
-        || (absExtVal == absCandidateResult) && val > candidateResult) {
+    if (!initResult || compareExtreme(val, candidateResult) > 0) {
       initResult = true;
       extremeResult.setInt(val);
     }
@@ -281,13 +277,9 @@ public class ExtremeAccumulator implements TableAccumulator {
   }
 
   private void updateLongResult(long val) {
-    long absExtVal = Math.abs(val);
     long candidateResult = extremeResult.getLong();
-    long absCandidateResult = Math.abs(extremeResult.getLong());
 
-    if (!initResult
-        || (absExtVal > absCandidateResult)
-        || (absExtVal == absCandidateResult) && val > candidateResult) {
+    if (!initResult || compareExtreme(val, candidateResult) > 0) {
       initResult = true;
       extremeResult.setLong(val);
     }
@@ -360,4 +352,24 @@ public class ExtremeAccumulator implements TableAccumulator {
       extremeResult.setDouble(val);
     }
   }
+
+  private int compareExtreme(int left, int right) {
+    int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right));
+    return absComparison == 0 ? Integer.compare(left, right) : absComparison;
+  }
+
+  private int compareExtreme(long left, long right) {
+    int absComparison = compareAbs(left, right);
+    return absComparison == 0 ? Long.compare(left, right) : absComparison;
+  }
+
+  private int compareAbs(long left, long right) {
+    if (left == Long.MIN_VALUE) {
+      return right == Long.MIN_VALUE ? 0 : 1;
+    }
+    if (right == Long.MIN_VALUE) {
+      return -1;
+    }
+    return Long.compare(Math.abs(left), Math.abs(right));
+  }
 }
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
index bd5aca876b7..b4102de7aa0 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
@@ -173,16 +173,16 @@ public class GroupedExtremeAccumulator implements GroupedAccumulator {
 
       switch (seriesDataType) {
         case INT32:
-          updateIntValue(groupIds[i], Math.abs(argument.getInt(i)));
+          updateIntValue(groupIds[i], argument.getInt(i));
           break;
         case INT64:
-          updateLongValue(groupIds[i], Math.abs(argument.getLong(i)));
+          updateLongValue(groupIds[i], argument.getLong(i));
           break;
         case FLOAT:
-          updateFloatValue(groupIds[i], Math.abs(argument.getFloat(i)));
+          updateFloatValue(groupIds[i], argument.getFloat(i));
           break;
         case DOUBLE:
-          updateDoubleValue(groupIds[i], Math.abs(argument.getDouble(i)));
+          updateDoubleValue(groupIds[i], argument.getDouble(i));
           break;
         case TEXT:
         case STRING:
@@ -300,7 +300,7 @@ public class GroupedExtremeAccumulator implements GroupedAccumulator {
     if (mask.isSelectAll()) {
       for (int i = 0; i < positionCount; i++) {
         if (!valueColumn.isNull(i)) {
-          updateIntValue(groupIds[i], Math.abs(valueColumn.getInt(i)));
+          updateIntValue(groupIds[i], valueColumn.getInt(i));
         }
       }
     } else {
@@ -309,15 +309,15 @@ public class GroupedExtremeAccumulator implements GroupedAccumulator {
       for (int i = 0; i < positionCount; i++) {
         position = selectedPositions[i];
         if (!valueColumn.isNull(position)) {
-          updateIntValue(groupIds[position], Math.abs(valueColumn.getInt(position)));
+          updateIntValue(groupIds[position], valueColumn.getInt(position));
         }
       }
     }
   }
 
   protected void updateIntValue(int groupId, int value) {
-    int max = intValues.get(groupId);
-    if (value >= max) {
+    int candidate = intValues.get(groupId);
+    if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) {
       inits.set(groupId, true);
       intValues.set(groupId, value);
     }
@@ -329,7 +329,7 @@ public class GroupedExtremeAccumulator implements GroupedAccumulator {
     if (mask.isSelectAll()) {
       for (int i = 0; i < positionCount; i++) {
         if (!valueColumn.isNull(i)) {
-          updateLongValue(groupIds[i], Math.abs(valueColumn.getLong(i)));
+          updateLongValue(groupIds[i], valueColumn.getLong(i));
         }
       }
     } else {
@@ -338,15 +338,15 @@ public class GroupedExtremeAccumulator implements GroupedAccumulator {
       for (int i = 0; i < positionCount; i++) {
         position = selectedPositions[i];
         if (!valueColumn.isNull(position)) {
-          updateLongValue(groupIds[position], Math.abs(valueColumn.getLong(position)));
+          updateLongValue(groupIds[position], valueColumn.getLong(position));
         }
       }
     }
   }
 
   protected void updateLongValue(int groupId, long value) {
-    long max = longValues.get(groupId);
-    if (value >= max) {
+    long candidate = longValues.get(groupId);
+    if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) {
       inits.set(groupId, true);
       longValues.set(groupId, value);
     }
@@ -358,7 +358,7 @@ public class GroupedExtremeAccumulator implements GroupedAccumulator {
     if (mask.isSelectAll()) {
       for (int i = 0; i < positionCount; i++) {
         if (!valueColumn.isNull(i)) {
-          updateFloatValue(groupIds[i], Math.abs(valueColumn.getFloat(i)));
+          updateFloatValue(groupIds[i], valueColumn.getFloat(i));
         }
       }
     } else {
@@ -367,15 +367,15 @@ public class GroupedExtremeAccumulator implements GroupedAccumulator {
       for (int i = 0; i < positionCount; i++) {
         position = selectedPositions[i];
         if (!valueColumn.isNull(position)) {
-          updateFloatValue(groupIds[position], Math.abs(valueColumn.getFloat(position)));
+          updateFloatValue(groupIds[position], valueColumn.getFloat(position));
         }
       }
     }
   }
 
   protected void updateFloatValue(int groupId, float value) {
-    float max = floatValues.get(groupId);
-    if (value >= max) {
+    float candidate = floatValues.get(groupId);
+    if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) {
       inits.set(groupId, true);
       floatValues.set(groupId, value);
     }
@@ -387,7 +387,7 @@ public class GroupedExtremeAccumulator implements GroupedAccumulator {
     if (mask.isSelectAll()) {
       for (int i = 0; i < positionCount; i++) {
         if (!valueColumn.isNull(i)) {
-          updateDoubleValue(groupIds[i], Math.abs(valueColumn.getDouble(i)));
+          updateDoubleValue(groupIds[i], valueColumn.getDouble(i));
         }
       }
     } else {
@@ -396,17 +396,47 @@ public class GroupedExtremeAccumulator implements GroupedAccumulator {
       for (int i = 0; i < positionCount; i++) {
         position = selectedPositions[i];
         if (!valueColumn.isNull(position)) {
-          updateDoubleValue(groupIds[position], Math.abs(valueColumn.getDouble(position)));
+          updateDoubleValue(groupIds[position], valueColumn.getDouble(position));
         }
       }
     }
   }
 
   protected void updateDoubleValue(int groupId, double value) {
-    double max = doubleValues.get(groupId);
-    if (value >= max) {
+    double candidate = doubleValues.get(groupId);
+    if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) {
       inits.set(groupId, true);
       doubleValues.set(groupId, value);
     }
   }
+
+  private int compareExtreme(int left, int right) {
+    int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right));
+    return absComparison == 0 ? Integer.compare(left, right) : absComparison;
+  }
+
+  private int compareExtreme(long left, long right) {
+    int absComparison = compareAbs(left, right);
+    return absComparison == 0 ? Long.compare(left, right) : absComparison;
+  }
+
+  private int compareAbs(long left, long right) {
+    if (left == Long.MIN_VALUE) {
+      return right == Long.MIN_VALUE ? 0 : 1;
+    }
+    if (right == Long.MIN_VALUE) {
+      return -1;
+    }
+    return Long.compare(Math.abs(left), Math.abs(right));
+  }
+
+  private int compareExtreme(float left, float right) {
+    int absComparison = Float.compare(Math.abs(left), Math.abs(right));
+    return absComparison == 0 ? Float.compare(left, right) : absComparison;
+  }
+
+  private int compareExtreme(double left, double right) {
+    int absComparison = Double.compare(Math.abs(left), Math.abs(right));
+    return absComparison == 0 ? Double.compare(left, right) : absComparison;
+  }
 }
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java
similarity index 59%
copy from iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java
copy to iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java
index 8fbd4ce379d..a40f6ec48f6 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java
+++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java
@@ -19,21 +19,23 @@
 
 package org.apache.iotdb.calc.execution.operator.process.fill.filter;
 
-import org.apache.iotdb.calc.execution.operator.process.fill.IFillFilter;
+import org.junit.Assert;
+import org.junit.Test;
 
-public class FixedIntervalFillFilter implements IFillFilter {
+public class FixedIntervalFillFilterTest {
 
-  // the time precision of this field is same as the system time_precision configuration.
-  private final long timeInterval;
+  @Test
+  public void needFillHandlesOverflowedTimeDistance() {
+    FixedIntervalFillFilter filter = new FixedIntervalFillFilter(1);
 
-  public FixedIntervalFillFilter(long timeInterval) {
-    this.timeInterval = timeInterval;
+    Assert.assertTrue(filter.needFill(Long.MIN_VALUE, Long.MIN_VALUE + 1));
+    Assert.assertFalse(filter.needFill(Long.MIN_VALUE, Long.MAX_VALUE));
   }
 
-  @Override
-  public boolean needFill(long time, long previousTime) {
-    // the reason that we use Math.abs is that we may use order by time desc which will cause
-    // previousTime is larger than time
-    return Math.abs(time - previousTime) <= timeInterval;
+  @Test
+  public void needFillRejectsNegativeInterval() {
+    FixedIntervalFillFilter filter = new FixedIntervalFillFilter(-1);
+
+    Assert.assertFalse(filter.needFill(0, 0));
   }
 }
diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java
new file mode 100644
index 00000000000..1edb8610013
--- /dev/null
+++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.calc.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedExtremeAccumulator;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.tsfile.read.common.block.column.LongColumnBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class ExtremeAccumulatorTest {
+
+  @Test
+  public void tableExtremeAccumulatorHandlesMinValues() {
+    ExtremeAccumulator intAccumulator = new ExtremeAccumulator(TSDataType.INT32);
+    TsBlock intBlock = buildIntBlock(Integer.MAX_VALUE, Integer.MIN_VALUE);
+    intAccumulator.addInput(
+        new org.apache.tsfile.block.column.Column[] {intBlock.getColumn(0)},
+        AggregationMask.createSelectAll(intBlock.getPositionCount()));
+    ColumnBuilder intResult = new IntColumnBuilder(null, 1);
+    intAccumulator.evaluateFinal(intResult);
+    Assert.assertEquals(Integer.MIN_VALUE, intResult.build().getInt(0));
+
+    ExtremeAccumulator longAccumulator = new ExtremeAccumulator(TSDataType.INT64);
+    TsBlock longBlock = buildLongBlock(Long.MAX_VALUE, Long.MIN_VALUE);
+    longAccumulator.addInput(
+        new org.apache.tsfile.block.column.Column[] {longBlock.getColumn(0)},
+        AggregationMask.createSelectAll(longBlock.getPositionCount()));
+    ColumnBuilder longResult = new LongColumnBuilder(null, 1);
+    longAccumulator.evaluateFinal(longResult);
+    Assert.assertEquals(Long.MIN_VALUE, longResult.build().getLong(0));
+  }
+
+  @Test
+  public void groupedExtremeAccumulatorKeepsOriginalValueAndHandlesMinValues() {
+    GroupedExtremeAccumulator intAccumulator = new GroupedExtremeAccumulator(TSDataType.INT32);
+    intAccumulator.setGroupCount(1);
+    TsBlock intBlock = buildIntBlock(-5, 4);
+    intAccumulator.addInput(
+        new int[] {0, 0},
+        new org.apache.tsfile.block.column.Column[] {intBlock.getColumn(0)},
+        AggregationMask.createSelectAll(intBlock.getPositionCount()));
+    ColumnBuilder intResult = new IntColumnBuilder(null, 1);
+    intAccumulator.evaluateFinal(0, intResult);
+    Assert.assertEquals(-5, intResult.build().getInt(0));
+
+    GroupedExtremeAccumulator longAccumulator = new GroupedExtremeAccumulator(TSDataType.INT64);
+    longAccumulator.setGroupCount(1);
+    TsBlock longBlock = buildLongBlock(Long.MAX_VALUE, Long.MIN_VALUE);
+    longAccumulator.addIntermediate(new int[] {0, 0}, longBlock.getColumn(0));
+    ColumnBuilder longResult = new LongColumnBuilder(null, 1);
+    longAccumulator.evaluateFinal(0, longResult);
+    Assert.assertEquals(Long.MIN_VALUE, longResult.build().getLong(0));
+  }
+
+  @Test
+  public void groupedExtremeAccumulatorKeepsOriginalFloatingValues() {
+    GroupedExtremeAccumulator floatAccumulator = new GroupedExtremeAccumulator(TSDataType.FLOAT);
+    floatAccumulator.setGroupCount(1);
+    TsBlock floatBlock = buildFloatBlock(-5.5f, 4.5f);
+    floatAccumulator.addInput(
+        new int[] {0, 0},
+        new org.apache.tsfile.block.column.Column[] {floatBlock.getColumn(0)},
+        AggregationMask.createSelectAll(floatBlock.getPositionCount()));
+    ColumnBuilder floatResult = new FloatColumnBuilder(null, 1);
+    floatAccumulator.evaluateFinal(0, floatResult);
+    Assert.assertEquals(-5.5f, floatResult.build().getFloat(0), 0.001);
+
+    GroupedExtremeAccumulator doubleAccumulator = new GroupedExtremeAccumulator(TSDataType.DOUBLE);
+    doubleAccumulator.setGroupCount(1);
+    TsBlock doubleBlock = buildDoubleBlock(-10.25, 9.25);
+    doubleAccumulator.addInput(
+        new int[] {0, 0},
+        new org.apache.tsfile.block.column.Column[] {doubleBlock.getColumn(0)},
+        AggregationMask.createSelectAll(doubleBlock.getPositionCount()));
+    ColumnBuilder doubleResult = new DoubleColumnBuilder(null, 1);
+    doubleAccumulator.evaluateFinal(0, doubleResult);
+    Assert.assertEquals(-10.25, doubleResult.build().getDouble(0), 0.001);
+  }
+
+  private TsBlock buildIntBlock(int... values) {
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0];
+    for (int i = 0; i < values.length; i++) {
+      builder.getTimeColumnBuilder().writeLong(i);
+      valueBuilder.writeInt(values[i]);
+      builder.declarePosition();
+    }
+    return builder.build();
+  }
+
+  private TsBlock buildLongBlock(long... values) {
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT64));
+    ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0];
+    for (int i = 0; i < values.length; i++) {
+      builder.getTimeColumnBuilder().writeLong(i);
+      valueBuilder.writeLong(values[i]);
+      builder.declarePosition();
+    }
+    return builder.build();
+  }
+
+  private TsBlock buildFloatBlock(float... values) {
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.FLOAT));
+    ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0];
+    for (int i = 0; i < values.length; i++) {
+      builder.getTimeColumnBuilder().writeLong(i);
+      valueBuilder.writeFloat(values[i]);
+      builder.declarePosition();
+    }
+    return builder.build();
+  }
+
+  private TsBlock buildDoubleBlock(double... values) {
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.DOUBLE));
+    ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0];
+    for (int i = 0; i < values.length; i++) {
+      builder.getTimeColumnBuilder().writeLong(i);
+      valueBuilder.writeDouble(values[i]);
+      builder.declarePosition();
+    }
+    return builder.build();
+  }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java
index aff023e82da..06ad4a76eb5 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java
@@ -43,7 +43,7 @@ public class HashLeaderBalancer extends AbstractLeaderBalancer {
         (gid, nodeSet) -> {
           List<Integer> nodeList = new ArrayList<>(nodeSet);
           nodeList.sort(null);
-          int startNodeIndex = Math.abs(gid.hashCode()) % nodeList.size();
+          int startNodeIndex = (int) (Math.abs((long) gid.hashCode()) % nodeList.size());
           int finalNodeId = nodeList.get(startNodeIndex);
           for (int i = 0; i < nodeList.size(); i++) {
             int currentNodeIndex = (startNodeIndex + i) % nodeList.size();
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java
new file mode 100644
index 00000000000..f2ccccd857d
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class HashLeaderBalancerTest {
+
+  private static final HashLeaderBalancer BALANCER = new HashLeaderBalancer();
+  private static final int MIN_VALUE_HASH_REGION_ID = 268255235;
+
+  @Test
+  public void minValueHashCodeTest() {
+    TConsensusGroupId regionGroupId =
+        new TConsensusGroupId(TConsensusGroupType.DataRegion, MIN_VALUE_HASH_REGION_ID);
+    Assert.assertEquals(Integer.MIN_VALUE, regionGroupId.hashCode());
+
+    Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
+    regionReplicaSetMap.put(regionGroupId, new HashSet<>(Arrays.asList(1, 2, 3)));
+
+    Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
+    dataNodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running));
+    dataNodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Running));
+    dataNodeStatisticsMap.put(3, new NodeStatistics(NodeStatus.Running));
+
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        BALANCER.generateOptimalLeaderDistribution(
+            new TreeMap<>(),
+            regionReplicaSetMap,
+            new TreeMap<>(),
+            dataNodeStatisticsMap,
+            new TreeMap<>());
+
+    Assert.assertEquals(Integer.valueOf(3), leaderDistribution.get(regionGroupId));
+  }
+}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index ad0bcc0d9a2..99e6cbcf71e 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -160,7 +160,7 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
           new AsyncIoTConsensusServiceClient(
               thriftClientProperty,
               endPoint,
-              tManagers[clientCnt.incrementAndGet() % tManagers.length],
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
               clientManager));
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
index 7dc8c87c09b..9bb5dfcb9a7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
@@ -59,13 +59,13 @@ public class ChangingValueFilter<T> {
   }
 
   private boolean tryFilter(final long timestamp, final T value) {
-    final long timeDiff = Math.abs(timestamp - lastStoredTimestamp);
-
-    if (timeDiff <= processor.getCompressionMinTimeInterval()) {
+    if (isTimeDistanceLessThanOrEqual(
+        timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) {
       return false;
     }
 
-    if (timeDiff >= processor.getCompressionMaxTimeInterval()) {
+    if (isTimeDistanceGreaterThanOrEqual(
+        timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) {
       reset(timestamp, value);
       return true;
     }
@@ -94,6 +94,18 @@ public class ChangingValueFilter<T> {
     return false;
   }
 
+  private boolean isTimeDistanceLessThanOrEqual(
+      final long left, final long right, final long maxDistance) {
+    final long distance = left >= right ? left - right : right - left;
+    return Long.compareUnsigned(distance, maxDistance) <= 0;
+  }
+
+  private boolean isTimeDistanceGreaterThanOrEqual(
+      final long left, final long right, final long minDistance) {
+    final long distance = left >= right ? left - right : right - left;
+    return Long.compareUnsigned(distance, minDistance) >= 0;
+  }
+
   private void reset(final long timestamp, final T value) {
     lastStoredTimestamp = timestamp;
     lastStoredValue = value;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
index 850cbd3ed07..bfc0bc5d3f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
@@ -86,13 +86,14 @@ public class SwingingDoorTrendingFilter<T> {
 
   private boolean tryFilter(final long timestamp, final T value) {
     final long timeDiff = timestamp - lastStoredTimestamp;
-    final long absTimeDiff = Math.abs(timeDiff);
 
-    if (absTimeDiff <= processor.getCompressionMinTimeInterval()) {
+    if (isTimeDistanceLessThanOrEqual(
+        timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) {
       return false;
     }
 
-    if (absTimeDiff >= processor.getCompressionMaxTimeInterval()) {
+    if (isTimeDistanceGreaterThanOrEqual(
+        timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) {
       reset(timestamp, value);
       return true;
     }
@@ -144,6 +145,18 @@ public class SwingingDoorTrendingFilter<T> {
     return false;
   }
 
+  private boolean isTimeDistanceLessThanOrEqual(
+      final long left, final long right, final long maxDistance) {
+    final long distance = left >= right ? left - right : right - left;
+    return Long.compareUnsigned(distance, maxDistance) <= 0;
+  }
+
+  private boolean isTimeDistanceGreaterThanOrEqual(
+      final long left, final long right, final long minDistance) {
+    final long distance = left >= right ? left - right : right - left;
+    return Long.compareUnsigned(distance, minDistance) >= 0;
+  }
+
   private void reset(final long timestamp, final T value) {
     upperDoor = Double.MIN_VALUE;
     lowerDoor = Double.MAX_VALUE;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
index 36fa0b8382f..8beab134388 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
@@ -116,7 +116,7 @@ public class TumblingTimeSamplingProcessor extends DownSamplingProcessor {
       final Long lastSampleTime = pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix);
 
       if (lastSampleTime == null
-          || Math.abs(currentRowTime - lastSampleTime) >= intervalInCurrentPrecision) {
+          || isTimeDistanceGreaterThanOrEqual(currentRowTime, lastSampleTime)) {
         try {
           rowCollector.collectRow(row);
 
@@ -135,4 +135,9 @@ public class TumblingTimeSamplingProcessor extends DownSamplingProcessor {
       }
     }
   }
+
+  private boolean isTimeDistanceGreaterThanOrEqual(long left, long right) {
+    long distance = left >= right ? left - right : right - left;
+    return Long.compareUnsigned(distance, intervalInCurrentPrecision) >= 0;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index 9b6e17e6e4a..e221b4a19c7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -76,7 +76,7 @@ public class TwoStageAggregateSender implements AutoCloseable {
     final boolean endPointsChanged = tryFetchEndPointsIfNecessary();
     tryConstructClients(endPointsChanged);
 
-    final TEndPoint endPoint = endPoints[(int) watermark % endPoints.length];
+    final TEndPoint endPoint = endPoints[(int) Math.floorMod(watermark, endPoints.length)];
     IoTDBSyncClient client = endPointIoTDBSyncClientMap.get(endPoint);
     if (client == null) {
       client = reconstructIoTDBSyncClient(endPoint);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
index aaf5ce4db5a..a73198c2713 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
@@ -78,17 +78,21 @@ public class PipeTsFileResourceSegmentLock {
 
   public void lock(final File file) {
     initIfNecessary();
-    locks[Math.abs(file.hashCode()) % locks.length].lock();
+    locks[getLockIndex(file)].lock();
   }
 
   public boolean tryLock(final File file, final long timeout, final TimeUnit timeUnit)
       throws InterruptedException {
     initIfNecessary();
-    return locks[Math.abs(file.hashCode()) % locks.length].tryLock(timeout, timeUnit);
+    return locks[getLockIndex(file)].tryLock(timeout, timeUnit);
   }
 
   public void unlock(final File file) {
     initIfNecessary();
-    locks[Math.abs(file.hashCode()) % locks.length].unlock();
+    locks[getLockIndex(file)].unlock();
+  }
+
+  private int getLockIndex(final File file) {
+    return (int) (Math.abs((long) file.hashCode()) % locks.length);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
index f99e01e8d5a..af9812e0a17 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
@@ -270,13 +270,9 @@ public class ExtremeAccumulator implements Accumulator {
   }
 
   private void updateIntResult(int extVal) {
-    int absExtVal = Math.abs(extVal);
     int candidateResult = extremeResult.getInt();
-    int absCandidateResult = Math.abs(extremeResult.getInt());
 
-    if (!initResult
-        || (absExtVal > absCandidateResult)
-        || (absExtVal == absCandidateResult) && extVal > candidateResult) {
+    if (!initResult || compareExtreme(extVal, candidateResult) > 0) {
       initResult = true;
       extremeResult.setInt(extVal);
     }
@@ -295,13 +291,9 @@ public class ExtremeAccumulator implements Accumulator {
   }
 
   private void updateLongResult(long extVal) {
-    long absExtVal = Math.abs(extVal);
     long candidateResult = extremeResult.getLong();
-    long absCandidateResult = Math.abs(extremeResult.getLong());
 
-    if (!initResult
-        || (absExtVal > absCandidateResult)
-        || (absExtVal == absCandidateResult) && extVal > candidateResult) {
+    if (!initResult || compareExtreme(extVal, candidateResult) > 0) {
       initResult = true;
       extremeResult.setLong(extVal);
     }
@@ -356,4 +348,24 @@ public class ExtremeAccumulator implements Accumulator {
       extremeResult.setDouble(extVal);
     }
   }
+
+  private int compareExtreme(int left, int right) {
+    int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right));
+    return absComparison == 0 ? Integer.compare(left, right) : absComparison;
+  }
+
+  private int compareExtreme(long left, long right) {
+    int absComparison = compareAbs(left, right);
+    return absComparison == 0 ? Long.compare(left, right) : absComparison;
+  }
+
+  private int compareAbs(long left, long right) {
+    if (left == Long.MIN_VALUE) {
+      return right == Long.MIN_VALUE ? 0 : 1;
+    }
+    if (right == Long.MIN_VALUE) {
+      return -1;
+    }
+    return Long.compare(Math.abs(left), Math.abs(right));
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
index e46a2597b22..8a11ffc7739 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
@@ -77,26 +77,14 @@ public class SlidingWindowAggregatorFactory {
         (o1, o2) -> {
           int value1 = o1.getInt(0);
           int value2 = o2.getInt(0);
-          if (Math.abs(value1) > Math.abs(value2)
-              || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
-            return 1;
-          } else if (value1 == value2) {
-            return 0;
-          }
-          return -1;
+          return compareExtreme(value1, value2);
         });
     extremeComparators.put(
         TSDataType.INT64,
         (o1, o2) -> {
           long value1 = o1.getLong(0);
           long value2 = o2.getLong(0);
-          if (Math.abs(value1) > Math.abs(value2)
-              || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
-            return 1;
-          } else if (value1 == value2) {
-            return 0;
-          }
-          return -1;
+          return compareExtreme(value1, value2);
         });
     extremeComparators.put(
         TSDataType.FLOAT,
@@ -249,4 +237,24 @@ public class SlidingWindowAggregatorFactory {
             DataNodeQueryMessages.INVALID_AGGREGATION_TYPE + aggregationType);
     }
   }
+
+  static int compareExtreme(int left, int right) {
+    int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right));
+    return absComparison == 0 ? Integer.compare(left, right) : absComparison;
+  }
+
+  static int compareExtreme(long left, long right) {
+    int absComparison = compareAbs(left, right);
+    return absComparison == 0 ? Long.compare(left, right) : absComparison;
+  }
+
+  private static int compareAbs(long left, long right) {
+    if (left == Long.MIN_VALUE) {
+      return right == Long.MIN_VALUE ? 0 : 1;
+    }
+    if (right == Long.MIN_VALUE) {
+      return -1;
+    }
+    return Long.compare(Math.abs(left), Math.abs(right));
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java
index 4c5e004c60d..59f955580b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java
@@ -54,9 +54,9 @@ public class SessionWindow implements IWindow {
       return true;
     }
     if (index == 0) {
-      return Math.abs(column.getLong(index) - lastTsBlockTime) <= timeInterval;
+      return isTimeDistanceLessThanOrEqual(column.getLong(index), lastTsBlockTime);
     }
-    return Math.abs(column.getLong(index) - column.getLong(index - 1)) <= timeInterval;
+    return isTimeDistanceLessThanOrEqual(column.getLong(index), column.getLong(index - 1));
   }
 
   @Override
@@ -92,8 +92,8 @@ public class SessionWindow implements IWindow {
     long maxTime = Math.max(columnStartTime, columnEndTime);
 
     boolean contains =
-        Math.abs(columnStartTime - lastTsBlockTime) < timeInterval
-            && maxTime - minTime <= timeInterval;
+        isTimeDistanceLessThan(columnStartTime, lastTsBlockTime)
+            && isTimeDistanceLessThanOrEqual(maxTime, minTime);
     if (contains) {
       if (!initializedTimeValue) {
         startTime = Long.MAX_VALUE;
@@ -113,6 +113,22 @@ public class SessionWindow implements IWindow {
     return timeInterval;
   }
 
+  boolean isTimeDistanceLessThanOrEqual(long left, long right) {
+    return compareTimeDistance(left, right) <= 0;
+  }
+
+  private boolean isTimeDistanceLessThan(long left, long right) {
+    return compareTimeDistance(left, right) < 0;
+  }
+
+  private int compareTimeDistance(long left, long right) {
+    if (timeInterval < 0) {
+      return 1;
+    }
+    long distance = left >= right ? left - right : right - left;
+    return Long.compareUnsigned(distance, timeInterval);
+  }
+
   public long getTimeValue() {
     return timeValue;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java
index 42ca3becedd..bef50233b22 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java
@@ -93,7 +93,7 @@ public class SessionWindowManager implements IWindowManager {
 
     for (; i < size; i++) {
       long currentTime = timeColumn.getLong(i);
-      if (Math.abs(currentTime - previousTimeValue) > sessionWindow.getTimeInterval()) {
+      if (!sessionWindow.isTimeDistanceLessThanOrEqual(currentTime, previousTimeValue)) {
         sessionWindow.setTimeValue(previousTimeValue);
         break;
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
index e5bd6ca38c2..0fddb41d6e6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
@@ -138,7 +138,8 @@ public abstract class AbstractFragmentParallelPlanner implements IFragmentParall
     if (!selectRandomDataNode || queryContext.getSession() == null) {
       targetIndex = 0;
     } else {
-      targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size());
+      targetIndex =
+          (int) Math.floorMod(queryContext.getSession().getSessionId(), availableDataNodes.size());
     }
     return availableDataNodes.get(targetIndex);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
index 194eb5a8fa5..0c91492e4b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
@@ -65,11 +65,11 @@ public class SubscriptionPipeEventBatchSegmentLock {
 
   public void lock(final int regionId) {
     initIfNecessary();
-    locks[regionId % locks.length].lock();
+    locks[Math.floorMod(regionId, locks.length)].lock();
   }
 
   public void unlock(final int regionId) {
     initIfNecessary();
-    locks[regionId % locks.length].unlock();
+    locks[Math.floorMod(regionId, locks.length)].unlock();
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java
new file mode 100644
index 00000000000..2bf4da9957c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.changing;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+
+public class ChangingValueFilterTest {
+
+  @Test
+  public void testExtremeTimestampDistanceReachesMaxInterval() throws 
Exception {
+    final ChangingValueFilter<Integer> filter =
+        new ChangingValueFilter<>(createProcessor(0, Long.MAX_VALUE, 0), 
Long.MIN_VALUE, 0);
+
+    Assert.assertTrue(filter.filter(Long.MAX_VALUE, 0));
+  }
+
+  private ChangingValueSamplingProcessor createProcessor(
+      final long compressionMinTimeInterval,
+      final long compressionMaxTimeInterval,
+      final double compressionDeviation)
+      throws Exception {
+    final ChangingValueSamplingProcessor processor = new 
ChangingValueSamplingProcessor();
+    setField(processor, "compressionMinTimeInterval", 
compressionMinTimeInterval);
+    setField(processor, "compressionMaxTimeInterval", 
compressionMaxTimeInterval);
+    setField(processor, "compressionDeviation", compressionDeviation);
+    return processor;
+  }
+
+  private void setField(final Object target, final String fieldName, final 
Object value)
+      throws Exception {
+    final Field field = target.getClass().getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java
new file mode 100644
index 00000000000..cd04df04194
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.sdt;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+
+public class SwingingDoorTrendingFilterTest {
+
+  @Test
+  public void testExtremeTimestampDistanceReachesMaxInterval() throws 
Exception {
+    final SwingingDoorTrendingFilter<Integer> filter =
+        new SwingingDoorTrendingFilter<>(createProcessor(0, Long.MAX_VALUE, 
0), Long.MIN_VALUE, 0);
+
+    Assert.assertTrue(filter.filter(Long.MAX_VALUE, 0));
+  }
+
+  private SwingingDoorTrendingSamplingProcessor createProcessor(
+      final long compressionMinTimeInterval,
+      final long compressionMaxTimeInterval,
+      final double compressionDeviation)
+      throws Exception {
+    final SwingingDoorTrendingSamplingProcessor processor =
+        new SwingingDoorTrendingSamplingProcessor();
+    setField(processor, "compressionMinTimeInterval", 
compressionMinTimeInterval);
+    setField(processor, "compressionMaxTimeInterval", 
compressionMaxTimeInterval);
+    setField(processor, "compressionDeviation", compressionDeviation);
+    return processor;
+  }
+
+  private void setField(final Object target, final String fieldName, final 
Object value)
+      throws Exception {
+    final Field field = target.getClass().getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java
new file mode 100644
index 00000000000..df18dba434e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.tumbling;
+
+import 
org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Binary;
+import org.apache.iotdb.pipe.api.type.Type;
+
+import org.apache.tsfile.read.common.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.LocalDate;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TumblingTimeSamplingProcessorTest {
+
+  @Test
+  public void testExtremeTimestampDistanceReachesInterval() throws Exception {
+    final TumblingTimeSamplingProcessor processor = new 
TumblingTimeSamplingProcessor();
+    final TestLastObjectCache cache = new TestLastObjectCache();
+    setField(processor, "intervalInCurrentPrecision", Long.MAX_VALUE);
+    setField(processor, "pathLastObjectCache", cache);
+
+    try {
+      final CountingRowCollector rowCollector = new CountingRowCollector();
+
+      processor.processRow(
+          new TestRow(Long.MIN_VALUE), rowCollector, "root.db.d1", new 
AtomicReference<>());
+      processor.processRow(
+          new TestRow(Long.MAX_VALUE), rowCollector, "root.db.d1", new 
AtomicReference<>());
+      processor.processRow(
+          new TestRow(Long.MAX_VALUE - 1), rowCollector, "root.db.d1", new 
AtomicReference<>());
+
+      Assert.assertEquals(2, rowCollector.getCollectedRowCount());
+    } finally {
+      cache.close();
+    }
+  }
+
+  private void setField(final Object target, final String fieldName, final 
Object value)
+      throws Exception {
+    final Field field = target.getClass().getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+
+  private static class TestLastObjectCache extends 
PartialPathLastObjectCache<Long> {
+
+    private TestLastObjectCache() {
+      super(1024);
+    }
+
+    @Override
+    protected long calculateMemoryUsage(final Long object) {
+      return Long.BYTES;
+    }
+  }
+
+  private static class CountingRowCollector implements RowCollector {
+
+    private final AtomicInteger collectedRowCount = new AtomicInteger();
+
+    @Override
+    public void collectRow(final Row row) throws IOException {
+      collectedRowCount.incrementAndGet();
+    }
+
+    private int getCollectedRowCount() {
+      return collectedRowCount.get();
+    }
+  }
+
+  private static class TestRow implements Row {
+
+    private final long timestamp;
+
+    private TestRow(final long timestamp) {
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public long getTime() {
+      return timestamp;
+    }
+
+    @Override
+    public int getInt(final int columnIndex) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LocalDate getDate(final int columnIndex) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getLong(final int columnIndex) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public float getFloat(final int columnIndex) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public double getDouble(final int columnIndex) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean getBoolean(final int columnIndex) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Binary getBinary(final int columnIndex) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getString(final int columnIndex) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object getObject(final int columnIndex) {
+      return 1;
+    }
+
+    @Override
+    public Type getDataType(final int columnIndex) {
+      return Type.INT32;
+    }
+
+    @Override
+    public boolean isNull(final int columnIndex) {
+      return false;
+    }
+
+    @Override
+    public int size() {
+      return 1;
+    }
+
+    @Override
+    public int getColumnIndex(final Path columnName) throws 
PipeParameterNotValidException {
+      return 0;
+    }
+
+    @Override
+    public String getColumnName(final int columnIndex) {
+      return "s1";
+    }
+
+    @Override
+    public List<Type> getColumnTypes() {
+      return Collections.singletonList(Type.INT32);
+    }
+
+    @Override
+    public String getDeviceId() {
+      return "root.db.d1";
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java
new file mode 100644
index 00000000000..e1b53e0dc6c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.tsfile;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+public class PipeTsFileResourceSegmentLockTest {
+
+  @Test
+  public void minValueHashCodeTest() throws InterruptedException {
+    int originalSegmentLockNum =
+        
CommonDescriptor.getInstance().getConfig().getPipeTsFileResourceSegmentLockNum();
+    
CommonDescriptor.getInstance().getConfig().setPipeTsFileResourceSegmentLockNum(32);
+
+    try {
+      PipeTsFileResourceSegmentLock segmentLock = new 
PipeTsFileResourceSegmentLock();
+      File file = new 
MinValueHashCodeFile("target/min-value-hash-code.tsfile");
+
+      Assert.assertEquals(Integer.MIN_VALUE, file.hashCode());
+
+      segmentLock.lock(file);
+      try {
+        Assert.assertTrue(segmentLock.tryLock(file, 1, TimeUnit.MILLISECONDS));
+        segmentLock.unlock(file);
+      } finally {
+        segmentLock.unlock(file);
+      }
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeTsFileResourceSegmentLockNum(originalSegmentLockNum);
+    }
+  }
+
+  private static class MinValueHashCodeFile extends File {
+
+    private static final long serialVersionUID = 1L;
+
+    private MinValueHashCodeFile(String pathname) {
+      super(pathname);
+    }
+
+    @Override
+    public int hashCode() {
+      return Integer.MIN_VALUE;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
index c81981eb161..108f5822448 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
@@ -271,6 +271,52 @@ public class AccumulatorTest {
     Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
   }
 
+  @Test
+  public void extremeAccumulatorMinValueTest() {
+    Accumulator intAccumulator =
+        AccumulatorFactory.createBuiltinAccumulator(
+            TAggregationType.EXTREME,
+            Collections.singletonList(TSDataType.INT32),
+            Collections.emptyList(),
+            Collections.emptyMap(),
+            true);
+    TsBlockBuilder intBlockBuilder =
+        new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    intBlockBuilder.getTimeColumnBuilder().writeLong(0);
+    intBlockBuilder.getValueColumnBuilders()[0].writeInt(Integer.MAX_VALUE);
+    intBlockBuilder.declarePosition();
+    intBlockBuilder.getTimeColumnBuilder().writeLong(1);
+    intBlockBuilder.getValueColumnBuilders()[0].writeInt(Integer.MIN_VALUE);
+    intBlockBuilder.declarePosition();
+    TsBlock intBlock = intBlockBuilder.build();
+    intAccumulator.addInput(new Column[] {intBlock.getTimeColumn(), 
intBlock.getColumn(0)}, null);
+    ColumnBuilder intFinalResult = new IntColumnBuilder(null, 1);
+    intAccumulator.outputFinal(intFinalResult);
+    Assert.assertEquals(Integer.MIN_VALUE, intFinalResult.build().getInt(0));
+
+    Accumulator longAccumulator =
+        AccumulatorFactory.createBuiltinAccumulator(
+            TAggregationType.EXTREME,
+            Collections.singletonList(TSDataType.INT64),
+            Collections.emptyList(),
+            Collections.emptyMap(),
+            true);
+    TsBlockBuilder longBlockBuilder =
+        new TsBlockBuilder(Collections.singletonList(TSDataType.INT64));
+    longBlockBuilder.getTimeColumnBuilder().writeLong(0);
+    longBlockBuilder.getValueColumnBuilders()[0].writeLong(Long.MAX_VALUE);
+    longBlockBuilder.declarePosition();
+    longBlockBuilder.getTimeColumnBuilder().writeLong(1);
+    longBlockBuilder.getValueColumnBuilders()[0].writeLong(Long.MIN_VALUE);
+    longBlockBuilder.declarePosition();
+    TsBlock longBlock = longBlockBuilder.build();
+    longAccumulator.addInput(
+        new Column[] {longBlock.getTimeColumn(), longBlock.getColumn(0)}, 
null);
+    ColumnBuilder longFinalResult = new LongColumnBuilder(null, 1);
+    longAccumulator.outputFinal(longFinalResult);
+    Assert.assertEquals(Long.MIN_VALUE, longFinalResult.build().getLong(0));
+  }
+
   @Test
   public void firstValueAccumulatorTest() {
     Accumulator firstValueAccumulator =
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java
new file mode 100644
index 00000000000..a029441b4dc
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.aggregation.slidingwindow;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SlidingWindowAggregatorFactoryTest { // compareExtreme at int/long range boundaries
+
+  @Test
+  public void compareExtremeMinValueTest() { // NOTE(review): looks magnitude-ordered; confirm
+    Assert.assertTrue( // int: MIN_VALUE must rank above MAX_VALUE
+        SlidingWindowAggregatorFactory.compareExtreme(Integer.MIN_VALUE, 
Integer.MAX_VALUE) > 0);
+    Assert.assertTrue( // int: swapping the arguments must flip the sign
+        SlidingWindowAggregatorFactory.compareExtreme(Integer.MAX_VALUE, 
Integer.MIN_VALUE) < 0);
+    Assert.assertTrue(SlidingWindowAggregatorFactory.compareExtreme(1, -1) > 
0); // int: with equal magnitude the positive value ranks higher
+
+    Assert.assertTrue( // long: MIN_VALUE must rank above MAX_VALUE
+        SlidingWindowAggregatorFactory.compareExtreme(Long.MIN_VALUE, 
Long.MAX_VALUE) > 0);
+    Assert.assertTrue( // long: swapping the arguments must flip the sign
+        SlidingWindowAggregatorFactory.compareExtreme(Long.MAX_VALUE, 
Long.MIN_VALUE) < 0);
+    Assert.assertTrue(SlidingWindowAggregatorFactory.compareExtreme(1L, -1L) > 
0); // long: with equal magnitude the positive value ranks higher
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java
new file mode 100644
index 00000000000..ea534a76ebc
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.window;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class SessionWindowTest { // session-window checks with timestamps at the long extremes
+
+  @Test
+  public void satisfyHandlesOverflowedTimeDistance() {
+    SessionWindow window = new SessionWindow(1, true); // presumably (interval, ascending); verify
+    Column timeColumn = buildTimeColumn(Long.MIN_VALUE, Long.MAX_VALUE); // MAX - MIN overflows long
+    window.mergeOnePoint(new Column[] {timeColumn}, 0); // feed row 0 (Long.MIN_VALUE) to the window
+
+    Assert.assertFalse(window.satisfy(timeColumn, 1)); // row 1 (Long.MAX_VALUE) must be rejected
+  }
+
+  @Test
+  public void skipPointsOutOfCurWindowHandlesOverflowedTimeDistance() {
+    SessionWindowManager manager = new SessionWindowManager(false, 1, true); // args: TODO confirm
+    manager.initCurWindow();
+    Column previousTimeColumn = buildTimeColumn(Long.MIN_VALUE);
+    manager.getCurWindow().mergeOnePoint(new Column[] {previousTimeColumn}, 0);
+    manager.next(); // NOTE(review): presumably closes the current window; confirm
+
+    TsBlock nextBlock = buildTsBlock(Long.MAX_VALUE);
+    TsBlock skippedBlock = manager.skipPointsOutOfCurWindow(nextBlock);
+
+    Assert.assertEquals(1, skippedBlock.getPositionCount()); // the single row must be kept
+    Assert.assertEquals(Long.MAX_VALUE, 
skippedBlock.getTimeColumn().getLong(0)); // and it is the Long.MAX_VALUE row
+  }
+
+  private Column buildTimeColumn(long... timestamps) { // time column of buildTsBlock(timestamps)
+    return buildTsBlock(timestamps).getTimeColumn();
+  }
+
+  private TsBlock buildTsBlock(long... timestamps) { // one row per timestamp, in argument order
+    TsBlockBuilder builder = new 
TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); // one INT32 value column
+    for (long timestamp : timestamps) {
+      builder.getTimeColumnBuilder().writeLong(timestamp);
+      builder.getColumnBuilder(0).appendNull(); // value is always null; only the time is tested
+      builder.declarePosition();
+    }
+    return builder.build();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java
new file mode 100644
index 00000000000..7488f5ffa26
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.batch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SubscriptionPipeEventBatchSegmentLockTest { // segment selection for a negative id
+
+  @Test
+  public void negativeRegionIdTest() throws Exception { // lock(-1) must map to a valid segment
+    final SubscriptionPipeEventBatchSegmentLock segmentLock =
+        new SubscriptionPipeEventBatchSegmentLock();
+
+    segmentLock.lock(-1);
+    try {
+      final ReentrantLock[] locks = getLocks(segmentLock);
+      Assert.assertTrue(locks[locks.length - 1].isHeldByCurrentThread()); // -1 -> last segment
+    } finally {
+      segmentLock.unlock(-1); // release even if the assertion above fails
+    }
+  }
+
+  private ReentrantLock[] getLocks(final SubscriptionPipeEventBatchSegmentLock 
segmentLock)
+      throws Exception { // reads the "locks" field of segmentLock through reflection
+    final Field locksField = 
SubscriptionPipeEventBatchSegmentLock.class.getDeclaredField("locks");
+    locksField.setAccessible(true); // presumably the field is not public; verify in the class
+    return (ReentrantLock[]) locksField.get(segmentLock);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java
index 7977b436536..b031198be4f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java
@@ -145,7 +145,7 @@ public class AsyncAINodeInternalServiceClient extends 
IAINodeRPCService.AsyncCli
           new AsyncAINodeInternalServiceClient(
               thriftClientProperty,
               endPoint,
-              tManagers[clientCnt.incrementAndGet() % tManagers.length],
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), 
tManagers.length)],
               clientManager));
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
index 26aa3c80b0b..ac7772be3e1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
@@ -190,7 +190,7 @@ public class AsyncConfigNodeInternalServiceClient extends 
IConfigNodeRPCService.
           new AsyncConfigNodeInternalServiceClient(
               thriftClientProperty,
               endPoint,
-              tManagers[clientCnt.incrementAndGet() % tManagers.length],
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), 
tManagers.length)],
               clientManager));
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
index f6096eb44cd..06127464fb7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
@@ -174,7 +174,7 @@ public class AsyncDataNodeExternalServiceClient extends 
IDataNodeRPCService.Asyn
           new AsyncDataNodeExternalServiceClient(
               thriftClientProperty,
               endPoint,
-              tManagers[clientCnt.incrementAndGet() % tManagers.length],
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), 
tManagers.length)],
               clientManager));
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 5f6aa0a6bb4..3d1566d4350 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -202,7 +202,7 @@ public class AsyncDataNodeInternalServiceClient extends 
IDataNodeRPCService.Asyn
           new AsyncDataNodeInternalServiceClient(
               thriftClientProperty,
               endPoint,
-              tManagers[clientCnt.incrementAndGet() % tManagers.length],
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), 
tManagers.length)],
               clientManager));
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 2329281062d..072a5f89a62 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -162,7 +162,7 @@ public class AsyncDataNodeMPPDataExchangeServiceClient 
extends MPPDataExchangeSe
           new AsyncDataNodeMPPDataExchangeServiceClient(
               thriftClientProperty,
               endPoint,
-              tManagers[clientCnt.incrementAndGet() % tManagers.length],
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), 
tManagers.length)],
               clientManager));
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java
index f4b02eda0f6..cc8b6f6e1bf 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java
@@ -171,7 +171,7 @@ public class AsyncIoTConsensusV2ServiceClient extends 
IoTConsensusV2IService.Asy
           new AsyncIoTConsensusV2ServiceClient(
               thriftClientProperty,
               endPoint,
-              tManagers[clientCnt.incrementAndGet() % tManagers.length],
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), 
tManagers.length)],
               clientManager));
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 42bba91f1fb..7e01b08ef21 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -234,7 +234,7 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
           new AsyncPipeDataTransferServiceClient(
               thriftClientProperty,
               endPoint,
-              tManagers[clientCnt.incrementAndGet() % tManagers.length],
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), 
tManagers.length)],
               clientManager));
     }
 

Reply via email to