This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c8089764215 [FLINK-33891] Remove the obsolete SingleJobGraphStore
c8089764215 is described below

commit c80897642152690ef23978e2090be17e60a58039
Author: Zhanghao Chen <[email protected]>
AuthorDate: Wed Dec 20 10:34:35 2023 +0800

    [FLINK-33891] Remove the obsolete SingleJobGraphStore
---
 .../runtime/dispatcher/SingleJobJobGraphStore.java | 81 ----------------------
 .../runner/DefaultDispatcherRunnerITCase.java      |  4 +-
 2 files changed, 2 insertions(+), 83 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java
deleted file mode 100644
index 7059d5798f7..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
-import org.apache.flink.runtime.jobmanager.JobGraphStore;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Objects;
-
-/** {@link JobGraphStore} implementation for a single job. */
-public class SingleJobJobGraphStore implements JobGraphStore {
-
-    private final JobGraph jobGraph;
-
-    public SingleJobJobGraphStore(JobGraph jobGraph) {
-        this.jobGraph = Preconditions.checkNotNull(jobGraph);
-    }
-
-    @Override
-    public void start(JobGraphListener jobGraphListener) throws Exception {
-        // noop
-    }
-
-    @Override
-    public void stop() throws Exception {
-        // noop
-    }
-
-    @Override
-    public JobGraph recoverJobGraph(JobID jobId) throws Exception {
-        if (jobGraph.getJobID().equals(jobId)) {
-            return jobGraph;
-        } else {
-            throw new FlinkException("Could not recover job graph " + jobId + '.');
-        }
-    }
-
-    @Override
-    public void putJobGraph(JobGraph jobGraph) throws Exception {
-        if (!Objects.equals(this.jobGraph.getJobID(), jobGraph.getJobID())) {
-            throw new FlinkException(
-                    "Cannot put additional jobs into this submitted job graph store.");
-        }
-    }
-
-    @Override
-    public void putJobResourceRequirements(
-            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
-        Preconditions.checkArgument(
-                jobId.equals(jobGraph.getJobID()),
-                String.format("The %s can only store a single job.", getClass().getSimpleName()));
-        JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements);
-    }
-
-    @Override
-    public Collection<JobID> getJobIds() {
-        return Collections.singleton(jobGraph.getJobID());
-    }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
index d08b2749714..641bf7fc866 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobPersistenceComponents;
 import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
-import org.apache.flink.runtime.dispatcher.SingleJobJobGraphStore;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.dispatcher.TestingJobMasterServiceLeadershipRunnerFactory;
 import org.apache.flink.runtime.dispatcher.TestingPartialDispatcherServices;
@@ -183,7 +182,8 @@ class DefaultDispatcherRunnerITCase {
                 DefaultDispatcherRunnerFactory.createSessionRunner(
                         new TestingDispatcherFactory(
                                 jobManagerRunnerFactory, cleanupRunnerFactory));
-        jobGraphStore = new SingleJobJobGraphStore(jobGraph);
+        jobGraphStore.start(null);
+        jobGraphStore.putJobGraph(jobGraph);
 
         try (final DispatcherRunner dispatcherRunner = createDispatcherRunner()) {
 

Reply via email to