This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch two_stage_pipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/two_stage_pipeline by this 
push:
     new a6a7bc7  find reason
a6a7bc7 is described below

commit a6a7bc7b6bfc1df19b389563a7a45a0c4e168022
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Jan 28 19:27:23 2021 +0800

    find reason
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  12 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 326 +++++++++++++++++----
 .../writelog/recover/TsFileRecoverPerformer.java   |   3 +-
 .../org/apache/iotdb/tsfile/utils/PublicBAOS.java  |   5 +
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  |  32 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |   3 +-
 6 files changed, 304 insertions(+), 77 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d0918f1..e35fe72 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -128,17 +128,17 @@ public class IoTDBConfig {
   /**
    * Memory allocated for the write process
    */
-  private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 4 / 
10;
+  private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 7 / 
10;
 
   /**
    * Memory allocated for the read process
    */
-  private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 
10;
+  private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() / 10;
 
   /**
    * Memory allocated for the mtree
    */
-  private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() * 1 
/ 10;
+  private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10;
 
   /**
    * Memory allocated for the read process besides cache
@@ -298,7 +298,7 @@ public class IoTDBConfig {
   /**
    * Is the write mem control for writing enable.
    */
-  private boolean enableMemControl = true;
+  private boolean enableMemControl = false;
 
   /**
    * Is the write ahead log enable.
@@ -340,12 +340,12 @@ public class IoTDBConfig {
   /**
    * When a memTable's size (in byte) exceeds this, the memtable is flushed to 
disk.
    */
-  private long memtableSizeThreshold = 1024 * 1024 * 1024L;
+  private long memtableSizeThreshold = 5 * 1024 * 1024 * 1024L;
 
   /**
    * When average series point number reaches this, flush the memtable to disk
    */
-  private int avgSeriesPointNumberThreshold = 100000;
+  private int avgSeriesPointNumberThreshold = 1000000000;
 
   /**
    * Work when tsfile_manage_strategy is level_strategy. When merge point 
number reaches this, merge
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index eaf2fc7..0328eab 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -20,10 +20,17 @@ package org.apache.iotdb.db.engine.flush;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -34,23 +41,35 @@ import org.slf4j.LoggerFactory;
 public class MemTableFlushTask {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MemTableFlushTask.class);
-  private final RestorableTsFileIOWriter writer;
+  private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = 
FlushSubTaskPoolManager
+      .getInstance();
+  private final Future<?> encodingTaskFuture;
+  private final Future<?> ioTaskFuture;
+  private RestorableTsFileIOWriter writer;
 
-  private final String storageGroup;
+  private final ConcurrentLinkedQueue<Object> ioTaskQueue = new 
ConcurrentLinkedQueue<>();
+  private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new 
ConcurrentLinkedQueue<>();
+  private String storageGroup;
 
-  private final IMemTable memTable;
+  private IMemTable memTable;
 
+  private volatile boolean noMoreEncodingTask = false;
+  private volatile boolean noMoreIOTask = false;
 
   /**
-   * @param memTable the memTable to flush
-   * @param writer the writer where memTable will be flushed to (current 
tsfile writer or vm writer)
+   * @param memTable     the memTable to flush
+   * @param writer       the writer where memTable will be flushed to (current 
tsfile writer or vm
+   *                     writer)
    * @param storageGroup current storage group
    */
 
-  public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter 
writer, String storageGroup) {
+  public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer,
+      String storageGroup) {
     this.memTable = memTable;
     this.writer = writer;
     this.storageGroup = storageGroup;
+    this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
+    this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
     LOGGER.debug("flush task of Storage group {} memtable {} is created ",
         storageGroup, memTable.getVersion());
   }
