This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e45c251ecb9 Fix TsFileResource endTime error when insert empty tablet
and flush (#11404)
e45c251ecb9 is described below
commit e45c251ecb9d52e386ba55ed2e24ae7f9d70525a
Author: Haonan <[email protected]>
AuthorDate: Sat Oct 28 15:20:11 2023 +0800
Fix TsFileResource endTime error when insert empty tablet and flush (#11404)
---
.../db/storageengine/dataregion/DataRegion.java | 2 +-
.../dataregion/memtable/AbstractMemTable.java | 7 +-
.../dataregion/tsfile/TsFileResource.java | 4 ++
.../storageengine/dataregion/DataRegionTest.java | 82 ++++++++++++++++++++++
4 files changed, 93 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 8bce9f4f6ee..0eed0ceefcf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2053,7 +2053,7 @@ public class DataRegion implements IDataRegionForQuery {
closeQueryLock.writeLock().lock();
try {
tsFileProcessor.close();
- if (tsFileProcessor.isEmpty()) {
+ if (tsFileProcessor.isEmpty() ||
tsFileProcessor.getTsFileResource().isEmpty()) {
try {
fsFactory.deleteIfExists(tsFileProcessor.getTsFileResource().getTsFile());
tsFileManager.remove(tsFileProcessor.getTsFileResource(),
tsFileProcessor.isSequence());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index f3264c09738..71863d10997 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -660,7 +660,12 @@ public abstract class AbstractMemTable implements
IMemTable {
public Map<String, Long> getMaxTime() {
Map<String, Long> latestTimeForEachDevice = new HashMap<>();
for (Entry<IDeviceID, IWritableMemChunkGroup> entry :
memTableMap.entrySet()) {
- latestTimeForEachDevice.put(entry.getKey().toStringID(),
entry.getValue().getMaxTime());
+ // When insert null values in to IWritableMemChunkGroup, the maxTime
will not be updated.
+ // In this scenario, the maxTime will be Long.MIN_VALUE. We shouldn't
return this device.
+ long maxTime = entry.getValue().getMaxTime();
+ if (maxTime != Long.MIN_VALUE) {
+ latestTimeForEachDevice.put(entry.getKey().toStringID(), maxTime);
+ }
}
return latestTimeForEachDevice;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index cd8eb0b65f9..2875fabd178 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -1167,6 +1167,10 @@ public class TsFileResource {
return maxProgressIndex == null ? MinimumProgressIndex.INSTANCE :
maxProgressIndex;
}
+ public boolean isEmpty() {
+ return getDevices().isEmpty();
+ }
+
public String getDatabaseName() {
return file.getParentFile().getParentFile().getParentFile().getName();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 4e7b678298c..f898ab3b80e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -302,6 +303,87 @@ public class DataRegionTest {
}
}
+ @Test
+ public void testEmptyTabletWriteAndSyncClose()
+ throws QueryProcessException, IllegalPathException,
WriteProcessException {
+ String[] measurements = new String[2];
+ measurements[0] = "s0";
+ measurements[1] = "s1";
+ TSDataType[] dataTypes = new TSDataType[2];
+ dataTypes[0] = TSDataType.INT32;
+ dataTypes[1] = TSDataType.INT64;
+
+ MeasurementSchema[] measurementSchemas = new MeasurementSchema[2];
+ measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32,
TSEncoding.PLAIN);
+ measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64,
TSEncoding.PLAIN);
+
+ long[] times = new long[100];
+ Object[] columns = new Object[2];
+ columns[0] = new int[100];
+ columns[1] = new long[100];
+ BitMap[] bitMaps = new BitMap[2];
+ bitMaps[0] = new BitMap(100);
+ bitMaps[1] = new BitMap(100);
+
+ for (int r = 0; r < 100; r++) {
+ times[r] = r;
+ bitMaps[0].mark(r);
+ bitMaps[1].mark(r);
+ }
+
+ InsertTabletNode insertTabletNode1 =
+ new InsertTabletNode(
+ new QueryId("test_write").genPlanNodeId(),
+ new PartialPath("root.vehicle.d0"),
+ false,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ times,
+ bitMaps,
+ columns,
+ times.length);
+
+ dataRegion.insertTablet(insertTabletNode1);
+ dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+ for (int r = 50; r < 149; r++) {
+ times[r - 50] = r;
+ bitMaps[0].mark(r - 50);
+ bitMaps[1].mark(r - 50);
+ }
+
+ InsertTabletNode insertTabletNode2 =
+ new InsertTabletNode(
+ new QueryId("test_write").genPlanNodeId(),
+ new PartialPath("root.vehicle.d0"),
+ false,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ times,
+ bitMaps,
+ columns,
+ times.length);
+
+ dataRegion.insertTablet(insertTabletNode2);
+ dataRegion.asyncCloseAllWorkingTsFileProcessors();
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ QueryDataSource queryDataSource =
+ dataRegion.query(
+ Collections.singletonList(new PartialPath(deviceId,
measurementId)),
+ deviceId,
+ context,
+ null);
+
+ Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+ for (TsFileResource resource : queryDataSource.getSeqResources()) {
+ Assert.assertTrue(resource.isClosed());
+ }
+ }
+
@Test
public void testSeqAndUnSeqSyncClose()
throws WriteProcessException, QueryProcessException,
IllegalPathException {