This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 073625ad9c1 Fix short reads in fixed-length deserialization (#17870) (#18007)
073625ad9c1 is described below
commit 073625ad9c19e491eefad5296c3761f92574a176
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 24 10:02:29 2026 +0800
Fix short reads in fixed-length deserialization (#17870) (#18007)
* Fix short reads in fixed-length deserialization
* Fix tag log append EOF handling
* spotless
---------
(cherry picked from commit aa2acd6b4f550f1138348b5d04283b18521b63d9)
---
.../confignode/persistence/ProcedureInfo.java | 13 +-
.../writelog/io/SingleFileLogReader.java | 5 +-
.../datastructure/SerializableList.java | 3 +-
.../logfile/FakeCRC32Deserializer.java | 5 +-
.../mtree/impl/pbtree/schemafile/SchemaFile.java | 3 +-
.../pbtree/schemafile/log/SchemaFileLogReader.java | 7 +-
.../pbtree/schemafile/pagemgr/PageIOChannel.java | 3 +-
.../schemaengine/schemaregion/tag/TagLogFile.java | 12 +-
.../dataregion/wal/io/WALFileVersion.java | 4 +-
.../dataregion/wal/io/WALInputStream.java | 32 +++--
.../dataregion/wal/io/WALMetaData.java | 12 +-
.../dataregion/wal/recover/WALRepairWriter.java | 3 +-
.../load/splitter/AlignedChunkData.java | 5 +-
.../response/SubscriptionEventTsFileResponse.java | 11 +-
.../iotdb/db/utils/sort/FileSpillerReader.java | 4 +-
.../logfile/FakeCRC32DeserializerTest.java | 108 +++++++++++++++
.../schemaregion/tag/TagLogFileTest.java | 57 ++++++++
.../index/impl/TimeWindowStateProgressIndex.java | 9 +-
.../commons/executable/ExecutableManager.java | 3 +-
.../queue/serializer/PlainQueueSerializer.java | 3 +-
.../pipe/sink/protocol/IoTDBAirGapSink.java | 5 +-
.../org/apache/iotdb/commons/utils/IOUtils.java | 24 +++-
.../apache/iotdb/commons/utils/IOUtilsTest.java | 147 +++++++++++++++++++++
23 files changed, 414 insertions(+), 64 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
index 5589fdd3799..605bd65c8a0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
@@ -163,8 +164,16 @@ public class ProcedureInfo implements SnapshotProcessor {
try (FileInputStream fis = new
FileInputStream(procedureFilePath.toFile())) {
Procedure procedure = null;
try (FileChannel channel = fis.getChannel()) {
- ByteBuffer byteBuffer =
ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE);
- if (channel.read(byteBuffer) > 0) {
+ final long fileSize = channel.size();
+ if (fileSize > PROCEDURE_LOAD_BUFFER_SIZE) {
+ throw new IOException(
+ String.format(
+ "Procedure file %s exceeds the load buffer limit %s, actual
size %s",
+ procedureFilePath, PROCEDURE_LOAD_BUFFER_SIZE, fileSize));
+ }
+ ByteBuffer byteBuffer = ByteBuffer.allocate((int) fileSize);
+ if (fileSize > 0) {
+ IOUtils.readFully(channel, byteBuffer);
byteBuffer.flip();
procedure = ProcedureFactory.getInstance().create(byteBuffer);
byteBuffer.clear();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
index f67cfdc286a..8e8df07cf36 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
@@ -75,10 +75,7 @@ public class SingleFileLogReader implements ILogReader {
}
buffer = new byte[logSize];
- int readLen = logStream.read(buffer, 0, logSize);
- if (readLen < logSize) {
- throw new IOException("Reach eof");
- }
+ logStream.readFully(buffer, 0, logSize);
final long checkSum = logStream.readLong();
checkSummer.reset();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java
index c6b7cee0f3a..3870975fa1e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.transformation.datastructure;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.TemporaryQueryDataFileService;
@@ -60,7 +61,7 @@ public interface SerializableList {
}
init();
ByteBuffer byteBuffer =
ByteBuffer.allocate(recorder.getSerializedByteLength());
- recorder.getFileChannel().read(byteBuffer);
+ IOUtils.readFully(recorder.getFileChannel(), byteBuffer);
byteBuffer.flip();
deserialize(byteBuffer);
recorder.closeFile();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
index 46ce88825ec..9ecca265a9d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
@@ -27,7 +27,6 @@ import javax.validation.constraints.NotNull;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -67,9 +66,7 @@ public class FakeCRC32Deserializer<T> implements
IDeserializer<T> {
}
byte[] logBuffer = new byte[logLength];
- if (logLength < inputStream.read(logBuffer, 0, logLength)) {
- throw new EOFException();
- }
+ dataInputStream.readFully(logBuffer, 0, logLength);
T result = deserializer.deserialize(ByteBuffer.wrap(logBuffer));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index 692736474ba..1501f50acc1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -360,7 +361,7 @@ public class SchemaFile implements ISchemaFile {
lastSGAddr = 0L;
pageManager = new BTreePageManager(channel, pmtFile, -1, logPath);
} else {
- channel.read(headerContent);
+ IOUtils.readFully(channel, headerContent);
headerContent.clear();
lastPageIndex = ReadWriteIOUtils.readInt(headerContent);
dataTTL = ReadWriteIOUtils.readLong(headerContent);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
index c723e38dfaf..227ea27f505 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafil
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -91,8 +93,9 @@ public class SchemaFileLogReader {
}
}
- // corrupted within one entry
- if (inputStream.read(tempBytes, 1, tempBytes.length - 1) <
tempBytes.length - 2) {
+ try {
+ new DataInputStream(inputStream).readFully(tempBytes, 1,
tempBytes.length - 1);
+ } catch (EOFException e) {
throw new SchemaFileLogCorruptedException(logFile.getAbsolutePath(),
"incomplete entry.");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
index 044b7339a82..4e9d6524f19 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
@@ -19,6 +19,7 @@
package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.utils.IOUtils;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogReader;
@@ -107,7 +108,7 @@ public class PageIOChannel {
if (!readChannel.isOpen()) {
readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
}
- readChannel.read(dst, getPageAddress(pageIndex));
+ IOUtils.readFully(readChannel, dst, getPageAddress(pageIndex));
}
// region Flush Strategy
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
index 9cf9d51cf21..512c9fcea57 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.schemaengine.schemaregion.tag;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.commons.io.FileUtils;
@@ -114,7 +115,7 @@ public class TagLogFile implements AutoCloseable {
throws IOException {
// Read the first block
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
- fileChannel.read(byteBuffer, position);
+ IOUtils.readFully(fileChannel, byteBuffer, position);
byteBuffer.flip();
if (byteBuffer.limit() > 0) { // This indicates that there is data at this
position
int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
@@ -129,7 +130,7 @@ public class TagLogFile implements AutoCloseable {
// read one offset, then use filechannel's read to read it
byteBuffers.position(MAX_LENGTH * i);
byteBuffers.limit(MAX_LENGTH * (i + 1));
- fileChannel.read(byteBuffers, nextPosition);
+ IOUtils.readFully(fileChannel, byteBuffers, nextPosition);
byteBuffers.position(4 + i * Long.BYTES);
}
byteBuffers.limit(byteBuffers.capacity());
@@ -144,7 +145,10 @@ public class TagLogFile implements AutoCloseable {
blockOffset.add(position);
// Read the first block
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
- fileChannel.read(byteBuffer, position);
+ if (position == fileChannel.size()) {
+ return blockOffset;
+ }
+ IOUtils.readFully(fileChannel, byteBuffer, position);
byteBuffer.flip();
if (byteBuffer.limit() > 0) { // This indicates that there is data at this
position
int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
@@ -167,7 +171,7 @@ public class TagLogFile implements AutoCloseable {
// read
blockBuffer.position(MAX_LENGTH * i);
blockBuffer.limit(MAX_LENGTH * (i + 1));
- fileChannel.read(blockBuffer, blockOffset.get(i));
+ IOUtils.readFully(fileChannel, blockBuffer, blockOffset.get(i));
blockBuffer.position(4 + i * Long.BYTES);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
index e3d374551b1..8a6d2fbf890 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+import org.apache.iotdb.commons.utils.IOUtils;
+
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -63,7 +65,7 @@ public enum WALFileVersion {
continue;
}
ByteBuffer buffer = ByteBuffer.allocate(version.versionBytes.length);
- channel.read(buffer);
+ IOUtils.readFully(channel, buffer);
buffer.flip();
String versionString = new String(buffer.array(),
StandardCharsets.UTF_8);
if (version.versionString.equals(versionString)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 1827bfc9365..a56c145fe2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.utils.MmapUtil;
@@ -85,7 +86,8 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
if (version == WALFileVersion.V2) {
// New Version
ByteBuffer magicStringBuffer =
ByteBuffer.allocate(version.getVersionBytes().length);
- channel.read(magicStringBuffer, channel.size() -
version.getVersionBytes().length);
+ IOUtils.readFully(
+ channel, magicStringBuffer, channel.size() -
version.getVersionBytes().length);
magicStringBuffer.flip();
if
(logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX)
|| !new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
@@ -105,7 +107,8 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
}
// Old version
ByteBuffer magicStringBuffer =
ByteBuffer.allocate(version.getVersionBytes().length);
- channel.read(magicStringBuffer, channel.size() -
version.getVersionBytes().length);
+ IOUtils.readFully(
+ channel, magicStringBuffer, channel.size() -
version.getVersionBytes().length);
magicStringBuffer.flip();
if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
.equals(version.getVersionString())) {
@@ -117,7 +120,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
}
}
// Read the metadata size
- channel.read(metadataSizeBuf, position);
+ IOUtils.readFully(channel, metadataSizeBuf, position);
metadataSizeBuf.flip();
int metadataSize = metadataSizeBuf.getInt();
endOffset = channel.size() - version.getVersionBytes().length -
Integer.BYTES - metadataSize;
@@ -237,9 +240,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
compressedBuffer.clear();
// limit the buffer to prevent it from reading too much byte than
expected
compressedBuffer.limit(segmentInfo.dataInDiskSize);
- if (readWALBufferFromChannel(compressedBuffer) !=
segmentInfo.dataInDiskSize) {
- throw new IOException("Unexpected end of file");
- }
+ readWALBufferFullyFromChannel(compressedBuffer);
compressedBuffer.flip();
IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor);
@@ -255,9 +256,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
// limit the buffer to prevent it from reading too much byte than
expected
dataBuffer.limit(segmentInfo.dataInDiskSize);
- if (readWALBufferFromChannel(dataBuffer) != segmentInfo.dataInDiskSize) {
- throw new IOException("Unexpected end of file");
- }
+ readWALBufferFullyFromChannel(dataBuffer);
}
dataBuffer.flip();
}
@@ -301,7 +300,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
compressedBuffer =
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
- readWALBufferFromChannel(compressedBuffer);
+ readWALBufferFullyFromChannel(compressedBuffer);
compressedBuffer.flip();
IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
@@ -310,7 +309,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
compressedBuffer = null;
} else {
dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
- readWALBufferFromChannel(dataBuffer);
+ readWALBufferFullyFromChannel(dataBuffer);
dataBuffer.flip();
}
@@ -349,7 +348,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
private SegmentInfo getNextSegmentInfo() throws IOException {
segmentHeaderWithoutCompressedSizeBuffer.clear();
- channel.read(segmentHeaderWithoutCompressedSizeBuffer);
+ readWALBufferFullyFromChannel(segmentHeaderWithoutCompressedSizeBuffer);
segmentHeaderWithoutCompressedSizeBuffer.flip();
SegmentInfo info = new SegmentInfo();
info.compressionType =
@@ -357,7 +356,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
info.dataInDiskSize = segmentHeaderWithoutCompressedSizeBuffer.getInt();
if (info.compressionType != CompressionType.UNCOMPRESSED) {
compressedSizeBuffer.clear();
- readWALBufferFromChannel(compressedSizeBuffer);
+ readWALBufferFullyFromChannel(compressedSizeBuffer);
compressedSizeBuffer.flip();
info.uncompressedSize = compressedSizeBuffer.getInt();
} else {
@@ -373,6 +372,13 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
return size;
}
+ private void readWALBufferFullyFromChannel(ByteBuffer buffer) throws
IOException {
+ long startTime = System.nanoTime();
+ int size = buffer.remaining();
+ IOUtils.readFully(channel, buffer);
+ WritingMetrics.getInstance().recordWALRead(size, System.nanoTime() -
startTime);
+ }
+
private void uncompressWALBuffer(
ByteBuffer compressed, ByteBuffer uncompressed, IUnCompressor
unCompressor)
throws IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
index ba9211656ef..9c707c788a3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException;
import org.apache.iotdb.db.utils.SerializedSize;
@@ -143,12 +144,12 @@ public class WALMetaData implements SerializedSize {
ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
WALFileVersion version = WALFileVersion.getVersion(channel);
position = channel.size() - Integer.BYTES -
(version.getVersionBytes().length);
- channel.read(metadataSizeBuf, position);
+ IOUtils.readFully(channel, metadataSizeBuf, position);
metadataSizeBuf.flip();
// load metadata
int metadataSize = metadataSizeBuf.getInt();
ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize);
- channel.read(metadataBuf, position - metadataSize);
+ IOUtils.readFully(channel, metadataBuf, position - metadataSize);
metadataBuf.flip();
metaData = WALMetaData.deserialize(metadataBuf);
// versions before V1.3, should recover memTable ids from entries
@@ -157,8 +158,8 @@ public class WALMetaData implements SerializedSize {
for (int size : metaData.buffersSize) {
channel.position(offset);
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
- channel.read(buffer);
- buffer.clear();
+ IOUtils.readFully(channel, buffer);
+ buffer.flip();
metaData.memTablesId.add(buffer.getLong());
offset += size;
}
@@ -175,7 +176,8 @@ public class WALMetaData implements SerializedSize {
private static boolean isValidMagicString(FileChannel channel) throws
IOException {
ByteBuffer magicStringBytes =
ByteBuffer.allocate(WALFileVersion.V2.getVersionBytes().length);
- channel.read(magicStringBytes, channel.size() -
WALFileVersion.V2.getVersionBytes().length);
+ IOUtils.readFully(
+ channel, magicStringBytes, channel.size() -
WALFileVersion.V2.getVersionBytes().length);
magicStringBytes.flip();
String magicString = new String(magicStringBytes.array(),
StandardCharsets.UTF_8);
return magicString.equals(WALFileVersion.V2.getVersionString())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
index d1b27060c90..46598561a5b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.recover;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALFileVersion;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter;
@@ -65,7 +66,7 @@ public class WALRepairWriter {
}
try (FileChannel channel = FileChannel.open(logFile.toPath(),
StandardOpenOption.READ)) {
ByteBuffer magicStringBytes = ByteBuffer.allocate(size);
- channel.read(magicStringBytes, channel.size() - size);
+ IOUtils.readFully(channel, magicStringBytes, channel.size() - size);
magicStringBytes.flip();
return new String(magicStringBytes.array(), StandardCharsets.UTF_8);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index 8567d30660d..1395eaa420d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -44,6 +44,7 @@ import org.apache.tsfile.write.writer.TsFileIOWriter;
import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -304,9 +305,7 @@ public class AlignedChunkData implements ChunkData {
protected void deserializeTsFileDataByte(final InputStream stream) throws
IOException {
final int size = ReadWriteIOUtils.readInt(stream);
this.chunkData = new byte[size];
- if (size != stream.read(chunkData)) {
- throw new IOException("TsFileData byte array read error, size
mismatch.");
- }
+ new DataInputStream(stream).readFully(chunkData);
}
private void deserializeEntireChunk(final InputStream stream, final
TsFileIOWriter writer)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
index 5cc7f40cd79..3d94e1a4933 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
@@ -189,20 +189,13 @@ public class SubscriptionEventTsFileResponse extends
SubscriptionEventExtendable
PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(bufferSize);
final byte[] readBuffer = new byte[(int) bufferSize];
- final int readLength = reader.read(readBuffer);
- if (readLength != bufferSize) {
- memoryBlock.close();
- throw new SubscriptionException(
- String.format(
- "inconsistent read length (broken invariant), expected: %s,
actual: %s",
- bufferSize, readLength));
- }
+ reader.readFully(readBuffer);
// generate subscription poll response with piece payload
final CachedSubscriptionPollResponse response =
new CachedSubscriptionPollResponse(
SubscriptionPollResponseType.FILE_PIECE.getType(),
- new FilePiecePayload(tsFile.getName(), writingOffset +
readLength, readBuffer),
+ new FilePiecePayload(tsFile.getName(), writingOffset +
bufferSize, readBuffer),
commitContext);
// set fixed memory block for response
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java
index 98ca07dcd03..e622194265a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.utils.sort;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -93,10 +94,11 @@ public class FileSpillerReader implements SortReader {
if (readLen == -1) {
return -1;
}
+ IOUtils.readFully(fileChannel, bytes);
bytes.flip();
int capacity = bytes.getInt();
ByteBuffer tsBlockBytes = ByteBuffer.allocate(capacity);
- fileChannel.read(tsBlockBytes);
+ IOUtils.readFully(fileChannel, tsBlockBytes);
tsBlockBytes.flip();
TsBlock cachedTsBlock = serde.deserialize(tsBlockBytes);
cacheBlocks.add(cachedTsBlock);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java
new file mode 100644
index 00000000000..b9f58311977
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.schemaengine.schemaregion.logfile;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class FakeCRC32DeserializerTest {
+
+ @Test
+ public void deserializeReadsCompletePayloadAfterShortRead() throws
IOException {
+ byte[] payload = new byte[] {1, 2, 3, 4};
+
+ byte[] deserialized =
+ new FakeCRC32Deserializer<>(new ByteBufferDeserializer())
+ .deserialize(new OneByteAtATimeInputStream(serialize(payload,
true)));
+
+ Assert.assertArrayEquals(payload, deserialized);
+ }
+
+ @Test
+ public void deserializeThrowsWhenPayloadIsTruncated() throws IOException {
+ byte[] bytes = serialize(new byte[] {1, 2}, false, false);
+
+ Assert.assertThrows(
+ EOFException.class,
+ () ->
+ new FakeCRC32Deserializer<>(new ByteBufferDeserializer())
+ .deserialize(new OneByteAtATimeInputStream(bytes)));
+ }
+
+ private static byte[] serialize(byte[] payload, boolean complete) throws
IOException {
+ return serialize(payload, complete, true);
+ }
+
+ private static byte[] serialize(byte[] payload, boolean complete, boolean
writeValidationCode)
+ throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ try (DataOutputStream dataOutputStream = new
DataOutputStream(outputStream)) {
+ dataOutputStream.writeInt(complete ? payload.length : payload.length +
1);
+ dataOutputStream.write(payload);
+ if (writeValidationCode) {
+ dataOutputStream.writeLong(0L);
+ }
+ }
+ return outputStream.toByteArray();
+ }
+
+ private static class ByteBufferDeserializer implements IDeserializer<byte[]>
{
+
+ @Override
+ public byte[] deserialize(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return bytes;
+ }
+ }
+
+ private static class OneByteAtATimeInputStream extends InputStream {
+
+ private final byte[] bytes;
+ private int index;
+
+ private OneByteAtATimeInputStream(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public int read() {
+ return index < bytes.length ? bytes[index++] & 0xFF : -1;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ if (len == 0) {
+ return 0;
+ }
+ if (index >= bytes.length) {
+ return -1;
+ }
+ b[off] = bytes[index++];
+ return 1;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java
new file mode 100644
index 00000000000..9f3688d76fa
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.schemaengine.schemaregion.tag;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.utils.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+
+public class TagLogFileTest {
+
+ private File tempDir;
+
+ @After
+ public void tearDown() throws Exception {
+ if (tempDir != null) {
+ FileUtils.deleteDirectory(tempDir);
+ }
+ }
+
+ @Test
+ public void writeAppendsFirstRecordWithoutReadingPastFileEnd() throws Exception {
+ tempDir = Files.createTempDirectory("tag-log-file").toFile();
+ Map<String, String> tags = Collections.singletonMap("tag", "value");
+ Map<String, String> attributes = Collections.singletonMap("attr", "value");
+
+ try (TagLogFile tagLogFile = new TagLogFile(tempDir.getAbsolutePath(), "tag.log")) {
+ long offset = tagLogFile.write(tags, attributes);
+ Pair<Map<String, String>, Map<String, String>> result = tagLogFile.read(offset);
+
+ Assert.assertEquals(tags, result.left);
+ Assert.assertEquals(attributes, result.right);
+ }
+ }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
index bb1b2c1ce5e..02139d2058d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
@@ -29,6 +29,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
+import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -279,13 +280,7 @@ public class TimeWindowStateProgressIndex extends ProgressIndex {
continue;
}
final byte[] body = new byte[length];
- final int readLen = stream.read(body);
- if (readLen != length) {
- throw new IOException(
- String.format(
- "The intended read length is %s but %s is actually read when deserializing TimeProgressIndex, ProgressIndex: %s",
- length, readLen, timeWindowStateProgressIndex));
- }
+ new DataInputStream(stream).readFully(body);
final ByteBuffer dstBuffer = ByteBuffer.wrap(body);
timeWindowStateProgressIndex.timeSeries2TimestampWindowBufferPairMap.put(
timeSeries, new Pair<>(timestamp, dstBuffer));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index fc739b58ea6..011a99e8a1d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.executable;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.trigger.exception.TriggerJarTooLargeException;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.commons.io.FileUtils;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
@@ -214,7 +215,7 @@ public class ExecutableManager {
String.format("Size of file exceed %d bytes", Integer.MAX_VALUE));
}
ByteBuffer byteBuffer = ByteBuffer.allocate((int) size);
- fileChannel.read(byteBuffer);
+ IOUtils.readFully(fileChannel, byteBuffer);
byteBuffer.flip();
return byteBuffer;
} catch (Exception e) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java
index e2279ce06f4..7798acf28e0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.datastructure.queue.serializer;
import org.apache.iotdb.commons.pipe.datastructure.queue.ConcurrentIterableLinkedQueue;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -68,7 +69,7 @@ public class PlainQueueSerializer<E> implements QueueSerializer<E> {
}
int capacity = ReadWriteIOUtils.readInt(inputStream);
ByteBuffer buffer = ByteBuffer.allocate(capacity);
- channel.read(buffer);
+ IOUtils.readFully(channel, buffer);
buffer.flip();
E element = elementDeserializationFunction.apply(buffer);
if (element == null) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index 498baeda582..f2c54cd2c94 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -332,8 +333,8 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
outputStream.flush();
final byte[] response = new byte[1];
- final int size = socket.getInputStream().read(response);
- return size > 0 && Arrays.equals(AirGapOneByteResponse.OK, response);
+ new DataInputStream(socket.getInputStream()).readFully(response);
+ return Arrays.equals(AirGapOneByteResponse.OK, response);
}
protected boolean send(final AirGapSocket socket, final byte[] bytes) throws IOException {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
index f96ae8443b9..047dd6bfea5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
@@ -28,10 +28,12 @@ import com.google.common.base.Supplier;
import org.apache.tsfile.utils.Pair;
import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -94,6 +96,26 @@ public class IOUtils {
outputStream.write(encodingBuffer.array(), 0, Integer.BYTES);
}
+ public static void readFully(FileChannel fileChannel, ByteBuffer buffer) throws IOException {
+ while (buffer.hasRemaining()) {
+ if (fileChannel.read(buffer) <= 0) {
+ throw new EOFException();
+ }
+ }
+ }
+
+ public static void readFully(FileChannel fileChannel, ByteBuffer buffer, long position)
+ throws IOException {
+ long currentPosition = position;
+ while (buffer.hasRemaining()) {
+ final int readBytes = fileChannel.read(buffer, currentPosition);
+ if (readBytes <= 0) {
+ throw new EOFException();
+ }
+ currentPosition += readBytes;
+ }
+ }
+
/**
* Read a string from the given stream.
*
@@ -120,7 +142,7 @@ public class IOUtils {
strBuffer = new byte[length];
}
- inputStream.read(strBuffer, 0, length);
+ inputStream.readFully(strBuffer, 0, length);
return new String(strBuffer, 0, length, encoding);
}
return null;
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java
new file mode 100644
index 00000000000..bf2cf546628
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class IOUtilsTest {
+
+ private static final String ENCODING = "UTF-8";
+
+ @Test
+ public void readStringReadsCompletePayloadAfterShortRead() throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ IOUtils.writeString(outputStream, "abcdefg", ENCODING, null);
+
+ try (DataInputStream inputStream =
+ new DataInputStream(new OneByteAtATimeInputStream(outputStream.toByteArray()))) {
+ Assert.assertEquals("abcdefg", IOUtils.readString(inputStream, ENCODING, null));
+ }
+ }
+
+ @Test
+ public void readStringThrowsWhenPayloadIsTruncated() throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ IOUtils.writeInt(outputStream, 7, null);
+ outputStream.write(new byte[] {'a', 'b', 'c'});
+
+ try (DataInputStream inputStream =
+ new DataInputStream(new OneByteAtATimeInputStream(outputStream.toByteArray()))) {
+ Assert.assertThrows(
+ EOFException.class, () -> IOUtils.readString(inputStream, ENCODING, null));
+ }
+ }
+
+ @Test
+ public void readFullyReadsCompleteByteBufferAfterShortChannelRead() throws IOException {
+ byte[] bytes = new byte[] {1, 2, 3};
+ FileChannel channel = mockOneByteAtATimeChannel(bytes);
+ ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+
+ IOUtils.readFully(channel, buffer);
+
+ Assert.assertArrayEquals(bytes, buffer.array());
+ }
+
+ @Test
+ public void readFullyReadsCompleteByteBufferFromPositionAfterShortChannelRead()
+ throws IOException {
+ byte[] bytes = new byte[] {1, 2, 3, 4, 5};
+ FileChannel channel = mockOneByteAtATimeChannel(bytes);
+ ByteBuffer buffer = ByteBuffer.allocate(3);
+
+ IOUtils.readFully(channel, buffer, 2);
+
+ Assert.assertArrayEquals(new byte[] {3, 4, 5}, buffer.array());
+ }
+
+ @Test
+ public void readFullyThrowsWhenChannelIsTruncated() throws IOException {
+ FileChannel channel = mockOneByteAtATimeChannel(new byte[] {1, 2});
+ ByteBuffer buffer = ByteBuffer.allocate(3);
+
+ Assert.assertThrows(EOFException.class, () -> IOUtils.readFully(channel, buffer));
+ }
+
+ private static FileChannel mockOneByteAtATimeChannel(byte[] bytes) throws IOException {
+ FileChannel channel = Mockito.mock(FileChannel.class);
+ AtomicInteger index = new AtomicInteger();
+ Mockito.when(channel.read(Mockito.any(ByteBuffer.class)))
+ .thenAnswer(
+ invocation -> {
+ ByteBuffer buffer = invocation.getArgument(0);
+ int currentIndex = index.getAndIncrement();
+ if (currentIndex >= bytes.length) {
+ return -1;
+ }
+ buffer.put(bytes[currentIndex]);
+ return 1;
+ });
+ Mockito.when(channel.read(Mockito.any(ByteBuffer.class), Mockito.anyLong()))
+ .thenAnswer(
+ invocation -> {
+ ByteBuffer buffer = invocation.getArgument(0);
+ long position = invocation.getArgument(1);
+ if (position >= bytes.length) {
+ return -1;
+ }
+ buffer.put(bytes[(int) position]);
+ return 1;
+ });
+ return channel;
+ }
+
+ private static class OneByteAtATimeInputStream extends InputStream {
+
+ private final byte[] bytes;
+ private int index;
+
+ private OneByteAtATimeInputStream(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public int read() {
+ return index < bytes.length ? bytes[index++] & 0xFF : -1;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ if (len == 0) {
+ return 0;
+ }
+ if (index >= bytes.length) {
+ return -1;
+ }
+ b[off] = bytes[index++];
+ return 1;
+ }
+ }
+}