This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ImproveFlush
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 53f8c6270f4aeedf4b5d1c1c3c40fdc30dd58ad8
Author: JackieTien97 <[email protected]>
AuthorDate: Fri Jan 29 10:25:48 2021 +0800

    Improve flush pipeline
---
 .../org/apache/iotdb/tsfile/utils/PublicBAOS.java  | 31 ++++++++++++++++++++++
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  | 14 +++++++---
 2 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index 79f587a..a911af8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.tsfile.utils;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 
 /**
  * A subclass extending <code>ByteArrayOutputStream</code>. It's used to return
@@ -46,4 +48,33 @@ public class PublicBAOS extends ByteArrayOutputStream {
     return this.buf;
   }
 
+  /**
+   * Writes the complete contents of this byte array output stream to
+   * <code>out</code>, as if by calling <code>out.write(buf, 0, count)</code>.
+   *
+   * <p>This method is NOT thread-safe. It overrides the superclass
+   * implementation without the <code>synchronized</code> keyword to avoid
+   * the synchronization overhead; callers that share this stream across
+   * threads must synchronize externally.
+   *
+   * @param out the output stream to which to write the data.
+   * @throws IOException if an I/O error occurs.
+   */
+  public void writeTo(OutputStream out) throws IOException {
+    out.write(buf, 0, count);
+  }
+
+  /**
+   * Resets the <code>count</code> field of this stream to zero, discarding
+   * all currently accumulated output. The stream can then be used again,
+   * reusing the already allocated buffer space.
+   *
+   * <p>This method is NOT thread-safe. It overrides the superclass
+   * implementation without the <code>synchronized</code> keyword to avoid
+   * the synchronization overhead; callers that share this stream across
+   * threads must synchronize externally.
+   */
+  public void reset() {
+    count = 0;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 46b1fa4..a1968c8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -52,6 +52,11 @@ public class ChunkWriterImpl implements IChunkWriter {
    */
   private PublicBAOS pageBuffer;
 
+  /**
+   * current chunk data size, i.e., the size of pageBuffer
+   */
+  private int chunkDataSize;
+
   private int numOfPages;
 
   /**
@@ -351,6 +356,7 @@ public class ChunkWriterImpl implements IChunkWriter {
     if (pageWriter != null && pageWriter.getPointNumber() > 0) {
       writePageToPageBuffer();
     }
+    chunkDataSize = pageBuffer.size();
   }
   
   public void clearPageWriter() {
@@ -418,18 +424,18 @@ public class ChunkWriterImpl implements IChunkWriter {
 
     // start to write this column chunk
     writer.startFlushChunk(measurementSchema, compressor.getType(), measurementSchema.getType(),
-        measurementSchema.getEncodingType(), statistics, pageBuffer.size(), numOfPages);
+        measurementSchema.getEncodingType(), statistics, chunkDataSize, numOfPages);
 
     long dataOffset = writer.getPos();
 
     // write all pages of this column
     writer.writeBytesToStream(pageBuffer);
 
-    long dataSize = writer.getPos() - dataOffset;
-    if (dataSize != pageBuffer.size()) {
+    int dataSize = (int) (writer.getPos() - dataOffset);
+    if (dataSize != chunkDataSize) {
       throw new IOException(
           "Bytes written is inconsistent with the size of data: " + dataSize + " !="
-              + " " + pageBuffer.size());
+              + " " + chunkDataSize);
     }
 
     writer.endCurrentChunk();

Reply via email to