This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0e2ebd9 Improve flush pipeline , remove synchronized qualifier in PublicBAOS(#2596)
0e2ebd9 is described below
commit 0e2ebd990168b1009ce6332745a6875a70c4cc5a
Author: Jackie Tien <[email protected]>
AuthorDate: Sun Jan 31 20:28:44 2021 +0800
Improve flush pipeline , remove synchronized qualifier in PublicBAOS(#2596)
---
.../org/apache/iotdb/tsfile/utils/PublicBAOS.java | 33 ++++++++++++++++++++++
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 14 ++++++---
2 files changed, 43 insertions(+), 4 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index 79f587a..de4c985 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.tsfile.utils;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
/**
* A subclass extending <code>ByteArrayOutputStream</code>. It's used to return
@@ -46,4 +48,35 @@ public class PublicBAOS extends ByteArrayOutputStream {
return this.buf;
}
+ /**
+ * It's not a thread-safe method.
+ * Override the super class's implementation.
+ * Remove the synchronized key word, to save the synchronization overhead.
+ *
+ * Writes the complete contents of this byte array output stream to
+ * the specified output stream argument, as if by calling the output
+ * stream's write method using <code>out.write(buf, 0, count)</code>.
+ *
+ * @param out the output stream to which to write the data.
+ * @exception IOException if an I/O error occurs.
+ */
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(buf, 0, count);
+ }
+
+ /**
+ * It's not a thread-safe method.
+ * Override the super class's implementation.
+ * Remove the synchronized key word, to save the synchronization overhead.
+ *
+ * Resets the <code>count</code> field of this byte array output
+ * stream to zero, so that all currently accumulated output in the
+ * output stream is discarded. The output stream can be used again,
+ * reusing the already allocated buffer space.
+ */
+ @Override
+ public void reset() {
+ count = 0;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 2e9cdf9..f5bca68 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -52,6 +52,11 @@ public class ChunkWriterImpl implements IChunkWriter {
*/
private PublicBAOS pageBuffer;
+ /**
+ * current chunk data size, i.e the size of pageBuffer
+ */
+ private int chunkDataSize;
+
private int numOfPages;
/**
@@ -351,6 +356,7 @@ public class ChunkWriterImpl implements IChunkWriter {
if (pageWriter != null && pageWriter.getPointNumber() > 0) {
writePageToPageBuffer();
}
+ chunkDataSize = pageBuffer.size();
}
@Override
@@ -419,18 +425,18 @@ public class ChunkWriterImpl implements IChunkWriter {
// start to write this column chunk
writer.startFlushChunk(measurementSchema, compressor.getType(),
measurementSchema.getType(),
- measurementSchema.getEncodingType(), statistics, pageBuffer.size(), numOfPages);
+ measurementSchema.getEncodingType(), statistics, chunkDataSize, numOfPages);
long dataOffset = writer.getPos();
// write all pages of this column
writer.writeBytesToStream(pageBuffer);
- long dataSize = writer.getPos() - dataOffset;
- if (dataSize != pageBuffer.size()) {
+ int dataSize = (int) (writer.getPos() - dataOffset);
+ if (dataSize != chunkDataSize) {
throw new IOException(
"Bytes written is inconsistent with the size of data: " + dataSize + " !="
- + " " + pageBuffer.size());
+ + " " + chunkDataSize);
}
writer.endCurrentChunk();