This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch memtable_sort_in_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/memtable_sort_in_query by this
push:
new 27727b6 add test
27727b6 is described below
commit 27727b63619583dd90819c50f6139b47faa7d850
Author: 151250176 <[email protected]>
AuthorDate: Wed Nov 18 16:45:41 2020 +0800
add test
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 2 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 7 +-
.../db/engine/memtable/IWritableMemChunk.java | 14 +--
.../iotdb/db/engine/memtable/WritableMemChunk.java | 24 +----
.../db/engine/memtable/PrimitiveMemTableTest.java | 2 +-
.../db/integration/IoTDBInsertWithQueryIT.java | 117 +++++++++++++++++++--
6 files changed, 125 insertions(+), 41 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 98a79ce..e9f4c92 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -89,7 +89,7 @@ public class MemTableFlushTask {
long startTime = System.currentTimeMillis();
IWritableMemChunk series =
memTable.getMemTableMap().get(deviceId).get(measurementId);
MeasurementSchema desc = series.getSchema();
- TVList tvList = series.getSortedTVListForFlush();
+ TVList tvList = series.getSortedTVList();
sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.add(new Pair<>(tvList, desc));
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 026b343..b94c2e1 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -252,7 +252,7 @@ public abstract class AbstractMemTable implements IMemTable
{
// when next query come, it will find the data has been sorted and get
reference of the data
synchronized (memTableMap) {
IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
- chunkCopy = memChunk.getSortedTVListForQuery();
+ chunkCopy = memChunk.getSortedTVList();
chunkCopy.increaseReferenceCount();
curSize = chunkCopy.size();
}
@@ -333,7 +333,10 @@ public abstract class AbstractMemTable implements
IMemTable {
public void release() {
for (Entry<String, Map<String, IWritableMemChunk>> entry :
memTableMap.entrySet()) {
for (Entry<String, IWritableMemChunk> subEntry :
entry.getValue().entrySet()) {
- TVListAllocator.getInstance().release(subEntry.getValue().getTVList());
+ TVList list = subEntry.getValue().getTVList();
+ if (list.getReferenceCount() == 0) {
+ TVListAllocator.getInstance().release(list);
+ }
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index f733864..6051668 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -62,7 +62,7 @@ public interface IWritableMemChunk {
MeasurementSchema getSchema();
/**
- * served for query requests.
+ * served for query and flush requests.
* <p>
* if tv list has been sorted, just return reference of it
* <p>
@@ -75,18 +75,8 @@ public interface IWritableMemChunk {
*
* @return sorted tv list
*/
- TVList getSortedTVListForQuery();
+ TVList getSortedTVList();
- /**
- * served for flush requests.
- * <p>
- * if tv list has reference, copy it. Then sort it
- * <p>
- * the mechanism is just like copy on write
- *
- * @return sorted tv list
- */
- TVList getSortedTVListForFlush();
default TVList getTVList() {
return null;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 7bc76ca..929ec92 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -153,23 +153,9 @@ public class WritableMemChunk implements IWritableMemChunk
{
}
@Override
- public synchronized TVList getSortedTVListForQuery() {
+ public TVList getSortedTVList() {
// check reference count
- if (list.getReferenceCount() > 0 && !list.isSorted()) {
- list = list.clone();
- }
-
- if (!list.isSorted()) {
- list.sort();
- }
-
- return list;
- }
-
- @Override
- public TVList getSortedTVListForFlush() {
- // check reference count
- if (list.getReferenceCount() > 0) {
+ if ((list.getReferenceCount() > 0 && !list.isSorted())) {
list = list.clone();
}
@@ -207,13 +193,13 @@ public class WritableMemChunk implements
IWritableMemChunk {
@Override
public String toString() {
- int size = getSortedTVListForQuery().size();
+ int size = getSortedTVList().size();
StringBuilder out = new StringBuilder("MemChunk Size: " + size +
System.lineSeparator());
if (size != 0) {
out.append("Data
type:").append(schema.getType()).append(System.lineSeparator());
- out.append("First
point:").append(getSortedTVListForQuery().getTimeValuePair(0))
+ out.append("First point:").append(getSortedTVList().getTimeValuePair(0))
.append(System.lineSeparator());
- out.append("Last
point:").append(getSortedTVListForQuery().getTimeValuePair(size - 1))
+ out.append("Last point:").append(getSortedTVList().getTimeValuePair(size
- 1))
.append(System.lineSeparator());
;
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index b967fe1..94c6378 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -58,7 +58,7 @@ public class PrimitiveMemTableTest {
for (int i = 0; i < count; i++) {
series.write(i, i);
}
- IPointReader it = series.getSortedTVListForQuery().getIterator();
+ IPointReader it = series.getSortedTVList().getIterator();
int i = 0;
while (it.hasNextTimeValuePair()) {
Assert.assertEquals(i, it.nextTimeValuePair().getTimestamp());
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBInsertWithQueryIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBInsertWithQueryIT.java
index d318260..a5a044d 100644
---
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBInsertWithQueryIT.java
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBInsertWithQueryIT.java
@@ -87,7 +87,7 @@ public class IoTDBInsertWithQueryIT {
}
@Test
- public void insertWithQueryTestUnsequence() throws ClassNotFoundException {
+ public void insertWithQueryUnsequenceTest() throws ClassNotFoundException {
// insert
insertData(0, 1000);
@@ -108,7 +108,7 @@ public class IoTDBInsertWithQueryIT {
}
@Test
- public void insertWithQueryMultiThreadTestUnsequence()
+ public void insertWithQueryMultiThreadUnsequenceTest()
throws ClassNotFoundException, InterruptedException {
// insert
insertData(0, 1000);
@@ -128,6 +128,98 @@ public class IoTDBInsertWithQueryIT {
selectWithMultiThread(2500);
}
+ @Test
+ public void insertWithQueryFlushTest() throws ClassNotFoundException {
+ // insert
+ insertData(0, 1000);
+
+ // select
+ selectAndCount(1000);
+
+ flush();
+
+ // insert
+ insertData(1000, 2000);
+
+ // select
+ selectAndCount(2000);
+ }
+
+ @Test
+ public void flushWithQueryTest() throws ClassNotFoundException,
InterruptedException {
+ // insert
+ insertData(0, 1000);
+
+ // select with flush
+ selectWithMultiThreadAndFlush(1000);
+
+ // insert
+ insertData(500, 1500);
+
+ // select
+ selectWithMultiThreadAndFlush(1500);
+ }
+
+ @Test
+ public void flushWithQueryUnorderTest() throws ClassNotFoundException,
InterruptedException {
+ // insert
+ insertData(0, 100);
+ insertData(500, 600);
+
+ // select
+ selectWithMultiThread(200);
+
+ insertData(200, 400);
+
+ selectWithMultiThreadAndFlush(400);
+
+ insertData(0, 1000);
+
+ selectWithMultiThread(1000);
+ }
+
+ private void selectWithMultiThreadAndFlush(int res) throws
InterruptedException {
+ List<Thread> queryThreadList = new ArrayList<>();
+
+ // select with multi thread
+ for (int i = 0; i < 2; i++) {
+ Thread cur = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ selectAndCount(res);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ if(i == 1){
+ Thread flushThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ flush();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ flushThread.start();
+ queryThreadList.add(flushThread);
+ }
+
+ queryThreadList.add(cur);
+ cur.start();
+ }
+
+ for (Thread thread : queryThreadList) {
+ thread.join();
+ }
+ }
+
+
private void selectWithMultiThread(int res) throws InterruptedException {
List<Thread> queryThreadList = new ArrayList<>();
@@ -172,6 +264,17 @@ public class IoTDBInsertWithQueryIT {
}
}
+ private void flush() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
+ Statement statement = connection.createStatement()) {
+ // insert of data time range : start-end into fans
+ statement.execute("flush");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
// "select * from root.vehicle" : test select wild data
private void selectAndCount(int res) throws ClassNotFoundException {
@@ -185,11 +288,13 @@ public class IoTDBInsertWithQueryIT {
Assert.assertTrue(hasResultSet);
try (ResultSet resultSet = statement.getResultSet()) {
int cnt = 0;
+ long before = -1;
while (resultSet.next()) {
- String ans =
- resultSet.getString(TestConstant.TIMESTAMP_STR) + "," + resultSet
- .getString("root.fans.d0.s0")
- + "," + resultSet.getString("root.fans.d0.s1");
+ long cur =
Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR));
+ if(cur <= before){
+ fail("time order is wrong");
+ }
+ before = cur;
cnt++;
}
assertEquals(res, cnt);