This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch memtable_sort_in_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/memtable_sort_in_query by this 
push:
     new 27727b6  add test
27727b6 is described below

commit 27727b63619583dd90819c50f6139b47faa7d850
Author: 151250176 <[email protected]>
AuthorDate: Wed Nov 18 16:45:41 2020 +0800

    add test
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   2 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |   7 +-
 .../db/engine/memtable/IWritableMemChunk.java      |  14 +--
 .../iotdb/db/engine/memtable/WritableMemChunk.java |  24 +----
 .../db/engine/memtable/PrimitiveMemTableTest.java  |   2 +-
 .../db/integration/IoTDBInsertWithQueryIT.java     | 117 +++++++++++++++++++--
 6 files changed, 125 insertions(+), 41 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 98a79ce..e9f4c92 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -89,7 +89,7 @@ public class MemTableFlushTask {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = 
memTable.getMemTableMap().get(deviceId).get(measurementId);
         MeasurementSchema desc = series.getSchema();
-        TVList tvList = series.getSortedTVListForFlush();
+        TVList tvList = series.getSortedTVList();
         sortTime += System.currentTimeMillis() - startTime;
         encodingTaskQueue.add(new Pair<>(tvList, desc));
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 026b343..b94c2e1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -252,7 +252,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     // when next query come, it will find the data has been sorted and get 
reference of the data
     synchronized (memTableMap) {
       IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
-      chunkCopy = memChunk.getSortedTVListForQuery();
+      chunkCopy = memChunk.getSortedTVList();
       chunkCopy.increaseReferenceCount();
       curSize = chunkCopy.size();
     }
@@ -333,7 +333,10 @@ public abstract class AbstractMemTable implements 
IMemTable {
   public void release() {
     for (Entry<String, Map<String, IWritableMemChunk>> entry : 
memTableMap.entrySet()) {
       for (Entry<String, IWritableMemChunk> subEntry : 
entry.getValue().entrySet()) {
-        TVListAllocator.getInstance().release(subEntry.getValue().getTVList());
+        TVList list = subEntry.getValue().getTVList();
+        if (list.getReferenceCount() == 0) {
+          TVListAllocator.getInstance().release(list);
+        }
       }
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index f733864..6051668 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -62,7 +62,7 @@ public interface IWritableMemChunk {
   MeasurementSchema getSchema();
 
   /**
-   * served for query requests.
+   * served for query and flush requests.
    * <p>
    * if tv list has been sorted, just return reference of it
    * <p>
@@ -75,18 +75,8 @@ public interface IWritableMemChunk {
    *
    * @return sorted tv list
    */
-  TVList getSortedTVListForQuery();
+  TVList getSortedTVList();
 
-  /**
-   * served for flush requests.
-   * <p>
-   * if tv list has reference, copy it. Then sort it
-   * <p>
-   * the mechanism is just like copy on write
-   *
-   * @return sorted tv list
-   */
-  TVList getSortedTVListForFlush();
 
   default TVList getTVList() {
     return null;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 7bc76ca..929ec92 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -153,23 +153,9 @@ public class WritableMemChunk implements IWritableMemChunk 
{
   }
 
   @Override
-  public synchronized TVList getSortedTVListForQuery() {
+  public TVList getSortedTVList() {
     // check reference count
-    if (list.getReferenceCount() > 0 && !list.isSorted()) {
-      list = list.clone();
-    }
-
-    if (!list.isSorted()) {
-      list.sort();
-    }
-
-    return list;
-  }
-
-  @Override
-  public TVList getSortedTVListForFlush() {
-    // check reference count
-    if (list.getReferenceCount() > 0) {
+    if ((list.getReferenceCount() > 0 && !list.isSorted())) {
       list = list.clone();
     }
 
@@ -207,13 +193,13 @@ public class WritableMemChunk implements 
IWritableMemChunk {
 
   @Override
   public String toString() {
-    int size = getSortedTVListForQuery().size();
+    int size = getSortedTVList().size();
     StringBuilder out = new StringBuilder("MemChunk Size: " + size + 
System.lineSeparator());
     if (size != 0) {
       out.append("Data 
type:").append(schema.getType()).append(System.lineSeparator());
-      out.append("First 
point:").append(getSortedTVListForQuery().getTimeValuePair(0))
+      out.append("First point:").append(getSortedTVList().getTimeValuePair(0))
           .append(System.lineSeparator());
-      out.append("Last 
point:").append(getSortedTVListForQuery().getTimeValuePair(size - 1))
+      out.append("Last point:").append(getSortedTVList().getTimeValuePair(size 
- 1))
           .append(System.lineSeparator());
       ;
     }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index b967fe1..94c6378 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -58,7 +58,7 @@ public class PrimitiveMemTableTest {
     for (int i = 0; i < count; i++) {
       series.write(i, i);
     }
-    IPointReader it = series.getSortedTVListForQuery().getIterator();
+    IPointReader it = series.getSortedTVList().getIterator();
     int i = 0;
     while (it.hasNextTimeValuePair()) {
       Assert.assertEquals(i, it.nextTimeValuePair().getTimestamp());
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBInsertWithQueryIT.java
 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBInsertWithQueryIT.java
index d318260..a5a044d 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBInsertWithQueryIT.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBInsertWithQueryIT.java
@@ -87,7 +87,7 @@ public class IoTDBInsertWithQueryIT {
   }
 
   @Test
-  public void insertWithQueryTestUnsequence() throws ClassNotFoundException {
+  public void insertWithQueryUnsequenceTest() throws ClassNotFoundException {
     // insert
     insertData(0, 1000);
 
@@ -108,7 +108,7 @@ public class IoTDBInsertWithQueryIT {
   }
 
   @Test
-  public void insertWithQueryMultiThreadTestUnsequence()
+  public void insertWithQueryMultiThreadUnsequenceTest()
       throws ClassNotFoundException, InterruptedException {
     // insert
     insertData(0, 1000);
@@ -128,6 +128,98 @@ public class IoTDBInsertWithQueryIT {
     selectWithMultiThread(2500);
   }
 
+  @Test
+  public void insertWithQueryFlushTest() throws ClassNotFoundException {
+    // insert
+    insertData(0, 1000);
+
+    // select
+    selectAndCount(1000);
+
+    flush();
+
+    // insert
+    insertData(1000, 2000);
+
+    // select
+    selectAndCount(2000);
+  }
+
+  @Test
+  public void flushWithQueryTest() throws ClassNotFoundException, 
InterruptedException {
+    // insert
+    insertData(0, 1000);
+
+    // select with flush
+    selectWithMultiThreadAndFlush(1000);
+
+    // insert
+    insertData(500, 1500);
+
+    // select
+    selectWithMultiThreadAndFlush(1500);
+  }
+
+  @Test
+  public void flushWithQueryUnorderTest() throws ClassNotFoundException, 
InterruptedException {
+    // insert
+    insertData(0, 100);
+    insertData(500, 600);
+
+    // select
+    selectWithMultiThread(200);
+
+    insertData(200, 400);
+
+    selectWithMultiThreadAndFlush(400);
+
+    insertData(0, 1000);
+
+    selectWithMultiThread(1000);
+  }
+
+  private void selectWithMultiThreadAndFlush(int res) throws 
InterruptedException {
+    List<Thread> queryThreadList = new ArrayList<>();
+
+    // select with multi thread
+    for (int i = 0; i < 2; i++) {
+      Thread cur = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            selectAndCount(res);
+          } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+          }
+        }
+      });
+
+      if(i == 1){
+        Thread flushThread = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              flush();
+            } catch (ClassNotFoundException e) {
+              e.printStackTrace();
+            }
+          }
+        });
+
+        flushThread.start();
+        queryThreadList.add(flushThread);
+      }
+
+      queryThreadList.add(cur);
+      cur.start();
+    }
+
+    for (Thread thread : queryThreadList) {
+      thread.join();
+    }
+  }
+
+
   private void selectWithMultiThread(int res) throws InterruptedException {
     List<Thread> queryThreadList = new ArrayList<>();
 
@@ -172,6 +264,17 @@ public class IoTDBInsertWithQueryIT {
     }
   }
 
+  private void flush() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", 
"root");
+        Statement statement = connection.createStatement()) {
+      // send the flush statement to the server (no data is inserted here)
+      statement.execute("flush");
+    } catch (SQLException e) {
+      e.printStackTrace();
+    }
+  }
 
   // "select * from root.vehicle" : test select wild data
   private void selectAndCount(int res) throws ClassNotFoundException {
@@ -185,11 +288,13 @@ public class IoTDBInsertWithQueryIT {
       Assert.assertTrue(hasResultSet);
       try (ResultSet resultSet = statement.getResultSet()) {
         int cnt = 0;
+        long before = -1;
         while (resultSet.next()) {
-          String ans =
-              resultSet.getString(TestConstant.TIMESTAMP_STR) + "," + resultSet
-                  .getString("root.fans.d0.s0")
-                  + "," + resultSet.getString("root.fans.d0.s1");
+          long cur = 
Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR));
+          if(cur <= before){
+            fail("time order is wrong");
+          }
+          before = cur;
           cnt++;
         }
         assertEquals(res, cnt);

Reply via email to