This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 8110e69  [IOTDB-1126] [To rel/0.11]cherry pick IOTDB-1126 unseq tsfile delete due to merge (#2577)
8110e69 is described below

commit 8110e69ccb4e152343ff89e2131915bb0b334dbd
Author: wangchao316 <[email protected]>
AuthorDate: Thu Jan 28 14:56:28 2021 +0800

    [IOTDB-1126] [To rel/0.11]cherry pick IOTDB-1126 unseq tsfile delete due to merge (#2577)
---
 .../db/engine/merge/task/MergeMultiChunkTask.java  |   1 +
 .../iotdb/db/engine/merge/task/MergeTask.java      |   5 +
 .../engine/storagegroup/StorageGroupProcessor.java |  41 ++++++++
 .../iotdb/db/engine/merge/MergeTaskTest.java       |  28 ++++++
 .../storagegroup/StorageGroupProcessorTest.java    |  27 ++++++
 .../db/integration/IoTDBRemovePartitionIT.java     | 103 +++++++++++++++++++++
 6 files changed, 205 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index b1b03d4..a57bcf7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -288,6 +288,7 @@ public class MergeMultiChunkTask {
       try {
         futures.get(i).get();
       } catch (InterruptedException e) {
+        logger.error("MergeChunkHeapTask interrupted", e);
         Thread.currentThread().interrupt();
         return false;
       } catch (ExecutionException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 8490410..5ee9142 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -115,6 +115,11 @@ public class MergeTask implements Callable<Void> {
   }
 
   private void doMerge() throws IOException, MetadataException {
+    if (resource.getSeqFiles().isEmpty()) {
+      logger.info("{} no sequence file to merge into, so will abort task.", taskName);
+      abort();
+      return;
+    }
     if (logger.isInfoEnabled()) {
       logger.info("{} starts to merge {} seqFiles, {} unseqFiles", taskName,
           resource.getSeqFiles().size(), resource.getUnseqFiles().size());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a67666b..9a5f07c 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1696,6 +1696,45 @@ public class StorageGroupProcessor {
 
 
   /**
+   * <p>
+   *  update latest flush time for partition id
+   *  </>
+   * @param partitionId partition id
+   * @param latestFlushTime lastest flush time
+   * @return true if update latest flush time success
+   */
+  private boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
+    // update the largest timestamp in the last flushing memtable
+    Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
+            .get(partitionId);
+
+    if (curPartitionDeviceLatestTime == null) {
+      logger.warn("Partition: {} does't have latest time for each device. "
+                      + "No valid record is written into memtable.  latest flush time is: {}",
+              partitionId, latestFlushTime);
+      return false;
+    }
+
+    for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+      // set lastest flush time to latestTimeForEachDevice
+      entry.setValue(latestFlushTime);
+
+      partitionLatestFlushedTimeForEachDevice
+              .computeIfAbsent(partitionId, id -> new HashMap<>())
+              .put(entry.getKey(), entry.getValue());
+      newlyFlushedPartitionLatestFlushedTimeForEachDevice
+              .computeIfAbsent(partitionId, id -> new HashMap<>())
+              .put(entry.getKey(), entry.getValue());
+      if (globalLatestFlushedTimeForEachDevice
+              .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+        globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return true;
+  }
+
+
+  /**
    * used for upgrading
    */
   public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId,
@@ -2480,6 +2519,7 @@ public class StorageGroupProcessor {
       if (filter.satisfy(storageGroupName, partitionId)) {
         processor.syncClose();
         iterator.remove();
+        updateLatestFlushTimeToPartition(partitionId, Long.MIN_VALUE);
         logger.debug("{} is removed during deleting partitions",
             processor.getTsFileResource().getTsFilePath());
       }
@@ -2493,6 +2533,7 @@ public class StorageGroupProcessor {
       if (filter.satisfy(storageGroupName, tsFileResource.getTimePartition())) {
         tsFileResource.remove();
         iterator.remove();
+        updateLatestFlushTimeToPartition(tsFileResource.getTimePartition(), Long.MIN_VALUE);
         logger.debug("{} is removed during deleting partitions", tsFileResource.getTsFilePath());
       }
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index 4ecae39..ebbf8a9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -279,4 +279,32 @@ public class MergeTaskTest extends MergeTest {
     assertEquals(70, count);
     tsFilesReader.close();
   }
+
+  @Test
+  public void testOnlyUnseqMerge() throws Exception {
+    // unseq and no seq merge
+    List<TsFileResource> testSeqResources = new ArrayList<>();
+    List<TsFileResource> testUnseqResource = unseqResources.subList(5, 6);
+    MergeTask mergeTask =
+            new MergeTask(new MergeResource(testSeqResources, testUnseqResource), tempSGDir.getPath(),
+                    (k, v, l) -> {
+                    }, "test", false, 1, MERGE_TEST_SG);
+    mergeTask.call();
+
+    QueryContext context = new QueryContext();
+    PartialPath path = new PartialPath(
+            deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+    List<TsFileResource> resources = new ArrayList<>();
+    resources.add(seqResources.get(2));
+    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+            context, resources, new ArrayList<>(), null, null, true);
+    int count = 0;
+    while (tsFilesReader.hasNextBatch()) {
+      BatchData batchData = tsFilesReader.nextBatch();
+      for (int i = 0; i < batchData.length(); i++) {
+        assertEquals(batchData.getTimeByIndex(i) + 0.0, batchData.getDoubleByIndex(i), 0.001);
+      }
+    }
+    tsFilesReader.close();
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 6e0efbc..8cd8d1a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -162,6 +162,33 @@ public class StorageGroupProcessorTest {
   }
 
   @Test
+  public void testInsertDataAndRemovePartitionAndInsert()
+          throws WriteProcessException, QueryProcessException, IllegalPathException {
+    for (int j = 0; j < 10; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(new InsertRowPlan(record));
+      processor.asyncCloseAllWorkingTsFileProcessors();
+    }
+    processor.syncCloseAllWorkingTsFileProcessors();
+
+    processor.removePartitions((storageGroupName, timePartitionId) -> true);
+
+    for (int j = 0; j < 10; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(new InsertRowPlan(record));
+      processor.asyncCloseAllWorkingTsFileProcessors();
+    }
+    processor.syncCloseAllWorkingTsFileProcessors();
+
+    QueryDataSource queryDataSource = processor
+            .query(new PartialPath(deviceId), measurementId, context,
+                    null, null);
+    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+  }
+
+  @Test
   public void testIoTDBTabletWriteAndSyncClose()
       throws QueryProcessException, IllegalPathException {
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
index 860ff45..67d7375 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
@@ -151,6 +151,109 @@ public class IoTDBRemovePartitionIT {
     }
   }
 
+  @Test
+  public void testRemoveOnePartitionAndInsertData() {
+    try (Connection connection = DriverManager
+            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+         Statement statement = connection.createStatement()) {
+      statement.execute("set storage group to root.test");
+      statement.execute("insert into root.test.wf02.wt02(timestamp,status) values(1,true)");
+      statement.execute("select * from root.test.wf02.wt02");
+      statement.execute("DELETE PARTITION root.test 0");
+      statement.execute("select * from root.test.wf02.wt02");
+      statement.execute("insert into root.test.wf02.wt02(timestamp,status) values(1,true)");
+      try (ResultSet resultSet = statement.executeQuery("select * from root.test.wf02.wt02")) {
+        assertEquals(true, resultSet.next());
+      }
+      statement.execute("flush");
+      try (ResultSet resultSet = statement.executeQuery("select * from root.test.wf02.wt02")) {
+        assertEquals(true, resultSet.next());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testRemovePartitionAndInsertUnSeqDataAndMerge() {
+    try (Connection connection = DriverManager
+            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+         Statement statement = connection.createStatement()) {
+      statement.execute("set storage group to root.test");
+      statement.execute("insert into root.test.wf02.wt02(timestamp,status) values(2,true)");
+      statement.execute("select * from root.test.wf02.wt02");
+      statement.execute("DELETE PARTITION root.test 0");
+      statement.execute("select * from root.test.wf02.wt02");
+      statement.execute("insert into root.test.wf02.wt02(timestamp,status) values(1,true)");
+      try (ResultSet resultSet = statement.executeQuery("select * from root.test.wf02.wt02")) {
+        assertEquals(true, resultSet.next());
+      }
+      statement.execute("insert into root.test.wf02.wt02(timestamp,status) values(3,true)");
+      statement.execute("merge");
+      int count = 0;
+      try (ResultSet resultSet = statement.executeQuery("select * from root.test.wf02.wt02")) {
+        while (resultSet.next()) {
+          count ++;
+        }
+        assertEquals(2, count);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testRemovePartitionAndInsertUnSeqDataAndUnSeqDataMerge() {
+    try (Connection connection = DriverManager
+            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+         Statement statement = connection.createStatement()) {
+      statement.execute("set storage group to root.test");
+      statement.execute("insert into root.test.wf02.wt02(timestamp,status) values(2,true)");
+      statement.execute("select * from root.test.wf02.wt02");
+      statement.execute("DELETE PARTITION root.test 0");
+      statement.execute("select * from root.test.wf02.wt02");
+      statement.execute("insert into root.test.wf02.wt02(timestamp,status) values(1,true)");
+      try (ResultSet resultSet = statement.executeQuery("select * from root.test.wf02.wt02")) {
+        assertEquals(true, resultSet.next());
+      }
+      statement.execute("insert into root.test.wf02.wt02(timestamp,status) values(2,true)");
+      statement.execute("merge");
+      int count = 0;
+      try (ResultSet resultSet = statement.executeQuery("select * from root.test.wf02.wt02")) {
+        while (resultSet.next()) {
+          count ++;
+        }
+        assertEquals(2, count);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testFlushAndRemoveOnePartitionAndInsertData() {
+    try (Connection connection = DriverManager
+            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+         Statement statement = connection.createStatement()) {
+      statement.execute("set storage group to root.test");
+      statement.execute("insert into root.test.wf02.wt02(timestamp,status) values(1,true)");
+      statement.execute("flush");
+      statement.execute("DELETE PARTITION root.test 0");
+      statement.execute("select * from root.test.wf02.wt02");
+      statement.execute("insert into root.test.wf02.wt02(timestamp,status) values(1,true)");
+      try (ResultSet resultSet = statement.executeQuery("select * from root.test.wf02.wt02")) {
+        assertEquals(true, resultSet.next());
+      }
+      statement.execute("flush");
+      int count = 0;
+      try (ResultSet resultSet = statement.executeQuery("select * from root.test.wf02.wt02")) {
+        assertEquals(true, resultSet.next());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
   private static void insertData() throws ClassNotFoundException {
     List<String> sqls = new ArrayList<>(Arrays.asList(
         "SET STORAGE GROUP TO root.test1",

Reply via email to