This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1009978  [hotfix][tests] Replace MockOperatorStateBackend with default
1009978 is described below

commit 1009978672bf5989e5e3afae94b938cec1e685f3
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Nov 10 11:05:16 2021 +0100

    [hotfix][tests] Replace MockOperatorStateBackend with default
---
 .../state/ttl/mock/MockOperatorStateBackend.java   | 100 ---------------------
 .../runtime/state/ttl/mock/MockStateBackend.java   |  37 +++++++-
 2 files changed, 36 insertions(+), 101 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java
deleted file mode 100644
index 046b7a6..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl.mock;
-
-import org.apache.flink.api.common.state.BroadcastState;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.SnapshotResult;
-
-import javax.annotation.Nonnull;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RunnableFuture;
-
-class MockOperatorStateBackend implements OperatorStateBackend {
-
-    private final HashSet<String> registeredStateNames = new HashSet<>();
-    private final boolean emptySnapshot;
-
-    public MockOperatorStateBackend(boolean emptySnapshot) {
-        this.emptySnapshot = emptySnapshot;
-    }
-
-    @Override
-    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
-        registeredStateNames.add(stateDescriptor.getName());
-        ListState<S> state =
-                MockInternalListState.createState(
-                        stateDescriptor.getElementSerializer(), stateDescriptor);
-        ((MockInternalKvState) state).values = HashMap::new;
-        return state;
-    }
-
-    @Override
-    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor)
-            throws Exception {
-        return getListState(stateDescriptor);
-    }
-
-    @Override
-    public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
-            throws Exception {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Set<String> getRegisteredStateNames() {
-        return registeredStateNames;
-    }
-
-    @Override
-    public Set<String> getRegisteredBroadcastStateNames() {
-        return Collections.emptySet();
-    }
-
-    @Nonnull
-    @Override
-    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
-            long checkpointId,
-            long timestamp,
-            @Nonnull CheckpointStreamFactory streamFactory,
-            @Nonnull CheckpointOptions checkpointOptions)
-            throws Exception {
-        if (!emptySnapshot) {
-            throw new UnsupportedOperationException();
-        }
-        return new FutureTask<>(SnapshotResult::empty);
-    }
-
-    @Override
-    public void dispose() {}
-
-    @Override
-    public void close() throws IOException {}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
index d94644c..bfcd1ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
@@ -22,22 +22,31 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotExecutionType;
+import org.apache.flink.runtime.state.SnapshotResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.runtime.state.SnapshotStrategyRunner;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import javax.annotation.Nonnull;
 
 import java.util.Collection;
+import java.util.HashMap;
 
 /** mack state backend. */
 public class MockStateBackend extends AbstractStateBackend {
@@ -87,6 +96,32 @@ public class MockStateBackend extends AbstractStateBackend {
             String operatorIdentifier,
             @Nonnull Collection<OperatorStateHandle> stateHandles,
             CloseableRegistry cancelStreamRegistry) {
-        return new MockOperatorStateBackend(emptySnapshot);
+        return new DefaultOperatorStateBackend(
+                env.getExecutionConfig(),
+                cancelStreamRegistry,
+                new HashMap<>(),
+                new HashMap<>(),
+                new HashMap<>(),
+                new HashMap<>(),
+                new SnapshotStrategyRunner<>(
+                        "",
+                        new SnapshotStrategy<OperatorStateHandle, SnapshotResources>() {
+                            @Override
+                            public SnapshotResources syncPrepareResources(long checkpointId) {
+                                return () -> {};
+                            }
+
+                            @Override
+                            public SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(
+                                    SnapshotResources syncPartResource,
+                                    long checkpointId,
+                                    long timestamp,
+                                    @Nonnull CheckpointStreamFactory streamFactory,
+                                    @Nonnull CheckpointOptions checkpointOptions) {
+                                return snapshotCloseableRegistry -> SnapshotResult.empty();
+                            }
+                        },
+                        new CloseableRegistry(),
+                        SnapshotExecutionType.ASYNCHRONOUS));
     }
 }

Reply via email to