This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch two_stage_pipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/two_stage_pipeline by this push:
new a6a7bc7 find reason
a6a7bc7 is described below
commit a6a7bc7b6bfc1df19b389563a7a45a0c4e168022
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Jan 28 19:27:23 2021 +0800
find reason
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 326 +++++++++++++++++----
.../writelog/recover/TsFileRecoverPerformer.java | 3 +-
.../org/apache/iotdb/tsfile/utils/PublicBAOS.java | 5 +
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 32 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 3 +-
6 files changed, 304 insertions(+), 77 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d0918f1..e35fe72 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -128,17 +128,17 @@ public class IoTDBConfig {
/**
* Memory allocated for the write process
*/
- private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 4 /
10;
+ private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 7 /
10;
/**
* Memory allocated for the read process
*/
- private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 /
10;
+ private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() / 10;
/**
* Memory allocated for the mtree
*/
- private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() * 1
/ 10;
+ private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10;
/**
* Memory allocated for the read process besides cache
@@ -298,7 +298,7 @@ public class IoTDBConfig {
/**
* Is the write mem control for writing enable.
*/
- private boolean enableMemControl = true;
+ private boolean enableMemControl = false;
/**
* Is the write ahead log enable.
@@ -340,12 +340,12 @@ public class IoTDBConfig {
/**
* When a memTable's size (in byte) exceeds this, the memtable is flushed to
disk.
*/
- private long memtableSizeThreshold = 1024 * 1024 * 1024L;
+ private long memtableSizeThreshold = 5 * 1024 * 1024 * 1024L;
/**
* When average series point number reaches this, flush the memtable to disk
*/
- private int avgSeriesPointNumberThreshold = 100000;
+ private int avgSeriesPointNumberThreshold = 1000000000;
/**
* Work when tsfile_manage_strategy is level_strategy. When merge point
number reaches this, merge
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index eaf2fc7..0328eab 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -20,10 +20,17 @@ package org.apache.iotdb.db.engine.flush;
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -34,23 +41,35 @@ import org.slf4j.LoggerFactory;
public class MemTableFlushTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(MemTableFlushTask.class);
- private final RestorableTsFileIOWriter writer;
+ private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER =
FlushSubTaskPoolManager
+ .getInstance();
+ private final Future<?> encodingTaskFuture;
+ private final Future<?> ioTaskFuture;
+ private RestorableTsFileIOWriter writer;
- private final String storageGroup;
+ private final ConcurrentLinkedQueue<Object> ioTaskQueue = new
ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new
ConcurrentLinkedQueue<>();
+ private String storageGroup;
- private final IMemTable memTable;
+ private IMemTable memTable;
+ private volatile boolean noMoreEncodingTask = false;
+ private volatile boolean noMoreIOTask = false;
/**
- * @param memTable the memTable to flush
- * @param writer the writer where memTable will be flushed to (current
tsfile writer or vm writer)
+ * @param memTable the memTable to flush
+ * @param writer the writer where memTable will be flushed to (current
tsfile writer or vm
+ * writer)
* @param storageGroup current storage group
*/
- public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter
writer, String storageGroup) {
+ public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer,
+ String storageGroup) {
this.memTable = memTable;
this.writer = writer;
this.storageGroup = storageGroup;
+ this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
+ this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
LOGGER.debug("flush task of Storage group {} memtable {} is created ",
storageGroup, memTable.getVersion());
}
@@ -59,97 +78,280 @@ public class MemTableFlushTask {
* the function for flushing memtable.
*/
public void syncFlushMemTable()
- throws InterruptedException, IOException {
+ throws ExecutionException, InterruptedException {
LOGGER.info("The memTable size of SG {} is {}, the avg series points num
in chunk is {} ",
storageGroup,
memTable.memSize(),
memTable.getTotalPointsNum() / memTable.getSeriesNumber());
long start = System.currentTimeMillis();
long sortTime = 0;
- long encodingTime = 0;
- long ioTime = 0;
//for map do not use get(key) to iteratate
- for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry :
memTable.getMemTableMap().entrySet()) {
- writer.startChunkGroup(memTableEntry.getKey());
+ for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry :
memTable.getMemTableMap()
+ .entrySet()) {
+ encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
+
final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry :
value.entrySet()) {
long startTime = System.currentTimeMillis();
IWritableMemChunk series = iWritableMemChunkEntry.getValue();
MeasurementSchema desc = series.getSchema();
TVList tvList = series.getSortedTVListForFlush();
- long encodingStartTime = System.currentTimeMillis();
- sortTime += encodingStartTime - startTime;
- IChunkWriter seriesWriter = new ChunkWriterImpl(desc);
- writeOneSeries(tvList, seriesWriter, desc.getType());
- seriesWriter.sealCurrentPage();
- seriesWriter.clearPageWriter();
- long ioStartTime = System.currentTimeMillis();
- encodingTime += (ioStartTime - encodingStartTime);
- seriesWriter.writeToFileWriter(this.writer);
- ioTime += (System.currentTimeMillis() - ioStartTime);
+ sortTime += System.currentTimeMillis() - startTime;
+ encodingTaskQueue.add(new Pair<>(tvList, desc));
}
- long ioStartTime = System.currentTimeMillis();
- writer.setMinPlanIndex(memTable.getMinPlanIndex());
- writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
- writer.endChunkGroup();
- ioTime += (System.currentTimeMillis() - ioStartTime);
+
+ encodingTaskQueue.add(new EndChunkGroupIoTask());
}
+ noMoreEncodingTask = true;
LOGGER.info(
"Storage group {} memtable {}, flushing into disk: data sort time cost
{} ms.",
storageGroup, memTable.getVersion(), sortTime);
- LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding
data cost "
- + "{} ms.",
- storageGroup, memTable.getVersion(), encodingTime);
- LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms",
memTable.getVersion(),
- storageGroup, ioTime);
- writer.writeVersion(memTable.getVersion());
- writer.writePlanIndices();
+ try {
+ encodingTaskFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ // avoid ioTask waiting forever
+ noMoreIOTask = true;
+ ioTaskFuture.cancel(true);
+ throw e;
+ }
+
+ ioTaskFuture.get();
+
+ try {
+ writer.writeVersion(memTable.getVersion());
+ writer.writePlanIndices();
+ } catch (IOException e) {
+ throw new ExecutionException(e);
+ }
LOGGER.info(
"Storage group {} memtable {} flushing a memtable has finished! Time
consumption: {}ms",
storageGroup, memTable, System.currentTimeMillis() - start);
+ }
+ private Runnable encodingTask = new Runnable() {
+ private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
+ TSDataType dataType) {
+ for (int i = 0; i < tvPairs.size(); i++) {
+ long time = tvPairs.getTime(i);
- }
+ // skip duplicated data
+ if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) {
+ continue;
+ }
- private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
- TSDataType dataType) {
- for (int i = 0; i < tvPairs.size(); i++) {
- long time = tvPairs.getTime(i);
+ switch (dataType) {
+ case BOOLEAN:
+ seriesWriterImpl.write(time, tvPairs.getBoolean(i));
+ break;
+ case INT32:
+ seriesWriterImpl.write(time, tvPairs.getInt(i));
+ break;
+ case INT64:
+ seriesWriterImpl.write(time, tvPairs.getLong(i));
+ break;
+ case FLOAT:
+ seriesWriterImpl.write(time, tvPairs.getFloat(i));
+ break;
+ case DOUBLE:
+ seriesWriterImpl.write(time, tvPairs.getDouble(i));
+ break;
+ case TEXT:
+ seriesWriterImpl.write(time, tvPairs.getBinary(i));
+ break;
+ default:
+ LOGGER.error("Storage group {} does not support data type: {}",
storageGroup,
+ dataType);
+ break;
+ }
+ }
+ }
- // skip duplicated data
- if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) {
- continue;
+ @SuppressWarnings("squid:S135")
+ @Override
+ public void run() {
+ long memSerializeTime = 0;
+ boolean noMoreMessages = false;
+ LOGGER.debug("Storage group {} memtable {}, starts to encoding data.",
storageGroup,
+ memTable.getVersion());
+ while (true) {
+ if (noMoreEncodingTask) {
+ noMoreMessages = true;
+ }
+ Object task = encodingTaskQueue.poll();
+ if (task == null) {
+ if (noMoreMessages) {
+ break;
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(10);
+ } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
+ LOGGER.error("Storage group {} memtable {}, encoding task is
interrupted.",
+ storageGroup, memTable.getVersion(), e);
+ // generally it is because the thread pool is shutdown so the task
should be aborted
+ break;
+ }
+ } else {
+ if (task instanceof StartFlushGroupIOTask || task instanceof
EndChunkGroupIoTask) {
+ ioTaskQueue.add(task);
+ } else {
+ long starTime = System.currentTimeMillis();
+ Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList,
MeasurementSchema>) task;
+ IChunkWriter seriesWriter = new
ChunkWriterImpl(encodingMessage.right);
+ writeOneSeries(encodingMessage.left, seriesWriter,
encodingMessage.right.getType());
+ seriesWriter.sealCurrentPage();
+ seriesWriter.clearPageWriter();
+ ioTaskQueue.add(seriesWriter);
+ memSerializeTime += System.currentTimeMillis() - starTime;
+ }
+ }
}
+ noMoreIOTask = true;
+ LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding
data cost "
+ + "{} ms.",
+ storageGroup, memTable.getVersion(), memSerializeTime);
+ }
+ };
- switch (dataType) {
- case BOOLEAN:
- seriesWriterImpl.write(time, tvPairs.getBoolean(i));
- break;
- case INT32:
- seriesWriterImpl.write(time, tvPairs.getInt(i));
- break;
- case INT64:
- seriesWriterImpl.write(time, tvPairs.getLong(i));
- break;
- case FLOAT:
- seriesWriterImpl.write(time, tvPairs.getFloat(i));
- break;
- case DOUBLE:
- seriesWriterImpl.write(time, tvPairs.getDouble(i));
- break;
- case TEXT:
- seriesWriterImpl.write(time, tvPairs.getBinary(i));
+ @SuppressWarnings("squid:S135")
+ private Runnable ioTask = () -> {
+ long ioTime = 0;
+ boolean returnWhenNoTask = false;
+ LOGGER.debug("Storage group {} memtable {}, start io.", storageGroup,
memTable.getVersion());
+ while (true) {
+ if (noMoreIOTask) {
+ returnWhenNoTask = true;
+ }
+ Object ioMessage = ioTaskQueue.poll();
+ if (ioMessage == null) {
+ if (returnWhenNoTask) {
break;
- default:
- LOGGER.error("Storage group {} does not support data type: {}",
storageGroup,
- dataType);
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(10);
+ } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
+ LOGGER.error("Storage group {} memtable {}, io task is
interrupted.", storageGroup
+ , memTable.getVersion());
+ // generally it is because the thread pool is shutdown so the task
should be aborted
break;
+ }
+ } else {
+ long starTime = System.currentTimeMillis();
+ try {
+ if (ioMessage instanceof StartFlushGroupIOTask) {
+ this.writer.startChunkGroup(((StartFlushGroupIOTask)
ioMessage).deviceId);
+ } else if (ioMessage instanceof IChunkWriter) {
+ /*
+ can work
+ */
+// ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+// System.out.println(chunkWriter.getPageWriter() == null);
+// writer.currentChunkMetadata = new
ChunkMetadata(chunkWriter.getMeasurementSchema().getMeasurementId(),
chunkWriter.getMeasurementSchema().getType(),
+// 0, chunkWriter.getStatistics());
+// writer.endCurrentChunk();
+// chunkWriter.getPageBuffer().reset();
+
+ /*
+ can work
+ */
+// ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+// System.out.println(chunkWriter.getPageWriter() == null);
+// writer.currentChunkMetadata = new
ChunkMetadata(chunkWriter.getMeasurementSchema().getMeasurementId(),
chunkWriter.getMeasurementSchema().getType(),
+// 0, chunkWriter.getStatistics());
+// chunkWriter.getPageBuffer().reset();
+
+ /*
+ can work
+ */
+// ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+// System.out.println(chunkWriter.getPageWriter() == null);
+// ChunkMetadata chunkMetadata = new
ChunkMetadata(chunkWriter.getMeasurementSchema().getMeasurementId(),
chunkWriter.getMeasurementSchema().getType(),
+// 0, chunkWriter.getStatistics());
+// System.out.println(chunkMetadata.getNumOfPoints());
+// chunkWriter.getPageBuffer().reset();
+
+ /*
+ can work
+ */
+// ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+// System.out.println(chunkWriter.getPageWriter() == null);
+// chunkWriter.getPageBuffer().reset();
+
+ /*
+ can work
+ */
+// ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+// chunkWriter.getPageBuffer().reset();
+
+ /*
+ don't work
+ */
+// ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+
+ /*
+ don't work
+ */
+// ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+// System.out.println(chunkWriter.getPageBuffer().getBuf().length);
+
+ /*
+ don't work
+ */
+// ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+// chunkWriter.getPageBuffer().getBuf()[0] = (byte) 128;
+
+ ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+ chunkWriter.writeToFileWriter(this.writer);
+
+ /*
+ don't work
+ */
+// ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+// System.out.println(chunkWriter.getPageWriter() == null);
+// writer.currentChunkMetadata = new
ChunkMetadata(chunkWriter.getMeasurementSchema().getMeasurementId(),
chunkWriter.getMeasurementSchema().getType(),
+// 0, chunkWriter.getStatistics());
+// writer.endCurrentChunk();
+
+ /*
+ don't work
+ */
+// ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+// chunkWriter.numOfPages = 0;
+// chunkWriter.statistics = Statistics
+//
.getStatsByType(chunkWriter.getMeasurementSchema().getType());
+ } else {
+ this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
+ this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+ this.writer.endChunkGroup();
+ }
+ } catch (Exception e) {
+ LOGGER.error("Storage group {} memtable {}, io task meets error.",
storageGroup,
+ memTable.getVersion(), e);
+ throw new FlushRunTimeException(e);
+ }
+ ioTime += System.currentTimeMillis() - starTime;
}
}
+ LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms",
memTable.getVersion(),
+ storageGroup, ioTime);
+ };
+
+ static class EndChunkGroupIoTask {
+
+ EndChunkGroupIoTask() {
+
+ }
}
+ static class StartFlushGroupIOTask {
+
+ private final String deviceId;
+
+ StartFlushGroupIOTask(String deviceId) {
+ this.deviceId = deviceId;
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index f4adacf..56f6077 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -220,7 +221,7 @@ public class TsFileRecoverPerformer {
// into it
} catch (IOException e) {
throw new StorageGroupProcessorException(e);
- } catch (InterruptedException e) {
+ } catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
throw new StorageGroupProcessorException(e);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index 79f587a..7cbdc87 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.tsfile.utils;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
/**
* A subclass extending <code>ByteArrayOutputStream</code>. It's used to return
@@ -46,4 +48,7 @@ public class PublicBAOS extends ByteArrayOutputStream {
return this.buf;
}
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(buf, 0, count);
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index d441962..68e3a5e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -51,13 +51,15 @@ public class ChunkWriterImpl implements IChunkWriter {
*/
private PublicBAOS pageBuffer;
- private int numOfPages;
+ public int numOfPages;
/**
* write data into current page
*/
private PageWriter pageWriter;
+ private int chunkDataSize;
+
/**
* page size threshold.
*/
@@ -76,7 +78,7 @@ public class ChunkWriterImpl implements IChunkWriter {
/**
* statistic of this chunk.
*/
- private Statistics<?> statistics;
+ public Statistics<?> statistics;
private boolean isSdtEncoding;
@@ -319,8 +321,9 @@ public class ChunkWriterImpl implements IChunkWriter {
writeAllPagesOfChunkToTsFile(tsfileWriter, statistics);
// reinit this chunk writer
- pageBuffer.reset();
+// pageBuffer.reset();
numOfPages = 0;
+ chunkDataSize = 0;
this.statistics = Statistics.getStatsByType(measurementSchema.getType());
}
@@ -347,6 +350,7 @@ public class ChunkWriterImpl implements IChunkWriter {
public void clearPageWriter() {
pageWriter = null;
+ chunkDataSize = pageBuffer.size();
}
@Override
@@ -406,15 +410,15 @@ public class ChunkWriterImpl implements IChunkWriter {
// start to write this column chunk
writer.startFlushChunk(measurementSchema, compressor.getType(),
measurementSchema.getType(),
- measurementSchema.getEncodingType(), statistics, pageBuffer.size(),
numOfPages);
+ measurementSchema.getEncodingType(), statistics, chunkDataSize,
numOfPages);
long dataOffset = writer.getPos();
- // write all pages of this column
+// write all pages of this column
writer.writeBytesToStream(pageBuffer);
long dataSize = writer.getPos() - dataOffset;
- if (dataSize != pageBuffer.size()) {
+ if (dataSize != chunkDataSize) {
throw new IOException(
"Bytes written is inconsistent with the size of data: " + dataSize +
" !="
+ " " + pageBuffer.size());
@@ -442,4 +446,20 @@ public class ChunkWriterImpl implements IChunkWriter {
public boolean isMerging() {
return isMerging;
}
+
+ public PublicBAOS getPageBuffer() {
+ return pageBuffer;
+ }
+
+ public MeasurementSchema getMeasurementSchema() {
+ return measurementSchema;
+ }
+
+ public Statistics<?> getStatistics() {
+ return statistics;
+ }
+
+ public PageWriter getPageWriter() {
+ return pageWriter;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 0275909..cf7f8bb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -77,7 +77,7 @@ public class TsFileIOWriter {
protected File file;
// current flushed Chunk
- private ChunkMetadata currentChunkMetadata;
+ public ChunkMetadata currentChunkMetadata;
// current flushed ChunkGroup
protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
// all flushed ChunkGroups
@@ -191,7 +191,6 @@ public class TsFileIOWriter {
ChunkHeader header = new ChunkHeader(measurementSchema.getMeasurementId(),
dataSize, tsDataType,
compressionCodecName, encodingType, numOfPages);
header.serializeTo(out.wrapAsStream());
-
}
/**