This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cbc7ba56b19 Load: support load tsfile with data in partition whose id 
is negative (#12307)
cbc7ba56b19 is described below

commit cbc7ba56b197ff9d5dcee49d841568af974218b9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 9 23:29:28 2024 +0800

    Load: support load tsfile with data in partition whose id is negative 
(#12307)
---
 .../pipe/it/autocreate/IoTDBPipeClusterIT.java     | 63 +++++++++++++++++++++
 .../db/exception/PartitionViolationException.java  |  6 +-
 .../storageengine/dataregion/tsfile/TsFileID.java  | 18 +++---
 .../tsfile/timeindex/DeviceTimeIndex.java          | 64 +++++++++++++---------
 .../dataregion/tsfile/timeindex/FileTimeIndex.java | 22 +++-----
 .../dataregion/tsfile/timeindex/ITimeIndex.java    |  2 -
 6 files changed, 123 insertions(+), 52 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index e0194f8c591..9113e3669ba 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -862,4 +862,67 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualAutoIT {
       Assert.assertEquals(pipeCount, showPipeResult.size());
     }
   }
+
+  @Test
+  public void testNegativeTimestamp() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1(time, s1) values (0, 1)",
+              "insert into root.db.d1(time, s1) values (-1, 2)",
+              "insert into root.db.d1(time, s1) values 
(1960-01-02T10:00:00+08:00, 2)",
+              "flush"))) {
+        return;
+      }
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor", "iotdb-extractor");
+
+      processorAttributes.put("processor", "do-nothing-processor");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("3,"));
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1(time, s1) values (-123, 3)",
+              "insert into root.db.d1(time, s1) values (now(), 3)",
+              "flush"))) {
+        return;
+      }
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("5,"));
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
index 75d1f954e02..8b4c8dfb8ab 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
@@ -22,6 +22,10 @@ package org.apache.iotdb.db.exception;
 public class PartitionViolationException extends LoadFileException {
 
   public PartitionViolationException(String file) {
-    super(String.format("The data of file %s cross partitions", file));
+    super(String.format("The data of file %s crosses partitions", file));
+  }
+
+  public PartitionViolationException() {
+    super("The data of file crosses partitions");
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
index 9a315732074..7dcbbde8fb8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
@@ -19,16 +19,11 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
 import static org.apache.iotdb.tsfile.utils.FilePathUtils.splitTsFilePath;
 
 public class TsFileID {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(TsFileID.class);
-
   public final int regionId;
   public final long timePartitionId;
   public final long fileVersion;
@@ -66,12 +61,17 @@ public class TsFileID {
         // ignore, load will get in here
       }
     }
-
     this.regionId = tmpRegionId;
     this.timePartitionId = tmpTimePartitionId;
-    long[] arr = splitAndGetVersionArray(pathSegments[pathLength - 1]);
-    this.fileVersion = arr[0];
-    this.compactionVersion = arr[1];
+
+    long[] arr = null;
+    try {
+      arr = splitAndGetVersionArray(pathSegments[pathLength - 1]);
+    } catch (NumberFormatException e) {
+      // ignore, load will get in here
+    }
+    this.fileVersion = arr == null || arr.length != 2 ? -1 : arr[0];
+    this.compactionVersion = arr == null || arr.length != 2 ? -1 : arr[1];
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/DeviceTimeIndex.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/DeviceTimeIndex.java
index 024d4154f1e..777d092fee3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/DeviceTimeIndex.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/DeviceTimeIndex.java
@@ -261,40 +261,52 @@ public class DeviceTimeIndex implements ITimeIndex {
     }
   }
 
-  /** @return the time partition id, if spans multi time partitions, return 
-1. */
-  private long getTimePartitionWithCheck() {
-    long partitionId = SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
-    for (int index : deviceToIndex.values()) {
-      long p = TimePartitionUtils.getTimePartitionId(startTimes[index]);
-      if (partitionId == SPANS_MULTI_TIME_PARTITIONS_FLAG_ID) {
-        partitionId = p;
-      } else {
-        if (partitionId != p) {
-          return SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
-        }
-      }
-
-      p = TimePartitionUtils.getTimePartitionId(endTimes[index]);
-      if (partitionId != p) {
-        return SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
-      }
-    }
-    return partitionId;
-  }
-
   @Override
   public long getTimePartitionWithCheck(String tsFilePath) throws 
PartitionViolationException {
-    long partitionId = getTimePartitionWithCheck();
-    if (partitionId == SPANS_MULTI_TIME_PARTITIONS_FLAG_ID) {
+    try {
+      return getTimePartitionWithCheck();
+    } catch (PartitionViolationException e) {
       throw new PartitionViolationException(tsFilePath);
     }
-    return partitionId;
   }
 
   @Override
   public boolean isSpanMultiTimePartitions() {
-    long partitionId = getTimePartitionWithCheck();
-    return partitionId == SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
+    try {
+      getTimePartitionWithCheck();
+      return false;
+    } catch (PartitionViolationException e) {
+      return true;
+    }
+  }
+
+  private long getTimePartitionWithCheck() throws PartitionViolationException {
+    Long partitionId = null;
+
+    for (final int index : deviceToIndex.values()) {
+      final long startTimePartitionId = 
TimePartitionUtils.getTimePartitionId(startTimes[index]);
+      final long endTimePartitionId = 
TimePartitionUtils.getTimePartitionId(endTimes[index]);
+
+      if (startTimePartitionId != endTimePartitionId) {
+        throw new PartitionViolationException();
+      }
+
+      if (partitionId == null) {
+        partitionId = startTimePartitionId;
+        continue;
+      }
+
+      if (partitionId != startTimePartitionId) {
+        throw new PartitionViolationException();
+      }
+    }
+
+    // Just in case
+    if (partitionId == null) {
+      throw new PartitionViolationException();
+    }
+
+    return partitionId;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
index d397dd384c2..65aa8153a2a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
@@ -142,28 +142,22 @@ public class FileTimeIndex implements ITimeIndex {
     }
   }
 
-  private long getTimePartitionWithCheck() {
-    long startPartitionId = TimePartitionUtils.getTimePartitionId(startTime);
-    long endPartitionId = TimePartitionUtils.getTimePartitionId(endTime);
+  @Override
+  public long getTimePartitionWithCheck(String tsFilePath) throws 
PartitionViolationException {
+    final long startPartitionId = 
TimePartitionUtils.getTimePartitionId(startTime);
+    final long endPartitionId = TimePartitionUtils.getTimePartitionId(endTime);
+
     if (startPartitionId == endPartitionId) {
       return startPartitionId;
     }
-    return SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
-  }
 
-  @Override
-  public long getTimePartitionWithCheck(String tsFilePath) throws 
PartitionViolationException {
-    long partitionId = getTimePartitionWithCheck();
-    if (partitionId == SPANS_MULTI_TIME_PARTITIONS_FLAG_ID) {
-      throw new PartitionViolationException(tsFilePath);
-    }
-    return partitionId;
+    throw new PartitionViolationException(tsFilePath);
   }
 
   @Override
   public boolean isSpanMultiTimePartitions() {
-    long partitionId = getTimePartitionWithCheck();
-    return partitionId == SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
+    return TimePartitionUtils.getTimePartitionId(startTime)
+        != TimePartitionUtils.getTimePartitionId(endTime);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
index 84169b1087b..0edbe81745d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
@@ -37,8 +37,6 @@ public interface ITimeIndex {
   byte DEVICE_TIME_INDEX_TYPE = 1;
   byte FILE_TIME_INDEX_TYPE = 2;
 
-  int SPANS_MULTI_TIME_PARTITIONS_FLAG_ID = -1;
-
   /**
    * serialize to outputStream
    *

Reply via email to