This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 8110e69 [IOTDB-1126] [To rel/0.11]cherry pick IOTDB-1126 unseq tsfile
delete due to merge (#2577)
8110e69 is described below
commit 8110e69ccb4e152343ff89e2131915bb0b334dbd
Author: wangchao316 <[email protected]>
AuthorDate: Thu Jan 28 14:56:28 2021 +0800
[IOTDB-1126] [To rel/0.11]cherry pick IOTDB-1126 unseq tsfile delete due to
merge (#2577)
---
.../db/engine/merge/task/MergeMultiChunkTask.java | 1 +
.../iotdb/db/engine/merge/task/MergeTask.java | 5 +
.../engine/storagegroup/StorageGroupProcessor.java | 41 ++++++++
.../iotdb/db/engine/merge/MergeTaskTest.java | 28 ++++++
.../storagegroup/StorageGroupProcessorTest.java | 27 ++++++
.../db/integration/IoTDBRemovePartitionIT.java | 103 +++++++++++++++++++++
6 files changed, 205 insertions(+)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index b1b03d4..a57bcf7 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -288,6 +288,7 @@ public class MergeMultiChunkTask {
try {
futures.get(i).get();
} catch (InterruptedException e) {
+ logger.error("MergeChunkHeapTask interrupted", e);
Thread.currentThread().interrupt();
return false;
} catch (ExecutionException e) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 8490410..5ee9142 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -115,6 +115,11 @@ public class MergeTask implements Callable<Void> {
}
private void doMerge() throws IOException, MetadataException {
+ if (resource.getSeqFiles().isEmpty()) {
+ logger.info("{} no sequence file to merge into, so will abort task.",
taskName);
+ abort();
+ return;
+ }
if (logger.isInfoEnabled()) {
logger.info("{} starts to merge {} seqFiles, {} unseqFiles", taskName,
resource.getSeqFiles().size(), resource.getUnseqFiles().size());
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a67666b..9a5f07c 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1696,6 +1696,45 @@ public class StorageGroupProcessor {
/**
+ * <p>
+ * Overwrites the latest time of every device recorded for the given partition with
+ * {@code latestFlushTime} and copies that value into the per-partition flushed-time maps.
+ * The global per-device flushed time is replaced only when the new value is larger.
+ * </p>
+ * @param partitionId partition id
+ * @param latestFlushTime latest flush time to record for each device of the partition
+ * @return true if the partition had per-device latest times and they were updated,
+ *         false if no such record exists for the partition
+ */
+ private boolean updateLatestFlushTimeToPartition(long partitionId, long
latestFlushTime) {
+ // per-device latest times recorded for this partition; null when nothing was recorded
+ Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
+ .get(partitionId);
+
+ if (curPartitionDeviceLatestTime == null) {
+ logger.warn("Partition: {} does't have latest time for each device. "
+ + "No valid record is written into memtable. latest
flush time is: {}",
+ partitionId, latestFlushTime);
+ return false;
+ }
+
+ for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+ // overwrite the entry in latestTimeForEachDevice in place with the new flush time
+ entry.setValue(latestFlushTime);
+
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(partitionId, id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ newlyFlushedPartitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(partitionId, id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ if (globalLatestFlushedTimeForEachDevice
+ .getOrDefault(entry.getKey(), Long.MIN_VALUE) <
entry.getValue()) { // NOTE(review): strict less-than, so a Long.MIN_VALUE argument never changes the global map
+ globalLatestFlushedTimeForEachDevice.put(entry.getKey(),
entry.getValue());
+ }
+ }
+ return true;
+ }
+
+
+ /**
* used for upgrading
*/
public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long
partitionId,
@@ -2480,6 +2519,7 @@ public class StorageGroupProcessor {
if (filter.satisfy(storageGroupName, partitionId)) {
processor.syncClose();
iterator.remove();
+ updateLatestFlushTimeToPartition(partitionId, Long.MIN_VALUE);
logger.debug("{} is removed during deleting partitions",
processor.getTsFileResource().getTsFilePath());
}
@@ -2493,6 +2533,7 @@ public class StorageGroupProcessor {
if (filter.satisfy(storageGroupName, tsFileResource.getTimePartition()))
{
tsFileResource.remove();
iterator.remove();
+ updateLatestFlushTimeToPartition(tsFileResource.getTimePartition(),
Long.MIN_VALUE);
logger.debug("{} is removed during deleting partitions",
tsFileResource.getTsFilePath());
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index 4ecae39..ebbf8a9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -279,4 +279,32 @@ public class MergeTaskTest extends MergeTest {
assertEquals(70, count);
tsFilesReader.close();
}
+
+ @Test
+ public void testOnlyUnseqMerge() throws Exception { // runs a MergeTask whose seq-file list is empty
+ // merge input: one unseq file and no seq files
+ List<TsFileResource> testSeqResources = new ArrayList<>();
+ List<TsFileResource> testUnseqResource = unseqResources.subList(5, 6);
+ MergeTask mergeTask =
+ new MergeTask(new MergeResource(testSeqResources,
testUnseqResource), tempSGDir.getPath(),
+ (k, v, l) -> {
+ }, "test", false, 1, MERGE_TEST_SG);
+ mergeTask.call();
+
+ QueryContext context = new QueryContext();
+ PartialPath path = new PartialPath(
+ deviceIds[0] + TsFileConstant.PATH_SEPARATOR +
measurementSchemas[0].getMeasurementId());
+ List<TsFileResource> resources = new ArrayList<>();
+ resources.add(seqResources.get(2)); // NOTE(review): this seq file was not part of the merge input above
+ IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path,
measurementSchemas[0].getType(),
+ context, resources, new ArrayList<>(), null, null, true);
+ int count = 0; // NOTE(review): never incremented or asserted in this test
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) { // every value read must equal its timestamp
+ assertEquals(batchData.getTimeByIndex(i) + 0.0,
batchData.getDoubleByIndex(i), 0.001);
+ }
+ }
+ tsFilesReader.close();
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 6e0efbc..8cd8d1a 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -162,6 +162,33 @@ public class StorageGroupProcessorTest {
}
@Test
+ public void testInsertDataAndRemovePartitionAndInsert() // insert, remove all partitions, insert the same rows again
+ throws WriteProcessException, QueryProcessException,
IllegalPathException {
+ for (int j = 0; j < 10; j++) { // first round: timestamps 0..9, closing the working processors after each row
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ processor.insert(new InsertRowPlan(record));
+ processor.asyncCloseAllWorkingTsFileProcessors();
+ }
+ processor.syncCloseAllWorkingTsFileProcessors();
+
+ processor.removePartitions((storageGroupName, timePartitionId) -> true); // filter accepts every partition
+
+ for (int j = 0; j < 10; j++) { // second round: the same timestamps 0..9 as before the removal
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ processor.insert(new InsertRowPlan(record));
+ processor.asyncCloseAllWorkingTsFileProcessors();
+ }
+ processor.syncCloseAllWorkingTsFileProcessors();
+
+ QueryDataSource queryDataSource = processor
+ .query(new PartialPath(deviceId), measurementId, context,
+ null, null);
+ Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); // the query must report no unseq resources
+ }
+
+ @Test
public void testIoTDBTabletWriteAndSyncClose()
throws QueryProcessException, IllegalPathException {
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
index 860ff45..67d7375 100644
---
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
@@ -151,6 +151,109 @@ public class IoTDBRemovePartitionIT {
}
}
+ @Test
+ public void testRemoveOnePartitionAndInsertData() { // write a row, delete partition 0, write the same row again
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("set storage group to root.test");
+ statement.execute("insert into root.test.wf02.wt02(timestamp,status)
values(1,true)");
+ statement.execute("select * from root.test.wf02.wt02");
+ statement.execute("DELETE PARTITION root.test 0"); // issued without an explicit flush beforehand
+ statement.execute("select * from root.test.wf02.wt02");
+ statement.execute("insert into root.test.wf02.wt02(timestamp,status)
values(1,true)");
+ try (ResultSet resultSet = statement.executeQuery("select * from
root.test.wf02.wt02")) {
+ assertEquals(true, resultSet.next()); // the re-inserted row is visible before the flush
+ }
+ statement.execute("flush");
+ try (ResultSet resultSet = statement.executeQuery("select * from
root.test.wf02.wt02")) {
+ assertEquals(true, resultSet.next()); // and is still visible after the flush
+ }
+ } catch (Exception e) {
+ e.printStackTrace(); // NOTE(review): any Exception thrown in the try block is only printed, not rethrown
+ }
+ }
+
+ @Test
+ public void testRemovePartitionAndInsertUnSeqDataAndMerge() { // delete partition 0, insert timestamps 1 and 3, then merge
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("set storage group to root.test");
+ statement.execute("insert into root.test.wf02.wt02(timestamp,status)
values(2,true)");
+ statement.execute("select * from root.test.wf02.wt02");
+ statement.execute("DELETE PARTITION root.test 0");
+ statement.execute("select * from root.test.wf02.wt02");
+ statement.execute("insert into root.test.wf02.wt02(timestamp,status)
values(1,true)");
+ try (ResultSet resultSet = statement.executeQuery("select * from
root.test.wf02.wt02")) {
+ assertEquals(true, resultSet.next()); // the row inserted after the deletion is visible
+ }
+ statement.execute("insert into root.test.wf02.wt02(timestamp,status)
values(3,true)");
+ statement.execute("merge");
+ int count = 0;
+ try (ResultSet resultSet = statement.executeQuery("select * from
root.test.wf02.wt02")) {
+ while (resultSet.next()) {
+ count ++;
+ }
+ assertEquals(2, count); // the two rows inserted after the deletion (timestamps 1 and 3)
+ }
+ } catch (Exception e) {
+ e.printStackTrace(); // NOTE(review): any Exception thrown in the try block is only printed, not rethrown
+ }
+ }
+
+ @Test
+ public void testRemovePartitionAndInsertUnSeqDataAndUnSeqDataMerge() { // delete partition 0, insert timestamps 1 and 2, then merge
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("set storage group to root.test");
+ statement.execute("insert into root.test.wf02.wt02(timestamp,status)
values(2,true)");
+ statement.execute("select * from root.test.wf02.wt02");
+ statement.execute("DELETE PARTITION root.test 0");
+ statement.execute("select * from root.test.wf02.wt02");
+ statement.execute("insert into root.test.wf02.wt02(timestamp,status)
values(1,true)");
+ try (ResultSet resultSet = statement.executeQuery("select * from
root.test.wf02.wt02")) {
+ assertEquals(true, resultSet.next()); // the row inserted after the deletion is visible
+ }
+ statement.execute("insert into root.test.wf02.wt02(timestamp,status)
values(2,true)");
+ statement.execute("merge");
+ int count = 0;
+ try (ResultSet resultSet = statement.executeQuery("select * from
root.test.wf02.wt02")) {
+ while (resultSet.next()) {
+ count ++;
+ }
+ assertEquals(2, count); // the two rows inserted after the deletion (timestamps 1 and 2)
+ }
+ } catch (Exception e) {
+ e.printStackTrace(); // NOTE(review): any Exception thrown in the try block is only printed, not rethrown
+ }
+ }
+
+ @Test
+ public void testFlushAndRemoveOnePartitionAndInsertData() { // flush first, then delete partition 0 and write the same row again
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("set storage group to root.test");
+ statement.execute("insert into root.test.wf02.wt02(timestamp,status)
values(1,true)");
+ statement.execute("flush"); // unlike testRemoveOnePartitionAndInsertData, flush is issued before the deletion
+ statement.execute("DELETE PARTITION root.test 0");
+ statement.execute("select * from root.test.wf02.wt02");
+ statement.execute("insert into root.test.wf02.wt02(timestamp,status)
values(1,true)");
+ try (ResultSet resultSet = statement.executeQuery("select * from
root.test.wf02.wt02")) {
+ assertEquals(true, resultSet.next()); // the re-inserted row is visible before the second flush
+ }
+ statement.execute("flush");
+ int count = 0; // NOTE(review): declared but never used in this test
+ try (ResultSet resultSet = statement.executeQuery("select * from
root.test.wf02.wt02")) {
+ assertEquals(true, resultSet.next()); // and is still visible after the second flush
+ }
+ } catch (Exception e) {
+ e.printStackTrace(); // NOTE(review): any Exception thrown in the try block is only printed, not rethrown
+ }
+ }
+
private static void insertData() throws ClassNotFoundException {
List<String> sqls = new ArrayList<>(Arrays.asList(
"SET STORAGE GROUP TO root.test1",