This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ImproveFlush in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 53f8c6270f4aeedf4b5d1c1c3c40fdc30dd58ad8 Author: JackieTien97 <[email protected]> AuthorDate: Fri Jan 29 10:25:48 2021 +0800 Improve flush pipeline --- .../org/apache/iotdb/tsfile/utils/PublicBAOS.java | 31 ++++++++++++++++++++++ .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 14 +++++++--- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java index 79f587a..a911af8 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java @@ -19,6 +19,8 @@ package org.apache.iotdb.tsfile.utils; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; /** * A subclass extending <code>ByteArrayOutputStream</code>. It's used to return @@ -46,4 +48,33 @@ public class PublicBAOS extends ByteArrayOutputStream { return this.buf; } + /** + * It's not a thread-safe method. + * Overrides the superclass's implementation. + * Removes the synchronized keyword, to save the synchronization overhead. + * + * Writes the complete contents of this byte array output stream to + * the specified output stream argument, as if by calling the output + * stream's write method using <code>out.write(buf, 0, count)</code>. + * + * @param out the output stream to which to write the data. + * @exception IOException if an I/O error occurs. + */ + public void writeTo(OutputStream out) throws IOException { + out.write(buf, 0, count); + } + + /** + * It's not a thread-safe method. + * Overrides the superclass's implementation. + * Removes the synchronized keyword, to save the synchronization overhead. + * + * Resets the <code>count</code> field of this byte array output + * stream to zero, so that all currently accumulated output in the + * output stream is discarded. 
The output stream can be used again, + * reusing the already allocated buffer space. + */ + public void reset() { + count = 0; + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java index 46b1fa4..a1968c8 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java @@ -52,6 +52,11 @@ public class ChunkWriterImpl implements IChunkWriter { */ private PublicBAOS pageBuffer; + /** + * Current chunk data size, i.e. the size of pageBuffer + */ + private int chunkDataSize; + private int numOfPages; /** @@ -351,6 +356,7 @@ public class ChunkWriterImpl implements IChunkWriter { if (pageWriter != null && pageWriter.getPointNumber() > 0) { writePageToPageBuffer(); } + chunkDataSize = pageBuffer.size(); } public void clearPageWriter() { @@ -418,18 +424,18 @@ public class ChunkWriterImpl implements IChunkWriter { // start to write this column chunk writer.startFlushChunk(measurementSchema, compressor.getType(), measurementSchema.getType(), - measurementSchema.getEncodingType(), statistics, pageBuffer.size(), numOfPages); + measurementSchema.getEncodingType(), statistics, chunkDataSize, numOfPages); long dataOffset = writer.getPos(); // write all pages of this column writer.writeBytesToStream(pageBuffer); - long dataSize = writer.getPos() - dataOffset; - if (dataSize != pageBuffer.size()) { + int dataSize = (int) (writer.getPos() - dataOffset); + if (dataSize != chunkDataSize) { throw new IOException( "Bytes written is inconsistent with the size of data: " + dataSize + " !=" - + " " + pageBuffer.size()); + + " " + chunkDataSize); } writer.endCurrentChunk();
