This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cbc7ba56b19 Load: support load tsfile with data in partition whose id is negative (#12307)
cbc7ba56b19 is described below
commit cbc7ba56b197ff9d5dcee49d841568af974218b9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 9 23:29:28 2024 +0800
Load: support load tsfile with data in partition whose id is negative (#12307)
---
.../pipe/it/autocreate/IoTDBPipeClusterIT.java | 63 +++++++++++++++++++++
.../db/exception/PartitionViolationException.java | 6 +-
.../storageengine/dataregion/tsfile/TsFileID.java | 18 +++---
.../tsfile/timeindex/DeviceTimeIndex.java | 64 +++++++++++++---------
.../dataregion/tsfile/timeindex/FileTimeIndex.java | 22 +++-----
.../dataregion/tsfile/timeindex/ITimeIndex.java | 2 -
6 files changed, 123 insertions(+), 52 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index e0194f8c591..9113e3669ba 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -862,4 +862,67 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
Assert.assertEquals(pipeCount, showPipeResult.size());
}
}
+
+ @Test
+ public void testNegativeTimestamp() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values (0, 1)",
+ "insert into root.db.d1(time, s1) values (-1, 2)",
+ "insert into root.db.d1(time, s1) values (1960-01-02T10:00:00+08:00, 2)",
+ "flush"))) {
+ return;
+ }
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor", "iotdb-extractor");
+
+ processorAttributes.put("processor", "do-nothing-processor");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("3,"));
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values (-123, 3)",
+ "insert into root.db.d1(time, s1) values (now(), 3)",
+ "flush"))) {
+ return;
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("5,"));
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
index 75d1f954e02..8b4c8dfb8ab 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
@@ -22,6 +22,10 @@ package org.apache.iotdb.db.exception;
public class PartitionViolationException extends LoadFileException {
public PartitionViolationException(String file) {
- super(String.format("The data of file %s cross partitions", file));
+ super(String.format("The data of file %s crosses partitions", file));
+ }
+
+ public PartitionViolationException() {
+ super("The data of file crosses partitions");
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
index 9a315732074..7dcbbde8fb8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
@@ -19,16 +19,11 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static org.apache.iotdb.tsfile.utils.FilePathUtils.splitTsFilePath;
public class TsFileID {
- private static final Logger LOGGER = LoggerFactory.getLogger(TsFileID.class);
-
public final int regionId;
public final long timePartitionId;
public final long fileVersion;
@@ -66,12 +61,17 @@ public class TsFileID {
// ignore, load will get in here
}
}
-
this.regionId = tmpRegionId;
this.timePartitionId = tmpTimePartitionId;
- long[] arr = splitAndGetVersionArray(pathSegments[pathLength - 1]);
- this.fileVersion = arr[0];
- this.compactionVersion = arr[1];
+
+ long[] arr = null;
+ try {
+ arr = splitAndGetVersionArray(pathSegments[pathLength - 1]);
+ } catch (NumberFormatException e) {
+ // ignore, load will get in here
+ }
+ this.fileVersion = arr == null || arr.length != 2 ? -1 : arr[0];
+ this.compactionVersion = arr == null || arr.length != 2 ? -1 : arr[1];
}
/**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/DeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/DeviceTimeIndex.java
index 024d4154f1e..777d092fee3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/DeviceTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/DeviceTimeIndex.java
@@ -261,40 +261,52 @@ public class DeviceTimeIndex implements ITimeIndex {
}
}
- /** @return the time partition id, if spans multi time partitions, return -1. */
- private long getTimePartitionWithCheck() {
- long partitionId = SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
- for (int index : deviceToIndex.values()) {
- long p = TimePartitionUtils.getTimePartitionId(startTimes[index]);
- if (partitionId == SPANS_MULTI_TIME_PARTITIONS_FLAG_ID) {
- partitionId = p;
- } else {
- if (partitionId != p) {
- return SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
- }
- }
-
- p = TimePartitionUtils.getTimePartitionId(endTimes[index]);
- if (partitionId != p) {
- return SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
- }
- }
- return partitionId;
- }
-
@Override
public long getTimePartitionWithCheck(String tsFilePath) throws PartitionViolationException {
- long partitionId = getTimePartitionWithCheck();
- if (partitionId == SPANS_MULTI_TIME_PARTITIONS_FLAG_ID) {
+ try {
+ return getTimePartitionWithCheck();
+ } catch (PartitionViolationException e) {
throw new PartitionViolationException(tsFilePath);
}
- return partitionId;
}
@Override
public boolean isSpanMultiTimePartitions() {
- long partitionId = getTimePartitionWithCheck();
- return partitionId == SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
+ try {
+ getTimePartitionWithCheck();
+ return false;
+ } catch (PartitionViolationException e) {
+ return true;
+ }
+ }
+
+ private long getTimePartitionWithCheck() throws PartitionViolationException {
+ Long partitionId = null;
+
+ for (final int index : deviceToIndex.values()) {
+ final long startTimePartitionId = TimePartitionUtils.getTimePartitionId(startTimes[index]);
+ final long endTimePartitionId = TimePartitionUtils.getTimePartitionId(endTimes[index]);
+
+ if (startTimePartitionId != endTimePartitionId) {
+ throw new PartitionViolationException();
+ }
+
+ if (partitionId == null) {
+ partitionId = startTimePartitionId;
+ continue;
+ }
+
+ if (partitionId != startTimePartitionId) {
+ throw new PartitionViolationException();
+ }
+ }
+
+ // Just in case
+ if (partitionId == null) {
+ throw new PartitionViolationException();
+ }
+
+ return partitionId;
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
index d397dd384c2..65aa8153a2a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
@@ -142,28 +142,22 @@ public class FileTimeIndex implements ITimeIndex {
}
}
- private long getTimePartitionWithCheck() {
- long startPartitionId = TimePartitionUtils.getTimePartitionId(startTime);
- long endPartitionId = TimePartitionUtils.getTimePartitionId(endTime);
+ @Override
+ public long getTimePartitionWithCheck(String tsFilePath) throws PartitionViolationException {
+ final long startPartitionId = TimePartitionUtils.getTimePartitionId(startTime);
+ final long endPartitionId = TimePartitionUtils.getTimePartitionId(endTime);
+
if (startPartitionId == endPartitionId) {
return startPartitionId;
}
- return SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
- }
- @Override
- public long getTimePartitionWithCheck(String tsFilePath) throws PartitionViolationException {
- long partitionId = getTimePartitionWithCheck();
- if (partitionId == SPANS_MULTI_TIME_PARTITIONS_FLAG_ID) {
- throw new PartitionViolationException(tsFilePath);
- }
- return partitionId;
+ throw new PartitionViolationException(tsFilePath);
}
@Override
public boolean isSpanMultiTimePartitions() {
- long partitionId = getTimePartitionWithCheck();
- return partitionId == SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
+ return TimePartitionUtils.getTimePartitionId(startTime)
+ != TimePartitionUtils.getTimePartitionId(endTime);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
index 84169b1087b..0edbe81745d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
@@ -37,8 +37,6 @@ public interface ITimeIndex {
byte DEVICE_TIME_INDEX_TYPE = 1;
byte FILE_TIME_INDEX_TYPE = 2;
- int SPANS_MULTI_TIME_PARTITIONS_FLAG_ID = -1;
-
/**
* serialize to outputStream
*