@@ -59,97 +78,280 @@ public class MemTableFlushTask {
    * the function for flushing memtable.
    */
   public void syncFlushMemTable()
-      throws InterruptedException, IOException {
+      throws ExecutionException, InterruptedException {
     LOGGER.info("The memTable size of SG {} is {}, the avg series points num 
in chunk is {} ",
         storageGroup,
         memTable.memSize(),
         memTable.getTotalPointsNum() / memTable.getSeriesNumber());
     long start = System.currentTimeMillis();
     long sortTime = 0;
-    long encodingTime = 0;
-    long ioTime = 0;
 
     //for map do not use get(key) to iteratate
-    for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : 
memTable.getMemTableMap().entrySet()) {
-      writer.startChunkGroup(memTableEntry.getKey());
+    for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : 
memTable.getMemTableMap()
+        .entrySet()) {
+      encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
+
       final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : 
value.entrySet()) {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = iWritableMemChunkEntry.getValue();
         MeasurementSchema desc = series.getSchema();
         TVList tvList = series.getSortedTVListForFlush();
-        long encodingStartTime = System.currentTimeMillis();
-        sortTime += encodingStartTime - startTime;
-        IChunkWriter seriesWriter = new ChunkWriterImpl(desc);
-        writeOneSeries(tvList, seriesWriter, desc.getType());
-        seriesWriter.sealCurrentPage();
-        seriesWriter.clearPageWriter();
-        long ioStartTime = System.currentTimeMillis();
-        encodingTime += (ioStartTime - encodingStartTime);
-        seriesWriter.writeToFileWriter(this.writer);
-        ioTime += (System.currentTimeMillis() - ioStartTime);
+        sortTime += System.currentTimeMillis() - startTime;
+        encodingTaskQueue.add(new Pair<>(tvList, desc));
       }
-      long ioStartTime = System.currentTimeMillis();
-      writer.setMinPlanIndex(memTable.getMinPlanIndex());
-      writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
-      writer.endChunkGroup();
-      ioTime += (System.currentTimeMillis() - ioStartTime);
+
+      encodingTaskQueue.add(new EndChunkGroupIoTask());
     }
 
+    noMoreEncodingTask = true;
     LOGGER.info(
         "Storage group {} memtable {}, flushing into disk: data sort time cost 
{} ms.",
         storageGroup, memTable.getVersion(), sortTime);
-    LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding 
data cost "
-            + "{} ms.",
-        storageGroup, memTable.getVersion(), encodingTime);
-    LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", 
memTable.getVersion(),
-        storageGroup, ioTime);
 
-    writer.writeVersion(memTable.getVersion());
-    writer.writePlanIndices();
+    try {
+      encodingTaskFuture.get();
+    } catch (InterruptedException | ExecutionException e) {
+      // avoid ioTask waiting forever
+      noMoreIOTask = true;
+      ioTaskFuture.cancel(true);
+      throw e;
+    }
+
+    ioTaskFuture.get();
+
+    try {
+      writer.writeVersion(memTable.getVersion());
+      writer.writePlanIndices();
+    } catch (IOException e) {
+      throw new ExecutionException(e);
+    }
 
     LOGGER.info(
         "Storage group {} memtable {} flushing a memtable has finished! Time 
consumption: {}ms",
         storageGroup, memTable, System.currentTimeMillis() - start);
+  }
 
+  private Runnable encodingTask = new Runnable() {
+    private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
+        TSDataType dataType) {
+      for (int i = 0; i < tvPairs.size(); i++) {
+        long time = tvPairs.getTime(i);
 
-  }
+        // skip duplicated data
+        if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) {
+          continue;
+        }
 
-  private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
-      TSDataType dataType) {
-    for (int i = 0; i < tvPairs.size(); i++) {
-      long time = tvPairs.getTime(i);
+        switch (dataType) {
+          case BOOLEAN:
+            seriesWriterImpl.write(time, tvPairs.getBoolean(i));
+            break;
+          case INT32:
+            seriesWriterImpl.write(time, tvPairs.getInt(i));
+            break;
+          case INT64:
+            seriesWriterImpl.write(time, tvPairs.getLong(i));
+            break;
+          case FLOAT:
+            seriesWriterImpl.write(time, tvPairs.getFloat(i));
+            break;
+          case DOUBLE:
+            seriesWriterImpl.write(time, tvPairs.getDouble(i));
+            break;
+          case TEXT:
+            seriesWriterImpl.write(time, tvPairs.getBinary(i));
+            break;
+          default:
+            LOGGER.error("Storage group {} does not support data type: {}", 
storageGroup,
+                dataType);
+            break;
+        }
+      }
+    }
 
