This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_vector by this push:
new 6ffb37c add some interfaces in IWritableMemChunk for flushing
6ffb37c is described below
commit 6ffb37c147483ab99578fc5cc19bd761013150de
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Oct 28 11:20:07 2021 +0800
add some interfaces in IWritableMemChunk for flushing
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 48 ++++++++--------------
.../db/engine/memtable/IWritableMemChunk.java | 7 +++-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 9 +++-
3 files changed, 29 insertions(+), 35 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 7962da4..3c29135 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -18,6 +18,13 @@
*/
package org.apache.iotdb.db.engine.flush;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
@@ -28,24 +35,12 @@ import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.db.utils.datastructure.VectorTVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
/**
* flush task to flush one memtable using a pipeline model to flush, which is sort memtable ->
* encoding -> write to disk (io task)
@@ -118,13 +113,12 @@ public class MemTableFlushTask {
for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
long startTime = System.currentTimeMillis();
IWritableMemChunk series = iWritableMemChunkEntry.getValue();
- IMeasurementSchema desc = series.getSchema();
/*
* sort task (first task of flush pipeline)
*/
- TVList tvList = series.getSortedTvListForFlush();
+ series.sortTvListForFlush();
sortTime += System.currentTimeMillis() - startTime;
- encodingTaskQueue.put(new Pair<>(tvList, desc));
+ encodingTaskQueue.put(series);
}
encodingTaskQueue.put(new EndChunkGroupIoTask());
@@ -290,7 +284,7 @@ public class MemTableFlushTask {
writer.getFile().getName());
while (true) {
- Object task = null;
+ Object task;
try {
task = encodingTaskQueue.take();
} catch (InterruptedException e1) {
@@ -316,15 +310,9 @@ public class MemTableFlushTask {
break;
} else {
long starTime = System.currentTimeMillis();
- Pair<TVList, IMeasurementSchema> encodingMessage =
- (Pair<TVList, IMeasurementSchema>) task;
- IChunkWriter seriesWriter;
- if (encodingMessage.left.getDataType() == TSDataType.VECTOR) {
- seriesWriter = new VectorChunkWriterImpl(encodingMessage.right);
- } else {
- seriesWriter = new ChunkWriterImpl(encodingMessage.right);
- }
- writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+ IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
+ IChunkWriter seriesWriter = writableMemChunk.createIChunkWrite();
+ writableMemChunk.encode(seriesWriter);
seriesWriter.sealCurrentPage();
seriesWriter.clearPageWriter();
try {
@@ -374,16 +362,12 @@ public class MemTableFlushTask {
this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
} else if (ioMessage instanceof TaskEnd) {
break;
- } else if (ioMessage instanceof ChunkWriterImpl) {
- ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
- chunkWriter.writeToFileWriter(this.writer);
- } else if (ioMessage instanceof VectorChunkWriterImpl) {
- VectorChunkWriterImpl chunkWriter = (VectorChunkWriterImpl) ioMessage;
- chunkWriter.writeToFileWriter(this.writer);
- } else {
+ } else if (ioMessage instanceof EndChunkGroupIoTask) {
this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
this.writer.endChunkGroup();
+ } else {
+ ((IChunkWriter) ioMessage).writeToFileWriter(this.writer);
}
} catch (IOException e) {
LOGGER.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index b58f3b7..d6a3862 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import java.util.List;
@@ -101,7 +102,7 @@ public interface IWritableMemChunk {
*
* @return sorted tv list
*/
- TVList getSortedTvListForFlush();
+ void sortTvListForFlush();
default TVList getTVList() {
return null;
@@ -116,4 +117,8 @@ public interface IWritableMemChunk {
// For delete one column in the vector
int delete(long lowerBound, long upperBound, int columnIndex);
+
+ IChunkWriter createIChunkWrite();
+
+ void encode(IChunkWriter chunkWriter);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 09b30e8..f7d359e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import java.util.List;
@@ -205,9 +206,8 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
- public synchronized TVList getSortedTvListForFlush() {
+ public synchronized void sortTvListForFlush() {
sortTVList();
- return list;
}
@Override
@@ -254,6 +254,11 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
+ public IChunkWriter createIChunkWrite() {
+ return null;
+ }
+
+ @Override
public String toString() {
int size = list.size();
int firstIndex = 0;