This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c0f46ef [FLINK-25446][state] Avoid improper sanity check on read bytes on DataInputStream#read(byte[])
c0f46ef is described below
commit c0f46ef324c35b3ed7813c74931ab9cb589896f7
Author: Yun Tang <[email protected]>
AuthorDate: Mon Dec 27 11:50:04 2021 +0800
[FLINK-25446][state] Avoid improper sanity check on read bytes on DataInputStream#read(byte[])
---
.../main/java/org/apache/flink/changelog/fs/StateChangeFormat.java | 4 ++--
.../runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java | 5 ++---
2 files changed, 4 insertions(+), 5 deletions(-)
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index 246932e..ba04f59 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +45,6 @@ import java.util.TreeMap;
import java.util.stream.Collectors;
import static java.util.Comparator.comparing;
-import static org.apache.flink.util.Preconditions.checkState;
/** Serialization format for state changes. */
@Internal
@@ -135,7 +135,7 @@ public class StateChangeFormat
private StateChange readChange() throws IOException {
int size = input.readInt();
byte[] bytes = new byte[size];
- checkState(size == input.read(bytes));
+ IOUtils.readFully(input, bytes, 0, size);
return new StateChange(keyGroup, bytes);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index b772232..7062598 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAcce
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
@@ -66,8 +67,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import static org.apache.flink.util.Preconditions.checkState;
-
/**
* Base (De)serializer for checkpoint metadata format version 2 and 3.
*
@@ -457,7 +456,7 @@ public abstract class MetadataV2V3SerializerBase {
int keyGroup = dis.readInt();
int bytesSize = dis.readInt();
byte[] bytes = new byte[bytesSize];
- checkState(bytesSize == dis.read(bytes));
+ IOUtils.readFully(dis, bytes, 0, bytesSize);
changes.add(new StateChange(keyGroup, bytes));
}
return new InMemoryChangelogStateHandle(changes, from, to,
keyGroupRange);