This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 739d52c [FLINK-21673][state/heap] Add extension points to snapshot
reading/writing
739d52c is described below
commit 739d52c197bf229a4c94b9c8235e9c3bc09bc3e5
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Mar 19 13:01:26 2021 +0100
[FLINK-21673][state/heap] Add extension points to snapshot reading/writing
---
.../main/java/org/apache/flink/runtime/state/heap/StateTable.java | 2 +-
.../flink/runtime/state/heap/StateTableByKeyGroupReaders.java | 6 ++++--
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index c64184d..da694eb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -311,7 +311,7 @@ public abstract class StateTable<K, N, S>
}
@VisibleForTesting
- StateMap<K, N, S> getMapForKeyGroup(int keyGroupIndex) {
+ public StateMap<K, N, S> getMapForKeyGroup(int keyGroupIndex) {
final int pos = indexToOffset(keyGroupIndex);
if (pos >= 0 && pos < keyGroupedStateMaps.length) {
return keyGroupedStateMaps[pos];
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
index f651344..d025aad 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.heap;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.memory.DataInputView;
@@ -35,7 +36,8 @@ import java.io.IOException;
*
* <p>The implementations are also located here as inner classes.
*/
-class StateTableByKeyGroupReaders {
+@Internal
+public class StateTableByKeyGroupReaders {
/**
* Creates a new StateTableByKeyGroupReader that inserts de-serialized
mappings into the given
@@ -48,7 +50,7 @@ class StateTableByKeyGroupReaders {
* @param version version for the de-serialization algorithm.
* @return the appropriate reader.
*/
- static <K, N, S> StateSnapshotKeyGroupReader readerForVersion(
+ public static <K, N, S> StateSnapshotKeyGroupReader readerForVersion(
StateTable<K, N, S> stateTable, int version) {
switch (version) {
case 1: