This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 25969c9de1e [FLINK-36243][state/forst] Store namespace in state request and contextKey (#25300) 25969c9de1e is described below commit 25969c9de1e73c2364200ae3d155aca4862ea036 Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Wed Sep 11 19:45:57 2024 +0800 [FLINK-36243][state/forst] Store namespace in state request and contextKey (#25300) --- .../asyncprocessing/AsyncExecutionController.java | 6 ++--- .../runtime/asyncprocessing/StateRequest.java | 12 +++++++++- .../asyncprocessing/StateRequestBuffer.java | 10 ++++---- .../asyncprocessing/StateRequestContainer.java | 2 +- .../runtime/asyncprocessing/MockStateExecutor.java | 2 +- .../asyncprocessing/MockStateRequestContainer.java | 6 ++--- .../state/v2/InternalKeyedStateTestBase.java | 8 +++---- .../org/apache/flink/state/forst/ContextKey.java | 14 ++++++----- .../apache/flink/state/forst/ForStInnerTable.java | 4 ++-- .../apache/flink/state/forst/ForStListState.java | 15 ++++++++---- .../apache/flink/state/forst/ForStMapState.java | 28 +++++++++++++++------- .../state/forst/ForStStateRequestClassifier.java | 4 ++-- .../apache/flink/state/forst/ForStValueState.java | 14 +++++++---- .../state/forst/ForStDBOperationTestBase.java | 4 ++-- .../flink/state/forst/ForStStateExecutorTest.java | 20 ++++++++-------- 15 files changed, 90 insertions(+), 59 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 24ae7e9342f..ad767aa340a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -239,7 +239,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler { @Nullable State state, StateRequestType type, @Nullable IN payload) { // Step 1: build state future & assign context. InternalStateFuture<OUT> stateFuture = stateFutureFactory.create(currentContext); - StateRequest<K, IN, OUT> request = + StateRequest<K, ?, IN, OUT> request = new StateRequest<>(state, type, payload, stateFuture, currentContext); // Step 2: try to seize the capacity, if the current in-flight records exceeds the limit, @@ -278,11 +278,11 @@ public class AsyncExecutionController<K> implements StateRequestHandler { currentContext.setNamespace(state, namespace); } - <IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) { + <IN, OUT> void insertActiveBuffer(StateRequest<K, ?, IN, OUT> request) { stateRequestsBuffer.enqueueToActive(request); } - <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) { + <IN, OUT> void insertBlockingBuffer(StateRequest<K, ?, IN, OUT> request) { stateRequestsBuffer.enqueueToBlocking(request); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java index 87fd02926f2..32527a2e881 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.runtime.state.v2.InternalPartitionedState; import javax.annotation.Nullable; @@ -30,9 +31,10 @@ import java.io.Serializable; * * @param <K> Type of partitioned key. * @param <IN> Type of input of this request. + * @param <N> Type of namespace. * @param <OUT> Type of value that request will return. */ -public class StateRequest<K, IN, OUT> implements Serializable { +public class StateRequest<K, N, IN, OUT> implements Serializable { /** * The underlying state to be accessed, can be empty for {@link StateRequestType#SYNC_POINT}. @@ -51,6 +53,8 @@ public class StateRequest<K, IN, OUT> implements Serializable { /** The record context of this request. */ private final RecordContext<K> context; + @Nullable private final N namespace; + public StateRequest( @Nullable State state, StateRequestType type, @@ -62,6 +66,8 @@ public class StateRequest<K, IN, OUT> implements Serializable { this.payload = payload; this.stateFuture = stateFuture; this.context = context; + this.namespace = + state == null ? null : context.getNamespace((InternalPartitionedState<N>) state); } public StateRequestType getRequestType() { @@ -85,4 +91,8 @@ public class StateRequest<K, IN, OUT> implements Serializable { public RecordContext<K> getRecordContext() { return context; } + + public @Nullable N getNamespace() { + return namespace; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java index 263b7cce092..04379d2d34b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java @@ -60,14 +60,14 @@ public class StateRequestBuffer<K> { * The state requests in this buffer could be executed when the buffer is full or configured * batch size is reached. All operations on this buffer must be invoked in task thread. */ - final LinkedList<StateRequest<K, ?, ?>> activeQueue; + final LinkedList<StateRequest<K, ?, ?, ?>> activeQueue; /** * The requests in that should wait until all preceding records with identical key finishing its * execution. After which the queueing requests will move into the active buffer. All operations * on this buffer must be invoked in task thread. */ - final Map<K, Deque<StateRequest<K, ?, ?>>> blockingQueue; + final Map<K, Deque<StateRequest<K, ?, ?, ?>>> blockingQueue; /** The number of state requests in blocking queue. */ int blockingQueueSize; @@ -120,7 +120,7 @@ public class StateRequestBuffer<K> { return currentSeq.get() == seq; } - void enqueueToActive(StateRequest<K, ?, ?> request) { + void enqueueToActive(StateRequest<K, ?, ?, ?> request) { if (request.getRequestType() == StateRequestType.SYNC_POINT) { request.getFuture().complete(null); } else { @@ -147,7 +147,7 @@ public class StateRequestBuffer<K> { } } - void enqueueToBlocking(StateRequest<K, ?, ?> request) { + void enqueueToBlocking(StateRequest<K, ?, ?, ?> request) { blockingQueue .computeIfAbsent(request.getRecordContext().getKey(), k -> new LinkedList<>()) .add(request); @@ -166,7 +166,7 @@ public class StateRequestBuffer<K> { return null; } - StateRequest<K, ?, ?> stateRequest = blockingQueue.get(key).removeFirst(); + StateRequest<K, ?, ?, ?> stateRequest = blockingQueue.get(key).removeFirst(); enqueueToActive(stateRequest); if (blockingQueue.get(key).isEmpty()) { blockingQueue.remove(key); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestContainer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestContainer.java index 1a389272263..adc93e13995 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestContainer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestContainer.java @@ -29,7 +29,7 @@ package org.apache.flink.runtime.asyncprocessing; public interface StateRequestContainer { /** Preserve a stateRequest into the {@code StateRequestContainer}. */ - void offer(StateRequest<?, ?, ?> stateRequest); + void offer(StateRequest<?, ?, ?, ?> stateRequest); /** Returns whether the container is empty. */ boolean isEmpty(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java index b36c1da8426..8939dc7210d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java @@ -29,7 +29,7 @@ public class MockStateExecutor implements StateExecutor { public CompletableFuture<Void> executeBatchRequests( StateRequestContainer stateRequestContainer) { Preconditions.checkArgument(stateRequestContainer instanceof MockStateRequestContainer); - for (StateRequest<?, ?, ?> request : + for (StateRequest<?, ?, ?, ?> request : ((MockStateRequestContainer) stateRequestContainer).getStateRequestList()) { request.getFuture().complete(null); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateRequestContainer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateRequestContainer.java index f2200a55c03..0fa8746e3ae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateRequestContainer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateRequestContainer.java @@ -24,10 +24,10 @@ import java.util.List; /** The mocked {@link StateRequestContainer} for testing. */ public class MockStateRequestContainer implements StateRequestContainer { - private final List<StateRequest<?, ?, ?>> stateRequestList = new ArrayList<>(); + private final List<StateRequest<?, ?, ?, ?>> stateRequestList = new ArrayList<>(); @Override - public void offer(StateRequest<?, ?, ?> stateRequest) { + public void offer(StateRequest<?, ?, ?, ?> stateRequest) { stateRequestList.add(stateRequest); } @@ -36,7 +36,7 @@ public class MockStateRequestContainer implements StateRequestContainer { return stateRequestList.isEmpty(); } - public List<StateRequest<?, ?, ?>> getStateRequestList() { + public List<StateRequest<?, ?, ?, ?>> getStateRequestList() { return stateRequestList; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalKeyedStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalKeyedStateTestBase.java index 22cce5e636d..91923ff7705 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalKeyedStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalKeyedStateTestBase.java @@ -154,7 +154,7 @@ public class InternalKeyedStateTestBase { */ static class TestStateExecutor implements StateExecutor { - private Deque<StateRequest<?, ?, ?>> receivedRequest; + private Deque<StateRequest<?, ?, ?, ?>> receivedRequest; TestStateExecutor() { receivedRequest = new ConcurrentLinkedDeque<>(); @@ -162,7 +162,7 @@ public class InternalKeyedStateTestBase { <IN> void validate(@Nullable State state, StateRequestType type, @Nullable IN payload) { assertThat(receivedRequest.isEmpty()).isFalse(); - StateRequest<?, ?, ?> request = receivedRequest.pop(); + StateRequest<?, ?, ?, ?> request = receivedRequest.pop(); assertThat(request.getState()).isEqualTo(state); assertThat(request.getRequestType()).isEqualTo(type); assertThat(request.getPayload()).isEqualTo(payload); @@ -194,10 +194,10 @@ public class InternalKeyedStateTestBase { public void shutdown() {} static class TestStateRequestContainer implements StateRequestContainer { - ArrayList<StateRequest<?, ?, ?>> requests = new ArrayList<>(); + ArrayList<StateRequest<?, ?, ?, ?>> requests = new ArrayList<>(); @Override - public void offer(StateRequest<?, ?, ?> stateRequest) { + public void offer(StateRequest<?, ?, ?, ?> stateRequest) { requests.add(stateRequest); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java index 16807b78f10..53e7573a118 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java @@ -19,7 +19,6 @@ package org.apache.flink.state.forst; import org.apache.flink.runtime.asyncprocessing.RecordContext; -import org.apache.flink.runtime.state.v2.InternalPartitionedState; import org.apache.flink.util.function.FunctionWithException; import javax.annotation.Nullable; @@ -40,12 +39,15 @@ public class ContextKey<K, N> { @Nullable private Object userKey; - public ContextKey(RecordContext<K> recordContext) { - this.recordContext = recordContext; + @Nullable private final N namespace; + + public ContextKey(RecordContext<K> recordContext, @Nullable N namespace) { + this(recordContext, namespace, null); } - public ContextKey(RecordContext<K> recordContext, Object userKey) { + public ContextKey(RecordContext<K> recordContext, @Nullable N namespace, Object userKey) { this.recordContext = recordContext; + this.namespace = namespace; this.userKey = userKey; } @@ -57,8 +59,8 @@ public class ContextKey<K, N> { return recordContext.getKeyGroup(); } - public N getNamespace(InternalPartitionedState<N> state) { - return recordContext.getNamespace(state); + public N getNamespace() { + return namespace; } public Object getUserKey() { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java index 141c63cf1e2..2984a815a20 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java @@ -73,7 +73,7 @@ public interface ForStInnerTable<K, N, V> { * @param stateRequest The given stateRequest. * @return The corresponding ForSt GetRequest. */ - ForStDBGetRequest<?, ?, ?, ?> buildDBGetRequest(StateRequest<?, ?, ?> stateRequest); + ForStDBGetRequest<?, ?, ?, ?> buildDBGetRequest(StateRequest<?, ?, ?, ?> stateRequest); /** * Build a {@link ForStDBPutRequest} that belong to {@code ForStInnerTable} with the given @@ -82,5 +82,5 @@ public interface ForStInnerTable<K, N, V> { * @param stateRequest The given stateRequest. * @return The corresponding ForSt PutRequest. */ - ForStDBPutRequest<?, ?, ?> buildDBPutRequest(StateRequest<?, ?, ?> stateRequest); + ForStDBPutRequest<?, ?, ?> buildDBPutRequest(StateRequest<?, ?, ?, ?> stateRequest); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java index 152bc0d92ea..986dbb11ddc 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java @@ -99,7 +99,7 @@ public class ForStListState<K, N, V> extends InternalListState<K, N, V> ctxKey -> { SerializedCompositeKeyBuilder<K> builder = serializedKeyBuilder.get(); builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup()); - N namespace = contextKey.getNamespace(this); + N namespace = contextKey.getNamespace(); return builder.buildCompositeKeyNamespace( namespace == null ? defaultNamespace : namespace, namespaceSerializer.get()); @@ -123,19 +123,24 @@ public class ForStListState<K, N, V> extends InternalListState<K, N, V> @SuppressWarnings("unchecked") @Override public ForStDBGetRequest<K, N, List<V>, StateIterator<V>> buildDBGetRequest( - StateRequest<?, ?, ?> stateRequest) { + StateRequest<?, ?, ?, ?> stateRequest) { Preconditions.checkArgument(stateRequest.getRequestType() == StateRequestType.LIST_GET); ContextKey<K, N> contextKey = - new ContextKey<>((RecordContext<K>) stateRequest.getRecordContext()); + new ContextKey<>( + (RecordContext<K>) stateRequest.getRecordContext(), + (N) stateRequest.getNamespace()); return new ForStDBListGetRequest<>( contextKey, this, (InternalStateFuture<StateIterator<V>>) stateRequest.getFuture()); } @SuppressWarnings("unchecked") @Override - public ForStDBPutRequest<K, N, List<V>> buildDBPutRequest(StateRequest<?, ?, ?> stateRequest) { + public ForStDBPutRequest<K, N, List<V>> buildDBPutRequest( + StateRequest<?, ?, ?, ?> stateRequest) { ContextKey<K, N> contextKey = - new ContextKey<>((RecordContext<K>) stateRequest.getRecordContext()); + new ContextKey<>( + (RecordContext<K>) stateRequest.getRecordContext(), + (N) stateRequest.getNamespace()); List<V> value; boolean merge = false; switch (stateRequest.getRequestType()) { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java index 71b30b9ede7..fe23949febb 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java @@ -119,8 +119,10 @@ public class ForStMapState<K, N, UK, UV> extends InternalMapState<K, N, UK, UV> ctxKey -> { SerializedCompositeKeyBuilder<K> builder = serializedKeyBuilder.get(); builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup()); - N namespace = contextKey.getNamespace(this); - builder.setNamespace(namespace, namespaceSerializer.get()); + N namespace = contextKey.getNamespace(); + builder.setNamespace( + namespace == null ? defaultNamespace : namespace, + namespaceSerializer.get()); if (contextKey.getUserKey() == null) { // value get return builder.build(); } @@ -152,7 +154,7 @@ public class ForStMapState<K, N, UK, UV> extends InternalMapState<K, N, UK, UV> @Override @SuppressWarnings("unchecked") - public ForStDBGetRequest<?, ?, ?, ?> buildDBGetRequest(StateRequest<?, ?, ?> stateRequest) { + public ForStDBGetRequest<?, ?, ?, ?> buildDBGetRequest(StateRequest<?, ?, ?, ?> stateRequest) { Preconditions.checkArgument( stateRequest.getRequestType() == StateRequestType.MAP_GET || stateRequest.getRequestType() == StateRequestType.MAP_CONTAINS @@ -160,6 +162,7 @@ public class ForStMapState<K, N, UK, UV> extends InternalMapState<K, N, UK, UV> ContextKey<K, N> contextKey = new ContextKey<>( (RecordContext<K>) stateRequest.getRecordContext(), + (N) stateRequest.getNamespace(), stateRequest.getPayload()); if (stateRequest.getRequestType() == StateRequestType.MAP_GET) { @@ -175,13 +178,14 @@ public class ForStMapState<K, N, UK, UV> extends InternalMapState<K, N, UK, UV> @Override @SuppressWarnings("unchecked") - public ForStDBPutRequest<K, N, UV> buildDBPutRequest(StateRequest<?, ?, ?> stateRequest) { + public ForStDBPutRequest<K, N, UV> buildDBPutRequest(StateRequest<?, ?, ?, ?> stateRequest) { Preconditions.checkArgument( stateRequest.getRequestType() == StateRequestType.MAP_PUT || stateRequest.getRequestType() == StateRequestType.MAP_REMOVE); ContextKey<K, N> contextKey = new ContextKey<>( (RecordContext<K>) stateRequest.getRecordContext(), + (N) stateRequest.getNamespace(), ((Tuple2<UK, UV>) stateRequest.getPayload()).f0); Preconditions.checkNotNull( stateRequest.getPayload(), String.format("payload is null, %s", stateRequest)); @@ -203,12 +207,15 @@ public class ForStMapState<K, N, UK, UV> extends InternalMapState<K, N, UK, UV> */ @SuppressWarnings("unchecked") public ForStDBBunchPutRequest<K, N, UK, UV> buildDBBunchPutRequest( - StateRequest<?, ?, ?> stateRequest) { + StateRequest<?, ?, ?, ?> stateRequest) { Preconditions.checkArgument( stateRequest.getRequestType() == StateRequestType.MAP_PUT_ALL || stateRequest.getRequestType() == StateRequestType.CLEAR); ContextKey<K, N> contextKey = - new ContextKey<>((RecordContext<K>) stateRequest.getRecordContext(), null); + new ContextKey<>( + (RecordContext<K>) stateRequest.getRecordContext(), + (N) stateRequest.getNamespace(), + null); Map<UK, UV> value = (Map<UK, UV>) stateRequest.getPayload(); return new ForStDBBunchPutRequest(contextKey, value, this, stateRequest.getFuture()); } @@ -223,7 +230,7 @@ public class ForStMapState<K, N, UK, UV> extends InternalMapState<K, N, UK, UV> */ @SuppressWarnings("unchecked") public ForStDBIterRequest<K, N, UK, UV, ?> buildDBIterRequest( - StateRequest<?, ?, ?> stateRequest) { + StateRequest<?, ?, ?, ?> stateRequest) { Preconditions.checkArgument( stateRequest.getRequestType() == StateRequestType.MAP_ITER || stateRequest.getRequestType() == StateRequestType.MAP_ITER_KEY @@ -242,11 +249,14 @@ public class ForStMapState<K, N, UK, UV> extends InternalMapState<K, N, UK, UV> @SuppressWarnings("unchecked") private ForStDBIterRequest<K, N, UK, UV, ?> buildDBIterRequest( - StateRequest<?, ?, ?> stateRequest, + StateRequest<?, ?, ?, ?> stateRequest, StateRequestType requestType, RocksIterator rocksIterator) { ContextKey<K, N> contextKey = - new ContextKey<>((RecordContext<K>) stateRequest.getRecordContext(), null); + new ContextKey<>( + (RecordContext<K>) stateRequest.getRecordContext(), + (N) stateRequest.getNamespace(), + null); switch (requestType) { case MAP_ITER: return new ForStDBMapEntryIterRequest<>( diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java index 9e9510e4327..ea6ede88acf 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java @@ -44,7 +44,7 @@ public class ForStStateRequestClassifier implements StateRequestContainer { } @Override - public void offer(StateRequest<?, ?, ?> stateRequest) { + public void offer(StateRequest<?, ?, ?, ?> stateRequest) { convertStateRequestsToForStDBRequests(stateRequest); } @@ -54,7 +54,7 @@ public class ForStStateRequestClassifier implements StateRequestContainer { } @SuppressWarnings("ConstantConditions") - private void convertStateRequestsToForStDBRequests(StateRequest<?, ?, ?> stateRequest) { + private void convertStateRequestsToForStDBRequests(StateRequest<?, ?, ?, ?> stateRequest) { StateRequestType stateRequestType = stateRequest.getRequestType(); switch (stateRequestType) { case VALUE_GET: diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java index cd6968bb12f..31d65f521a6 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java @@ -93,7 +93,7 @@ public class ForStValueState<K, N, V> extends InternalValueState<K, N, V> ctxKey -> { SerializedCompositeKeyBuilder<K> builder = serializedKeyBuilder.get(); builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup()); - N namespace = contextKey.getNamespace(this); + N namespace = contextKey.getNamespace(); return builder.buildCompositeKeyNamespace( namespace == null ? defaultNamespace : namespace, namespaceSerializer.get()); @@ -117,22 +117,26 @@ public class ForStValueState<K, N, V> extends InternalValueState<K, N, V> @SuppressWarnings("unchecked") @Override - public ForStDBGetRequest<K, N, V, V> buildDBGetRequest(StateRequest<?, ?, ?> stateRequest) { + public ForStDBGetRequest<K, N, V, V> buildDBGetRequest(StateRequest<?, ?, ?, ?> stateRequest) { Preconditions.checkArgument(stateRequest.getRequestType() == StateRequestType.VALUE_GET); ContextKey<K, N> contextKey = - new ContextKey<>((RecordContext<K>) stateRequest.getRecordContext()); + new ContextKey<>( + (RecordContext<K>) stateRequest.getRecordContext(), + (N) stateRequest.getNamespace()); return new ForStDBSingleGetRequest<>( contextKey, this, (InternalStateFuture<V>) stateRequest.getFuture()); } @SuppressWarnings("unchecked") @Override - public ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?> stateRequest) { + public ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?, ?> stateRequest) { Preconditions.checkArgument( stateRequest.getRequestType() == StateRequestType.VALUE_UPDATE || stateRequest.getRequestType() == StateRequestType.CLEAR); ContextKey<K, N> contextKey = - new ContextKey<>((RecordContext<K>) stateRequest.getRecordContext()); + new ContextKey<>( + (RecordContext<K>) stateRequest.getRecordContext(), + (N) stateRequest.getNamespace()); V value = (stateRequest.getRequestType() == StateRequestType.CLEAR) ? null // "Delete(key)" is equivalent to "Put(key, null)" diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java index dba6254dc55..df8c9ff42c3 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java @@ -118,7 +118,7 @@ public class ForStDBOperationTestBase { int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i, 128); RecordContext<Integer> recordContext = new RecordContext<>(i, i, t -> {}, keyGroup, new Epoch(0)); - return new ContextKey<>(recordContext); + return new ContextKey<>(recordContext, VoidNamespace.INSTANCE, null); } protected ForStValueState<Integer, VoidNamespace, String> buildForStValueState(String stateName) @@ -216,7 +216,7 @@ public class ForStDBOperationTestBase { @Override public void completeExceptionally(String message, Throwable ex) { - throw new UnsupportedOperationException(); + // do nothing } @Override diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java index b7872dc97f9..96f0c30a04f 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java @@ -66,13 +66,13 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { forStStateExecutor.executeBatchRequests(stateRequestContainer).get(); - List<StateRequest<?, ?, ?>> checkList = new ArrayList<>(); + List<StateRequest<?, ?, ?, ?>> checkList = new ArrayList<>(); stateRequestContainer = forStStateExecutor.createStateRequestContainer(); // 2. Get value state: keyRange [0, keyNum) // Update value state: keyRange [keyNum, keyNum + 100] for (int i = 0; i < keyNum; i++) { ForStValueState<Integer, VoidNamespace, String> state = (i % 2 == 0 ? state1 : state2); - StateRequest<?, ?, ?> getRequest = + StateRequest<?, ?, ?, ?> getRequest = buildStateRequest(state, StateRequestType.VALUE_GET, i, null, i * 2); stateRequestContainer.offer(getRequest); checkList.add(getRequest); @@ -85,7 +85,7 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { forStStateExecutor.executeBatchRequests(stateRequestContainer).get(); // 3. Check value state Get result : [0, keyNum) - for (StateRequest<?, ?, ?> getRequest : checkList) { + for (StateRequest<?, ?, ?, ?> getRequest : checkList) { assertThat(getRequest.getRequestType()).isEqualTo(StateRequestType.VALUE_GET); int key = (Integer) getRequest.getRecordContext().getKey(); assertThat(getRequest.getRecordContext().getRecord()).isEqualTo(key * 2); @@ -113,13 +113,13 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { checkList.clear(); for (int i = keyNum - 100; i < keyNum + 100; i++) { ForStValueState<Integer, VoidNamespace, String> state = (i % 2 == 0 ? state1 : state2); - StateRequest<?, ?, ?> getRequest = + StateRequest<?, ?, ?, ?> getRequest = buildStateRequest(state, StateRequestType.VALUE_GET, i, null, i * 2); stateRequestContainer.offer(getRequest); checkList.add(getRequest); } forStStateExecutor.executeBatchRequests(stateRequestContainer).get(); - for (StateRequest<?, ?, ?> getRequest : checkList) { + for (StateRequest<?, ?, ?, ?> getRequest : checkList) { assertThat(getRequest.getRequestType()).isEqualTo(StateRequestType.VALUE_GET); assertThat(((TestStateFuture<String>) getRequest.getFuture()).getCompletedResult()) .isEqualTo(null); @@ -157,11 +157,11 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { forStStateExecutor.executeBatchRequests(stateRequestContainer).get(); stateRequestContainer = forStStateExecutor.createStateRequestContainer(); - List<StateRequest<?, ?, ?>> checkList = new ArrayList<>(); + List<StateRequest<?, ?, ?, ?>> checkList = new ArrayList<>(); // 2. check the number of user key under primary key is correct for (int i = 0; i < 100; i++) { - StateRequest<?, ?, ?> iterRequest = + StateRequest<?, ?, ?, ?> iterRequest = buildStateRequest(state, StateRequestType.MAP_ITER_KEY, i, null, i * 2); stateRequestContainer.offer(iterRequest); checkList.add(iterRequest); @@ -208,7 +208,7 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { checkList.clear(); // 4. check primary key [75,100) is deleted for (int i = 0; i < 100; i++) { - StateRequest<?, ?, ?> iterRequest = + StateRequest<?, ?, ?, ?> iterRequest = buildStateRequest(state, StateRequestType.MAP_IS_EMPTY, i, null, i * 2); stateRequestContainer.offer(iterRequest); checkList.add(iterRequest); @@ -229,7 +229,7 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { } @SuppressWarnings({"rawtypes", "unchecked"}) - private <K, N, V, R> StateRequest<?, ?, ?> buildStateRequest( + private <K, N, V, R> StateRequest<?, ?, ?, ?> buildStateRequest( InternalKeyedState<K, N, V> innerTable, StateRequestType requestType, K key, @@ -243,7 +243,7 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { } @SuppressWarnings({"rawtypes", "unchecked"}) - private <K, N, UK, UV, R> StateRequest<?, ?, ?> buildMapRequest( + private <K, N, UK, UV, R> StateRequest<?, ?, ?, ?> buildMapRequest( ForStMapState<K, N, UK, UV> innerTable, StateRequestType requestType, K key,