-      // skip duplicated data
-      if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) {
-        continue;
+    @SuppressWarnings("squid:S135")
+    @Override
+    public void run() {
+      long memSerializeTime = 0;
+      boolean noMoreMessages = false;
+      LOGGER.debug("Storage group {} memtable {}, starts to encoding data.", 
storageGroup,
+          memTable.getVersion());
+      while (true) {
+        if (noMoreEncodingTask) {
+          noMoreMessages = true;
+        }
+        Object task = encodingTaskQueue.poll();
+        if (task == null) {
+          if (noMoreMessages) {
+            break;
+          }
+          try {
+            TimeUnit.MILLISECONDS.sleep(10);
+          } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
+            LOGGER.error("Storage group {} memtable {}, encoding task is 
interrupted.",
+                storageGroup, memTable.getVersion(), e);
+            // generally it is because the thread pool is shutdown so the task 
should be aborted
+            break;
+          }
+        } else {
+          if (task instanceof StartFlushGroupIOTask || task instanceof 
EndChunkGroupIoTask) {
+            ioTaskQueue.add(task);
+          } else {
+            long starTime = System.currentTimeMillis();
+            Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, 
MeasurementSchema>) task;
+            IChunkWriter seriesWriter = new 
ChunkWriterImpl(encodingMessage.right);
+            writeOneSeries(encodingMessage.left, seriesWriter, 
encodingMessage.right.getType());
+            seriesWriter.sealCurrentPage();
+            seriesWriter.clearPageWriter();
+            ioTaskQueue.add(seriesWriter);
+            memSerializeTime += System.currentTimeMillis() - starTime;
+          }
+        }
       }
+      noMoreIOTask = true;
+      LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding 
data cost "
+              + "{} ms.",
+          storageGroup, memTable.getVersion(), memSerializeTime);
+    }
+  };
 
-      switch (dataType) {
-        case BOOLEAN:
-          seriesWriterImpl.write(time, tvPairs.getBoolean(i));
-          break;
-        case INT32:
-          seriesWriterImpl.write(time, tvPairs.getInt(i));
-          break;
-        case INT64:
-          seriesWriterImpl.write(time, tvPairs.getLong(i));
-          break;
-        case FLOAT:
-          seriesWriterImpl.write(time, tvPairs.getFloat(i));
-          break;
-        case DOUBLE:
-          seriesWriterImpl.write(time, tvPairs.getDouble(i));
-          break;
-        case TEXT:
-          seriesWriterImpl.write(time, tvPairs.getBinary(i));
+  @SuppressWarnings("squid:S135")
+  private Runnable ioTask = () -> {
+    long ioTime = 0;
+    boolean returnWhenNoTask = false;
+    LOGGER.debug("Storage group {} memtable {}, start io.", storageGroup, 
memTable.getVersion());
+    while (true) {
+      if (noMoreIOTask) {
+        returnWhenNoTask = true;
+      }
+      Object ioMessage = ioTaskQueue.poll();
+      if (ioMessage == null) {
+        if (returnWhenNoTask) {
           break;
-        default:
-          LOGGER.error("Storage group {} does not support data type: {}", 
storageGroup,
-              dataType);
+        }
+        try {
+          TimeUnit.MILLISECONDS.sleep(10);
+        } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
+          LOGGER.error("Storage group {} memtable {}, io task is 
interrupted.", storageGroup
+              , memTable.getVersion());
+          // generally it is because the thread pool is shutdown so the task 
should be aborted
           break;
+        }
+      } else {
+        long starTime = System.currentTimeMillis();
+        try {
+          if (ioMessage instanceof StartFlushGroupIOTask) {
+            this.writer.startChunkGroup(((StartFlushGroupIOTask) 
ioMessage).deviceId);
+          } else if (ioMessage instanceof IChunkWriter) {
+            /*
+            can work
+             */
+//            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+//            System.out.println(chunkWriter.getPageWriter() == null);
+//            writer.currentChunkMetadata = new 
ChunkMetadata(chunkWriter.getMeasurementSchema().getMeasurementId(), 
chunkWriter.getMeasurementSchema().getType(),
+//                0, chunkWriter.getStatistics());
+//            writer.endCurrentChunk();
+//            chunkWriter.getPageBuffer().reset();
+
+            /*
+            can work
+             */
+//            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+//            System.out.println(chunkWriter.getPageWriter() == null);
+//            writer.currentChunkMetadata = new 
ChunkMetadata(chunkWriter.getMeasurementSchema().getMeasurementId(), 
chunkWriter.getMeasurementSchema().getType(),
+//                0, chunkWriter.getStatistics());
+//            chunkWriter.getPageBuffer().reset();
+
+            /*
+            can work
+             */
+//            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+//            System.out.println(chunkWriter.getPageWriter() == null);
+//            ChunkMetadata chunkMetadata = new 
ChunkMetadata(chunkWriter.getMeasurementSchema().getMeasurementId(), 
chunkWriter.getMeasurementSchema().getType(),
+//                0, chunkWriter.getStatistics());
+//            System.out.println(chunkMetadata.getNumOfPoints());
+//            chunkWriter.getPageBuffer().reset();
+
+            /*
+            can work
+             */
+//            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+//            System.out.println(chunkWriter.getPageWriter() == null);
+//            chunkWriter.getPageBuffer().reset();
+
+            /*
+            can work
+             */
+//            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+//            chunkWriter.getPageBuffer().reset();
+
+            /*
+            don't work
+             */
+//            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+
+            /*
+            don't work
+             */
+//            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+//            System.out.println(chunkWriter.getPageBuffer().getBuf().length);
+
+            /*
+            don't work
+             */
+//            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+//            chunkWriter.getPageBuffer().getBuf()[0] = (byte) 128;
+
+            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+            chunkWriter.writeToFileWriter(this.writer);
+
+            /*
+            don't work
+             */
+//            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+//            System.out.println(chunkWriter.getPageWriter() == null);
+//            writer.currentChunkMetadata = new 
ChunkMetadata(chunkWriter.getMeasurementSchema().getMeasurementId(), 
chunkWriter.getMeasurementSchema().getType(),
+//                0, chunkWriter.getStatistics());
+//            writer.endCurrentChunk();
+
+            /*
+            don't work
+             */
+//            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+//            chunkWriter.numOfPages = 0;
+//            chunkWriter.statistics = Statistics
+//                
.getStatsByType(chunkWriter.getMeasurementSchema().getType());
+          } else {
+            this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
+            this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+            this.writer.endChunkGroup();
+          }
+        } catch (Exception e) {
+          LOGGER.error("Storage group {} memtable {}, io task meets error.", 
storageGroup,
+              memTable.getVersion(), e);
+          throw new FlushRunTimeException(e);
+        }
+        ioTime += System.currentTimeMillis() - starTime;
       }
     }
+    LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", 
memTable.getVersion(),
+        storageGroup, ioTime);
+  };
+
+  static class EndChunkGroupIoTask {
+
+    EndChunkGroupIoTask() {
+
+    }
   }
 
+  static class StartFlushGroupIOTask {
+
+    private final String deviceId;
+
+    StartFlushGroupIOTask(String deviceId) {
+      this.deviceId = deviceId;
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
 
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index f4adacf..56f6077 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -220,7 +221,7 @@ public class TsFileRecoverPerformer {
       // into it
     } catch (IOException e) {
       throw new StorageGroupProcessorException(e);
-    } catch (InterruptedException e) {
+    } catch (InterruptedException | ExecutionException e) {
       Thread.currentThread().interrupt();
       throw new StorageGroupProcessorException(e);
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index 79f587a..7cbdc87 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.tsfile.utils;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 
 /**
  * A subclass extending <code>ByteArrayOutputStream</code>. It's used to return
@@ -46,4 +48,7 @@ public class PublicBAOS extends ByteArrayOutputStream {
     return this.buf;
   }
 
+  public void writeTo(OutputStream out) throws IOException {
+    out.write(buf, 0, count);
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index d441962..68e3a5e 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -51,13 +51,15 @@ public class ChunkWriterImpl implements IChunkWriter {
    */
   private PublicBAOS pageBuffer;
 
-  private int numOfPages;
+  public int numOfPages;
 
   /**
    * write data into current page
    */
   private PageWriter pageWriter;
 
+  private int chunkDataSize;
+
   /**
    * page size threshold.
    */
@@ -76,7 +78,7 @@ public class ChunkWriterImpl implements IChunkWriter {
   /**
    * statistic of this chunk.
    */
-  private Statistics<?> statistics;
+  public Statistics<?> statistics;
 
   private boolean isSdtEncoding;
 
@@ -319,8 +321,9 @@ public class ChunkWriterImpl implements IChunkWriter {
     writeAllPagesOfChunkToTsFile(tsfileWriter, statistics);
 
     // reinit this chunk writer
-    pageBuffer.reset();
+//    pageBuffer.reset();
     numOfPages = 0;
+    chunkDataSize = 0;
     this.statistics = Statistics.getStatsByType(measurementSchema.getType());
   }
 
@@ -347,6 +350,7 @@ public class ChunkWriterImpl implements IChunkWriter {
 
   public void clearPageWriter() {
     pageWriter = null;
+    chunkDataSize = pageBuffer.size();
   }
 
   @Override
@@ -406,15 +410,15 @@ public class ChunkWriterImpl implements IChunkWriter {
 
     // start to write this column chunk
     writer.startFlushChunk(measurementSchema, compressor.getType(), 
measurementSchema.getType(),
-        measurementSchema.getEncodingType(), statistics, pageBuffer.size(), 
numOfPages);
+        measurementSchema.getEncodingType(), statistics, chunkDataSize, 
numOfPages);
 
     long dataOffset = writer.getPos();
 
-    // write all pages of this column
+//     write all pages of this column
     writer.writeBytesToStream(pageBuffer);
 
     long dataSize = writer.getPos() - dataOffset;
-    if (dataSize != pageBuffer.size()) {
+    if (dataSize != chunkDataSize) {
       throw new IOException(
           "Bytes written is inconsistent with the size of data: " + dataSize + 
" !="
               + " " + pageBuffer.size());
@@ -442,4 +446,20 @@ public class ChunkWriterImpl implements IChunkWriter {
   public boolean isMerging() {
     return isMerging;
   }
+
+  public PublicBAOS getPageBuffer() {
+    return pageBuffer;
+  }
+
+  public MeasurementSchema getMeasurementSchema() {
+    return measurementSchema;
+  }
+
+  public Statistics<?> getStatistics() {
+    return statistics;
+  }
+
+  public PageWriter getPageWriter() {
+    return pageWriter;
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 0275909..cf7f8bb 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -77,7 +77,7 @@ public class TsFileIOWriter {
   protected File file;
 
   // current flushed Chunk
-  private ChunkMetadata currentChunkMetadata;
+  public ChunkMetadata currentChunkMetadata;
   // current flushed ChunkGroup
   protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
   // all flushed ChunkGroups
@@ -191,7 +191,6 @@ public class TsFileIOWriter {
     ChunkHeader header = new ChunkHeader(measurementSchema.getMeasurementId(), 
dataSize, tsDataType,
         compressionCodecName, encodingType, numOfPages);
     header.serializeTo(out.wrapAsStream());
-
   }
 
   /**

Reply via email to