This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1009978 [hotfix][tests] Replace MockOperatorStateBackend with default
1009978 is described below
commit 1009978672bf5989e5e3afae94b938cec1e685f3
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Nov 10 11:05:16 2021 +0100
[hotfix][tests] Replace MockOperatorStateBackend with default
---
.../state/ttl/mock/MockOperatorStateBackend.java | 100 ---------------------
.../runtime/state/ttl/mock/MockStateBackend.java | 37 +++++++-
2 files changed, 36 insertions(+), 101 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java
deleted file mode 100644
index 046b7a6..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl.mock;
-
-import org.apache.flink.api.common.state.BroadcastState;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.SnapshotResult;
-
-import javax.annotation.Nonnull;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RunnableFuture;
-
-class MockOperatorStateBackend implements OperatorStateBackend {
-
- private final HashSet<String> registeredStateNames = new HashSet<>();
- private final boolean emptySnapshot;
-
- public MockOperatorStateBackend(boolean emptySnapshot) {
- this.emptySnapshot = emptySnapshot;
- }
-
- @Override
- public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
- registeredStateNames.add(stateDescriptor.getName());
- ListState<S> state =
- MockInternalListState.createState(
- stateDescriptor.getElementSerializer(), stateDescriptor);
- ((MockInternalKvState) state).values = HashMap::new;
- return state;
- }
-
- @Override
- public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor)
- throws Exception {
- return getListState(stateDescriptor);
- }
-
- @Override
- public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
- throws Exception {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Set<String> getRegisteredStateNames() {
- return registeredStateNames;
- }
-
- @Override
- public Set<String> getRegisteredBroadcastStateNames() {
- return Collections.emptySet();
- }
-
- @Nonnull
- @Override
- public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
- long checkpointId,
- long timestamp,
- @Nonnull CheckpointStreamFactory streamFactory,
- @Nonnull CheckpointOptions checkpointOptions)
- throws Exception {
- if (!emptySnapshot) {
- throw new UnsupportedOperationException();
- }
- return new FutureTask<>(SnapshotResult::empty);
- }
-
- @Override
- public void dispose() {}
-
- @Override
- public void close() throws IOException {}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
index d94644c..bfcd1ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
@@ -22,22 +22,31 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotExecutionType;
+import org.apache.flink.runtime.state.SnapshotResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import javax.annotation.Nonnull;
import java.util.Collection;
+import java.util.HashMap;
/** mack state backend. */
public class MockStateBackend extends AbstractStateBackend {
@@ -87,6 +96,32 @@ public class MockStateBackend extends AbstractStateBackend {
String operatorIdentifier,
@Nonnull Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry) {
- return new MockOperatorStateBackend(emptySnapshot);
+ return new DefaultOperatorStateBackend(
+ env.getExecutionConfig(),
+ cancelStreamRegistry,
+ new HashMap<>(),
+ new HashMap<>(),
+ new HashMap<>(),
+ new HashMap<>(),
+ new SnapshotStrategyRunner<>(
+ "",
+ new SnapshotStrategy<OperatorStateHandle, SnapshotResources>() {
+ @Override
+ public SnapshotResources syncPrepareResources(long checkpointId) {
+ return () -> {};
+ }
+
+ @Override
+ public SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(
+ SnapshotResources syncPartResource,
+ long checkpointId,
+ long timestamp,
+ @Nonnull CheckpointStreamFactory streamFactory,
+ @Nonnull CheckpointOptions checkpointOptions) {
+ return snapshotCloseableRegistry -> SnapshotResult.empty();
+ }
+ },
+ new CloseableRegistry(),
+ SnapshotExecutionType.ASYNCHRONOUS));
}
}