This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aa2acd6b4f5 Fix short reads in fixed-length deserialization (#17870)
aa2acd6b4f5 is described below
commit aa2acd6b4f550f1138348b5d04283b18521b63d9
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 23 10:14:32 2026 +0800
Fix short reads in fixed-length deserialization (#17870)
* Fix short reads in fixed-length deserialization
* Fix tag log append EOF handling
* Apply Spotless formatting
---------
Co-authored-by: Jiang Tian <[email protected]>
---
.../datastructure/SerializableList.java | 3 +-
.../iotdb/calc/utils/sort/FileSpillerReader.java | 4 +-
.../externalservice/ExternalServiceInfo.java | 3 +-
.../confignode/persistence/ProcedureInfo.java | 13 +-
.../writelog/io/SingleFileLogReader.java | 5 +-
.../consensus/deletion/recover/DeletionReader.java | 5 +-
.../plan/planner/plan/node/write/ObjectNode.java | 4 +-
.../logfile/FakeCRC32Deserializer.java | 5 +-
.../mtree/impl/pbtree/schemafile/SchemaFile.java | 3 +-
.../pbtree/schemafile/log/SchemaFileLogReader.java | 7 +-
.../pbtree/schemafile/pagemgr/PageIOChannel.java | 3 +-
.../schemaengine/schemaregion/tag/TagLogFile.java | 12 +-
.../dataregion/modification/IDPredicate.java | 7 +-
.../dataregion/wal/io/WALFileVersion.java | 4 +-
.../dataregion/wal/io/WALInputStream.java | 32 +++--
.../dataregion/wal/io/WALMetaData.java | 12 +-
.../dataregion/wal/recover/WALRepairWriter.java | 3 +-
.../load/splitter/AlignedChunkData.java | 6 +-
.../response/SubscriptionEventTsFileResponse.java | 11 +-
.../iotdb/db/utils/DataNodeObjectFileService.java | 3 +-
.../logfile/FakeCRC32DeserializerTest.java | 108 +++++++++++++++
.../schemaregion/tag/TagLogFileTest.java | 57 ++++++++
.../modification/TableDeletionEntryTest.java | 9 ++
.../index/impl/TimeWindowStateProgressIndex.java | 9 +-
.../commons/executable/ExecutableManager.java | 3 +-
.../queue/serializer/PlainQueueSerializer.java | 3 +-
.../pipe/sink/protocol/IoTDBAirGapSink.java | 5 +-
.../schema/table/column/TsTableColumnCategory.java | 7 +-
.../org/apache/iotdb/commons/utils/IOUtils.java | 24 +++-
.../apache/iotdb/commons/utils/IOUtilsTest.java | 147 +++++++++++++++++++++
30 files changed, 444 insertions(+), 73 deletions(-)
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/datastructure/SerializableList.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/datastructure/SerializableList.java
index ee70df305ed..fb1b86ad8bb 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/datastructure/SerializableList.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/datastructure/SerializableList.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.calc.transformation.datastructure;
import org.apache.iotdb.calc.service.AbstractTemporaryQueryDataFileService;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.tsfile.utils.PublicBAOS;
@@ -60,7 +61,7 @@ public interface SerializableList {
}
init();
ByteBuffer byteBuffer =
ByteBuffer.allocate(recorder.getSerializedByteLength());
- recorder.getFileChannel().read(byteBuffer);
+ IOUtils.readFully(recorder.getFileChannel(), byteBuffer);
byteBuffer.flip();
deserialize(byteBuffer);
recorder.closeFile();
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/sort/FileSpillerReader.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/sort/FileSpillerReader.java
index af7d726d5ff..05fc939ede7 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/sort/FileSpillerReader.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/utils/sort/FileSpillerReader.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.calc.utils.sort;
import org.apache.iotdb.calc.utils.datastructure.MergeSortKey;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.common.conf.TSFileDescriptor;
@@ -93,10 +94,11 @@ public class FileSpillerReader implements SortReader {
if (readLen == -1) {
return -1;
}
+ IOUtils.readFully(fileChannel, bytes);
bytes.flip();
int capacity = bytes.getInt();
ByteBuffer tsBlockBytes = ByteBuffer.allocate(capacity);
- fileChannel.read(tsBlockBytes);
+ IOUtils.readFully(fileChannel, tsBlockBytes);
tsBlockBytes.flip();
TsBlock cachedTsBlock = serde.deserialize(tsBlockBytes);
cacheBlocks.add(cachedTsBlock);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceInfo.java
index a3cf89515b5..823325514a9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceInfo.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -239,7 +240,7 @@ public class ExternalServiceInfo implements
SnapshotProcessor {
throws IOException {
int length = ReadWriteIOUtils.readInt(inputStream);
byte[] bytes = new byte[length];
- inputStream.read(bytes);
+ new DataInputStream(inputStream).readFully(bytes);
crc32.reset();
crc32.update(bytes, 0, length);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
index 63855d9646d..f95575fef29 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
@@ -165,8 +166,16 @@ public class ProcedureInfo implements SnapshotProcessor {
try (FileInputStream fis = new
FileInputStream(procedureFilePath.toFile())) {
Procedure procedure = null;
try (FileChannel channel = fis.getChannel()) {
- ByteBuffer byteBuffer =
ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE);
- if (channel.read(byteBuffer) > 0) {
+ final long fileSize = channel.size();
+ if (fileSize > PROCEDURE_LOAD_BUFFER_SIZE) {
+ throw new IOException(
+ String.format(
+ "Procedure file %s exceeds the load buffer limit %s, actual
size %s",
+ procedureFilePath, PROCEDURE_LOAD_BUFFER_SIZE, fileSize));
+ }
+ ByteBuffer byteBuffer = ByteBuffer.allocate((int) fileSize);
+ if (fileSize > 0) {
+ IOUtils.readFully(channel, byteBuffer);
byteBuffer.flip();
procedure = ProcedureFactory.getInstance().create(byteBuffer);
byteBuffer.clear();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
index b46057b8330..fe123372bec 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
@@ -76,10 +76,7 @@ public class SingleFileLogReader implements ILogReader {
}
buffer = new byte[logSize];
- int readLen = logStream.read(buffer, 0, logSize);
- if (readLen < logSize) {
- throw new IOException(ConfigNodeMessages.REACH_EOF);
- }
+ logStream.readFully(buffer, 0, logSize);
final long checkSum = logStream.readLong();
checkSummer.reset();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
index 0bd6f48d51f..8477e1e8059 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.consensus.deletion.recover;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
@@ -60,7 +61,7 @@ public class DeletionReader implements Closeable {
try {
// Read magic string
ByteBuffer magicStringBuffer =
ByteBuffer.allocate(MAGIC_STRING_BYTES_SIZE);
- fileChannel.read(magicStringBuffer);
+ IOUtils.readFully(fileChannel, magicStringBuffer);
magicStringBuffer.flip();
String magicVersion = new String(magicStringBuffer.array(),
StandardCharsets.UTF_8);
if (LOGGER.isDebugEnabled()) {
@@ -70,7 +71,7 @@ public class DeletionReader implements Closeable {
// Read deletions
long remainingBytes = fileChannel.size() - fileChannel.position();
ByteBuffer byteBuffer = ByteBuffer.allocate((int) remainingBytes);
- fileChannel.read(byteBuffer);
+ IOUtils.readFully(fileChannel, byteBuffer);
byteBuffer.flip();
List<DeletionResource> deletions = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
index 552d26e5631..cf80d63a8d5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
@@ -158,7 +158,7 @@ public class ObjectNode extends SearchNode implements
WALEntryValue {
if (objectFile.isPresent()) {
try (RandomAccessFile raf = new RandomAccessFile(objectFile.get(), "r"))
{
raf.seek(offset);
- raf.read(contents);
+ raf.readFully(contents);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -308,7 +308,7 @@ public class ObjectNode extends SearchNode implements
WALEntryValue {
private void readContentFromFile(File file, byte[] contents) throws
IOException {
try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
raf.seek(offset);
- raf.read(contents);
+ raf.readFully(contents);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
index 8570a651e05..4bf0f059b52 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
@@ -30,7 +30,6 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -70,9 +69,7 @@ public class FakeCRC32Deserializer<T> implements
IDeserializer<T> {
}
byte[] logBuffer = new byte[logLength];
- if (logLength < inputStream.read(logBuffer, 0, logLength)) {
- throw new EOFException();
- }
+ dataInputStream.readFully(logBuffer, 0, logLength);
T result = deserializer.deserialize(ByteBuffer.wrap(logBuffer));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index f9f5a63a44a..665bafb956f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -360,7 +361,7 @@ public class SchemaFile implements ISchemaFile {
lastSGAddr = 0L;
pageManager = new BTreePageManager(channel, pmtFile, -1, logPath);
} else {
- channel.read(headerContent);
+ IOUtils.readFully(channel, headerContent);
headerContent.clear();
lastPageIndex = ReadWriteIOUtils.readInt(headerContent);
dataTTL = ReadWriteIOUtils.readLong(headerContent);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
index 1840d867b36..d2300384fa7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
@@ -27,6 +27,8 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafil
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -91,8 +93,9 @@ public class SchemaFileLogReader {
}
}
- // corrupted within one entry
- if (inputStream.read(tempBytes, 1, tempBytes.length - 1) <
tempBytes.length - 2) {
+ try {
+ new DataInputStream(inputStream).readFully(tempBytes, 1,
tempBytes.length - 1);
+ } catch (EOFException e) {
throw new SchemaFileLogCorruptedException(
logFile.getAbsolutePath(),
DataNodeSchemaMessages.SCHEMA_FILE_LOG_INCOMPLETE_ENTRY);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
index 044b7339a82..4e9d6524f19 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
@@ -19,6 +19,7 @@
package
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.utils.IOUtils;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogReader;
@@ -107,7 +108,7 @@ public class PageIOChannel {
if (!readChannel.isOpen()) {
readChannel = FileChannel.open(pmtFile.toPath(),
StandardOpenOption.READ);
}
- readChannel.read(dst, getPageAddress(pageIndex));
+ IOUtils.readFully(readChannel, dst, getPageAddress(pageIndex));
}
// region Flush Strategy
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
index 6ffed68535a..0ed0ba9afda 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.schemaengine.schemaregion.tag;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.DataNodeSchemaMessages;
@@ -116,7 +117,7 @@ public class TagLogFile implements AutoCloseable {
throws IOException {
// Read the first block
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
- fileChannel.read(byteBuffer, position);
+ IOUtils.readFully(fileChannel, byteBuffer, position);
byteBuffer.flip();
if (byteBuffer.limit() > 0) { // This indicates that there is data at this
position
int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
@@ -131,7 +132,7 @@ public class TagLogFile implements AutoCloseable {
// read one offset, then use filechannel's read to read it
byteBuffers.position(MAX_LENGTH * i);
byteBuffers.limit(MAX_LENGTH * (i + 1));
- fileChannel.read(byteBuffers, nextPosition);
+ IOUtils.readFully(fileChannel, byteBuffers, nextPosition);
byteBuffers.position(4 + i * Long.BYTES);
}
byteBuffers.limit(byteBuffers.capacity());
@@ -146,7 +147,10 @@ public class TagLogFile implements AutoCloseable {
blockOffset.add(position);
// Read the first block
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
- fileChannel.read(byteBuffer, position);
+ if (position == fileChannel.size()) {
+ return blockOffset;
+ }
+ IOUtils.readFully(fileChannel, byteBuffer, position);
byteBuffer.flip();
if (byteBuffer.limit() > 0) { // This indicates that there is data at this
position
int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
@@ -169,7 +173,7 @@ public class TagLogFile implements AutoCloseable {
// read
blockBuffer.position(MAX_LENGTH * i);
blockBuffer.limit(MAX_LENGTH * (i + 1));
- fileChannel.read(blockBuffer, blockOffset.get(i));
+ IOUtils.readFully(fileChannel, blockBuffer, blockOffset.get(i));
blockBuffer.position(4 + i * Long.BYTES);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java
index 0d657dab9ed..487689d83b9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java
@@ -29,6 +29,7 @@ import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -63,7 +64,11 @@ public abstract class IDPredicate implements
StreamSerializable, BufferSerializa
}
public static IDPredicateType deserialize(InputStream stream) throws
IOException {
- return values()[stream.read()];
+ int typeNum = stream.read();
+ if (typeNum == -1) {
+ throw new EOFException();
+ }
+ return values()[typeNum];
}
public static IDPredicateType deserialize(ByteBuffer buffer) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
index fc09c34b650..32cdc535ba6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+import org.apache.iotdb.commons.utils.IOUtils;
+
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -64,7 +66,7 @@ public enum WALFileVersion {
continue;
}
ByteBuffer buffer = ByteBuffer.allocate(version.versionBytes.length);
- channel.read(buffer);
+ IOUtils.readFully(channel, buffer);
buffer.flip();
String versionString = new String(buffer.array(),
StandardCharsets.UTF_8);
if (version.versionString.equals(versionString)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index c8d18ed773c..6932b6df381 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.i18n.StorageEngineMessages;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.utils.MmapUtil;
@@ -86,7 +87,8 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
if (version == WALFileVersion.V2 || version == WALFileVersion.V3) {
// New Version
ByteBuffer magicStringBuffer =
ByteBuffer.allocate(version.getVersionBytes().length);
- channel.read(magicStringBuffer, channel.size() -
version.getVersionBytes().length);
+ IOUtils.readFully(
+ channel, magicStringBuffer, channel.size() -
version.getVersionBytes().length);
magicStringBuffer.flip();
if
(logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX)
|| !new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
@@ -106,7 +108,8 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
}
// Old version
ByteBuffer magicStringBuffer =
ByteBuffer.allocate(version.getVersionBytes().length);
- channel.read(magicStringBuffer, channel.size() -
version.getVersionBytes().length);
+ IOUtils.readFully(
+ channel, magicStringBuffer, channel.size() -
version.getVersionBytes().length);
magicStringBuffer.flip();
if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
.equals(version.getVersionString())) {
@@ -118,7 +121,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
}
}
// Read the metadata size
- channel.read(metadataSizeBuf, position);
+ IOUtils.readFully(channel, metadataSizeBuf, position);
metadataSizeBuf.flip();
int metadataSize = metadataSizeBuf.getInt();
endOffset = channel.size() - version.getVersionBytes().length -
Integer.BYTES - metadataSize;
@@ -240,9 +243,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
compressedBuffer.clear();
// limit the buffer to prevent it from reading too much byte than
expected
compressedBuffer.limit(segmentInfo.dataInDiskSize);
- if (readWALBufferFromChannel(compressedBuffer) !=
segmentInfo.dataInDiskSize) {
- throw new IOException(StorageEngineMessages.UNEXPECTED_END_OF_FILE);
- }
+ readWALBufferFullyFromChannel(compressedBuffer);
compressedBuffer.flip();
IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor);
@@ -258,9 +259,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
// limit the buffer to prevent it from reading too much byte than
expected
dataBuffer.limit(segmentInfo.dataInDiskSize);
- if (readWALBufferFromChannel(dataBuffer) !=
segmentInfo.dataInDiskSize) {
- throw new IOException(StorageEngineMessages.UNEXPECTED_END_OF_FILE);
- }
+ readWALBufferFullyFromChannel(dataBuffer);
}
} catch (Exception e) {
logger.error(
@@ -313,7 +312,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
compressedBuffer =
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
- readWALBufferFromChannel(compressedBuffer);
+ readWALBufferFullyFromChannel(compressedBuffer);
compressedBuffer.flip();
IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
@@ -322,7 +321,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
compressedBuffer = null;
} else {
dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
- readWALBufferFromChannel(dataBuffer);
+ readWALBufferFullyFromChannel(dataBuffer);
dataBuffer.flip();
}
@@ -361,7 +360,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
private SegmentInfo getNextSegmentInfo() throws IOException {
segmentHeaderWithoutCompressedSizeBuffer.clear();
- channel.read(segmentHeaderWithoutCompressedSizeBuffer);
+ readWALBufferFullyFromChannel(segmentHeaderWithoutCompressedSizeBuffer);
segmentHeaderWithoutCompressedSizeBuffer.flip();
SegmentInfo info = new SegmentInfo();
info.compressionType =
@@ -369,7 +368,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
info.dataInDiskSize = segmentHeaderWithoutCompressedSizeBuffer.getInt();
if (info.compressionType != CompressionType.UNCOMPRESSED) {
compressedSizeBuffer.clear();
- readWALBufferFromChannel(compressedSizeBuffer);
+ readWALBufferFullyFromChannel(compressedSizeBuffer);
compressedSizeBuffer.flip();
info.uncompressedSize = compressedSizeBuffer.getInt();
} else {
@@ -385,6 +384,13 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
return size;
}
+ private void readWALBufferFullyFromChannel(ByteBuffer buffer) throws
IOException {
+ long startTime = System.nanoTime();
+ int size = buffer.remaining();
+ IOUtils.readFully(channel, buffer);
+ WritingMetrics.getInstance().recordWALRead(size, System.nanoTime() -
startTime);
+ }
+
private void uncompressWALBuffer(
ByteBuffer compressed, ByteBuffer uncompressed, IUnCompressor
unCompressor)
throws IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
index afd4d026644..3c4f08d3153 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.db.i18n.StorageEngineMessages;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException;
@@ -360,12 +361,12 @@ public class WALMetaData implements SerializedSize {
ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
WALFileVersion version = WALFileVersion.getVersion(channel);
position = channel.size() - Integer.BYTES -
(version.getVersionBytes().length);
- channel.read(metadataSizeBuf, position);
+ IOUtils.readFully(channel, metadataSizeBuf, position);
metadataSizeBuf.flip();
// load metadata
int metadataSize = metadataSizeBuf.getInt();
ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize);
- channel.read(metadataBuf, position - metadataSize);
+ IOUtils.readFully(channel, metadataBuf, position - metadataSize);
metadataBuf.flip();
metaData = WALMetaData.deserialize(metadataBuf, version);
// versions before V1.3, should recover memTable ids from entries
@@ -374,8 +375,8 @@ public class WALMetaData implements SerializedSize {
for (int size : metaData.buffersSize) {
channel.position(offset);
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
- channel.read(buffer);
- buffer.clear();
+ IOUtils.readFully(channel, buffer);
+ buffer.flip();
metaData.memTablesId.add(buffer.getLong());
offset += size;
}
@@ -399,7 +400,8 @@ public class WALMetaData implements SerializedSize {
return false;
}
ByteBuffer magicStringBytes = ByteBuffer.allocate(maxMagicLen);
- channel.read(magicStringBytes, channel.size() - maxMagicLen);
+ IOUtils.readFully(channel, magicStringBytes, channel.size() - maxMagicLen);
+
magicStringBytes.flip();
String magicString = new String(magicStringBytes.array(),
StandardCharsets.UTF_8);
return magicString.contains(WALFileVersion.V3.getVersionString())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
index d1b27060c90..46598561a5b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.recover;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALFileVersion;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter;
@@ -65,7 +66,7 @@ public class WALRepairWriter {
}
try (FileChannel channel = FileChannel.open(logFile.toPath(),
StandardOpenOption.READ)) {
ByteBuffer magicStringBytes = ByteBuffer.allocate(size);
- channel.read(magicStringBytes, channel.size() - size);
+ IOUtils.readFully(channel, magicStringBytes, channel.size() - size);
magicStringBytes.flip();
return new String(magicStringBytes.array(), StandardCharsets.UTF_8);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index d852ec3c9b8..e624e06e9c0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.storageengine.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
-import org.apache.iotdb.db.i18n.StorageEngineMessages;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.tsfile.enums.TSDataType;
@@ -46,6 +45,7 @@ import org.apache.tsfile.write.writer.TsFileIOWriter;
import javax.annotation.Nonnull;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -313,9 +313,7 @@ public class AlignedChunkData implements ChunkData {
this.chunkData = ((LoadTsFilePieceNode.ByteBufferInputStream) stream).read(size);
} else {
byte[] data = new byte[size];
- if (size != stream.read(data)) {
- throw new IOException(StorageEngineMessages.TSFILE_DATA_BYTE_ARRAY_SIZE_MISMATCH);
- }
+ new DataInputStream(stream).readFully(data);
this.chunkData = ByteBuffer.wrap(data);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
index ee8ef1fdb89..90fbd117905 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
@@ -194,20 +194,13 @@ public class SubscriptionEventTsFileResponse extends SubscriptionEventExtendable
PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(bufferSize);
final byte[] readBuffer = new byte[(int) bufferSize];
- final int readLength = reader.read(readBuffer);
- if (readLength != bufferSize) {
- memoryBlock.close();
- throw new SubscriptionException(
- String.format(
- "inconsistent read length (broken invariant), expected: %s, actual: %s",
- bufferSize, readLength));
- }
+ reader.readFully(readBuffer);
// generate subscription poll response with piece payload
final CachedSubscriptionPollResponse response =
new CachedSubscriptionPollResponse(
SubscriptionPollResponseType.FILE_PIECE.getType(),
- new FilePiecePayload(tsFile.getName(), writingOffset + readLength, readBuffer),
+ new FilePiecePayload(tsFile.getName(), writingOffset + bufferSize, readBuffer),
commitContext);
// set fixed memory block for response
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeObjectFileService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeObjectFileService.java
index 8de64f9de31..2036885c385 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeObjectFileService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeObjectFileService.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.calc.utils.IObjectFileService;
import org.apache.iotdb.calc.utils.ObjectTypeUtils;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.ObjectFileNotExist;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.i18n.DataNodeMiscMessages;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
@@ -101,7 +102,7 @@ public class DataNodeObjectFileService implements IObjectFileService {
byte[] bytes = new byte[(int) readSize];
ByteBuffer buffer = ByteBuffer.wrap(bytes);
try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
- fileChannel.read(buffer, offset);
+ IOUtils.readFully(fileChannel, buffer, offset);
} catch (IOException e) {
throw new IoTDBRuntimeException(e, TSStatusCode.OBJECT_READ_ERROR.getStatusCode());
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java
new file mode 100644
index 00000000000..b9f58311977
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.schemaengine.schemaregion.logfile;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class FakeCRC32DeserializerTest {
+
+ @Test
+ public void deserializeReadsCompletePayloadAfterShortRead() throws IOException {
+ byte[] payload = new byte[] {1, 2, 3, 4};
+
+ byte[] deserialized =
+ new FakeCRC32Deserializer<>(new ByteBufferDeserializer())
+ .deserialize(new OneByteAtATimeInputStream(serialize(payload, true)));
+
+ Assert.assertArrayEquals(payload, deserialized);
+ }
+
+ @Test
+ public void deserializeThrowsWhenPayloadIsTruncated() throws IOException {
+ byte[] bytes = serialize(new byte[] {1, 2}, false, false);
+
+ Assert.assertThrows(
+ EOFException.class,
+ () ->
+ new FakeCRC32Deserializer<>(new ByteBufferDeserializer())
+ .deserialize(new OneByteAtATimeInputStream(bytes)));
+ }
+
+ private static byte[] serialize(byte[] payload, boolean complete) throws IOException {
+ return serialize(payload, complete, true);
+ }
+
+ private static byte[] serialize(byte[] payload, boolean complete, boolean writeValidationCode)
+ throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
+ dataOutputStream.writeInt(complete ? payload.length : payload.length + 1);
+ dataOutputStream.write(payload);
+ if (writeValidationCode) {
+ dataOutputStream.writeLong(0L);
+ }
+ }
+ return outputStream.toByteArray();
+ }
+
+ private static class ByteBufferDeserializer implements IDeserializer<byte[]> {
+
+ @Override
+ public byte[] deserialize(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return bytes;
+ }
+ }
+
+ private static class OneByteAtATimeInputStream extends InputStream {
+
+ private final byte[] bytes;
+ private int index;
+
+ private OneByteAtATimeInputStream(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public int read() {
+ return index < bytes.length ? bytes[index++] & 0xFF : -1;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ if (len == 0) {
+ return 0;
+ }
+ if (index >= bytes.length) {
+ return -1;
+ }
+ b[off] = bytes[index++];
+ return 1;
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java
new file mode 100644
index 00000000000..555594cc207
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.schemaengine.schemaregion.tag;
+
+import org.apache.tsfile.external.commons.io.FileUtils;
+import org.apache.tsfile.utils.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+
+public class TagLogFileTest {
+
+ private File tempDir;
+
+ @After
+ public void tearDown() throws Exception {
+ if (tempDir != null) {
+ FileUtils.deleteDirectory(tempDir);
+ }
+ }
+
+ @Test
+ public void writeAppendsFirstRecordWithoutReadingPastFileEnd() throws Exception {
+ tempDir = Files.createTempDirectory("tag-log-file").toFile();
+ Map<String, String> tags = Collections.singletonMap("tag", "value");
+ Map<String, String> attributes = Collections.singletonMap("attr", "value");
+
+ try (TagLogFile tagLogFile = new TagLogFile(tempDir.getAbsolutePath(), "tag.log")) {
+ long offset = tagLogFile.write(tags, attributes);
+ Pair<Map<String, String>, Map<String, String>> result = tagLogFile.read(offset);
+
+ Assert.assertEquals(tags, result.left);
+ Assert.assertEquals(attributes, result.right);
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java
index a5d308a2b16..f4e90957945 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -37,6 +38,7 @@ import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class TableDeletionEntryTest {
@@ -63,6 +65,13 @@ public class TableDeletionEntryTest {
assertEquals(entry, deserialized2);
}
+ @Test
+ public void testDeserializePredicateTypeFromEmptyStream() {
+ assertThrows(
+ EOFException.class,
+ () -> IDPredicate.IDPredicateType.deserialize(new ByteArrayInputStream(new byte[0])));
+ }
+
@Test
public void testAffectDevice() {
TableDeletionEntry entry1 =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
index bb1b2c1ce5e..02139d2058d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
@@ -29,6 +29,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
+import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -279,13 +280,7 @@ public class TimeWindowStateProgressIndex extends ProgressIndex {
continue;
}
final byte[] body = new byte[length];
- final int readLen = stream.read(body);
- if (readLen != length) {
- throw new IOException(
- String.format(
- "The intended read length is %s but %s is actually read when deserializing TimeProgressIndex, ProgressIndex: %s",
- length, readLen, timeWindowStateProgressIndex));
- }
+ new DataInputStream(stream).readFully(body);
final ByteBuffer dstBuffer = ByteBuffer.wrap(body);
timeWindowStateProgressIndex.timeSeries2TimestampWindowBufferPairMap.put(
timeSeries, new Pair<>(timestamp, dstBuffer));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 2c75928fee2..7d42bed3f8e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.executable;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.trigger.exception.TriggerJarTooLargeException;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.tsfile.external.commons.io.FileUtils;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
@@ -199,7 +200,7 @@ public class ExecutableManager {
String.format("Size of file exceed %d bytes", Integer.MAX_VALUE));
}
ByteBuffer byteBuffer = ByteBuffer.allocate((int) size);
- fileChannel.read(byteBuffer);
+ IOUtils.readFully(fileChannel, byteBuffer);
byteBuffer.flip();
return byteBuffer;
} catch (Exception e) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java
index d944c83f77d..10087878d93 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.datastructure.queue.serializer;
import org.apache.iotdb.commons.i18n.PipeMessages;
import org.apache.iotdb.commons.pipe.datastructure.queue.ConcurrentIterableLinkedQueue;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -69,7 +70,7 @@ public class PlainQueueSerializer<E> implements QueueSerializer<E> {
}
int capacity = ReadWriteIOUtils.readInt(inputStream);
ByteBuffer buffer = ByteBuffer.allocate(capacity);
- channel.read(buffer);
+ IOUtils.readFully(channel, buffer);
buffer.flip();
E element = elementDeserializationFunction.apply(buffer);
if (element == null) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index 6d74102ff19..4543ebaee9b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -324,8 +325,8 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
outputStream.flush();
final byte[] response = new byte[1];
- final int size = socket.getInputStream().read(response);
- return size > 0 && Arrays.equals(AirGapOneByteResponse.OK, response);
+ new DataInputStream(socket.getInputStream()).readFully(response);
+ return Arrays.equals(AirGapOneByteResponse.OK, response);
}
protected boolean send(final AirGapSocket socket, final byte[] bytes) throws IOException {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
index 9c6a5e3faee..a104ba9d532 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.i18n.SchemaMessages;
import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -54,7 +55,11 @@ public enum TsTableColumnCategory {
}
public static TsTableColumnCategory deserialize(final InputStream stream) throws IOException {
- return deserialize((byte) stream.read());
+ final int category = stream.read();
+ if (category < 0) {
+ throw new EOFException();
+ }
+ return deserialize((byte) category);
}
public static TsTableColumnCategory deserialize(final ByteBuffer stream) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
index bb314597565..88c929e7092 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
@@ -29,11 +29,13 @@ import com.google.common.base.Supplier;
import com.google.common.util.concurrent.RateLimiter;
import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
@@ -125,6 +127,26 @@ public class IOUtils {
outputStream.write(encodingBuffer.array(), 0, Long.BYTES);
}
+ public static void readFully(FileChannel fileChannel, ByteBuffer buffer) throws IOException {
+ while (buffer.hasRemaining()) {
+ if (fileChannel.read(buffer) <= 0) {
+ throw new EOFException();
+ }
+ }
+ }
+
+ public static void readFully(FileChannel fileChannel, ByteBuffer buffer, long position)
+ throws IOException {
+ long currentPosition = position;
+ while (buffer.hasRemaining()) {
+ final int readBytes = fileChannel.read(buffer, currentPosition);
+ if (readBytes <= 0) {
+ throw new EOFException();
+ }
+ currentPosition += readBytes;
+ }
+ }
+
/**
* Read a string from the given stream.
*
@@ -157,7 +179,7 @@ public class IOUtils {
strBuffer = new byte[length];
}
- inputStream.read(strBuffer, 0, length);
+ inputStream.readFully(strBuffer, 0, length);
return new String(strBuffer, 0, length, encoding);
}
return null;
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java
new file mode 100644
index 00000000000..bf2cf546628
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class IOUtilsTest {
+
+ private static final String ENCODING = "UTF-8";
+
+ @Test
+ public void readStringReadsCompletePayloadAfterShortRead() throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ IOUtils.writeString(outputStream, "abcdefg", ENCODING, null);
+
+ try (DataInputStream inputStream =
+ new DataInputStream(new OneByteAtATimeInputStream(outputStream.toByteArray()))) {
+ Assert.assertEquals("abcdefg", IOUtils.readString(inputStream, ENCODING, null));
+ }
+ }
+
+ @Test
+ public void readStringThrowsWhenPayloadIsTruncated() throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ IOUtils.writeInt(outputStream, 7, null);
+ outputStream.write(new byte[] {'a', 'b', 'c'});
+
+ try (DataInputStream inputStream =
+ new DataInputStream(new OneByteAtATimeInputStream(outputStream.toByteArray()))) {
+ Assert.assertThrows(
+ EOFException.class, () -> IOUtils.readString(inputStream, ENCODING, null));
+ }
+ }
+
+ @Test
+ public void readFullyReadsCompleteByteBufferAfterShortChannelRead() throws IOException {
+ byte[] bytes = new byte[] {1, 2, 3};
+ FileChannel channel = mockOneByteAtATimeChannel(bytes);
+ ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+
+ IOUtils.readFully(channel, buffer);
+
+ Assert.assertArrayEquals(bytes, buffer.array());
+ }
+
+ @Test
+ public void readFullyReadsCompleteByteBufferFromPositionAfterShortChannelRead()
+ throws IOException {
+ byte[] bytes = new byte[] {1, 2, 3, 4, 5};
+ FileChannel channel = mockOneByteAtATimeChannel(bytes);
+ ByteBuffer buffer = ByteBuffer.allocate(3);
+
+ IOUtils.readFully(channel, buffer, 2);
+
+ Assert.assertArrayEquals(new byte[] {3, 4, 5}, buffer.array());
+ }
+
+ @Test
+ public void readFullyThrowsWhenChannelIsTruncated() throws IOException {
+ FileChannel channel = mockOneByteAtATimeChannel(new byte[] {1, 2});
+ ByteBuffer buffer = ByteBuffer.allocate(3);
+
+ Assert.assertThrows(EOFException.class, () -> IOUtils.readFully(channel, buffer));
+ }
+
+ private static FileChannel mockOneByteAtATimeChannel(byte[] bytes) throws IOException {
+ FileChannel channel = Mockito.mock(FileChannel.class);
+ AtomicInteger index = new AtomicInteger();
+ Mockito.when(channel.read(Mockito.any(ByteBuffer.class)))
+ .thenAnswer(
+ invocation -> {
+ ByteBuffer buffer = invocation.getArgument(0);
+ int currentIndex = index.getAndIncrement();
+ if (currentIndex >= bytes.length) {
+ return -1;
+ }
+ buffer.put(bytes[currentIndex]);
+ return 1;
+ });
+ Mockito.when(channel.read(Mockito.any(ByteBuffer.class), Mockito.anyLong()))
+ .thenAnswer(
+ invocation -> {
+ ByteBuffer buffer = invocation.getArgument(0);
+ long position = invocation.getArgument(1);
+ if (position >= bytes.length) {
+ return -1;
+ }
+ buffer.put(bytes[(int) position]);
+ return 1;
+ });
+ return channel;
+ }
+
+ private static class OneByteAtATimeInputStream extends InputStream {
+
+ private final byte[] bytes;
+ private int index;
+
+ private OneByteAtATimeInputStream(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public int read() {
+ return index < bytes.length ? bytes[index++] & 0xFF : -1;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ if (len == 0) {
+ return 0;
+ }
+ if (index >= bytes.length) {
+ return -1;
+ }
+ b[off] = bytes[index++];
+ return 1;
+ }
+ }
+}