This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1ffb481111f [FLINK-34097] Remove JobMasterGateway#requestJobDetails
1ffb481111f is described below
commit 1ffb481111f658b699702357921a48e914d13caf
Author: Chesnay Schepler <[email protected]>
AuthorDate: Mon Jan 15 16:20:55 2024 +0100
[FLINK-34097] Remove JobMasterGateway#requestJobDetails
---
.../java/org/apache/flink/runtime/jobmaster/JobMaster.java | 6 ------
.../apache/flink/runtime/jobmaster/JobMasterGateway.java | 9 ---------
.../org/apache/flink/runtime/scheduler/SchedulerBase.java | 7 -------
.../org/apache/flink/runtime/scheduler/SchedulerNG.java | 3 ---
.../runtime/scheduler/adaptive/AdaptiveScheduler.java | 6 ------
.../runtime/jobmaster/utils/TestingJobMasterGateway.java | 14 ++++----------
.../jobmaster/utils/TestingJobMasterGatewayBuilder.java | 12 ++++++------
.../apache/flink/runtime/scheduler/TestingSchedulerNG.java | 7 -------
8 files changed, 10 insertions(+), 54 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index bfea710db66..59455b787a6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -71,7 +71,6 @@ import
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
@@ -861,11 +860,6 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
return resourceManagerHeartbeatManager.requestHeartbeat(resourceID,
null);
}
- @Override
- public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
- return
CompletableFuture.completedFuture(schedulerNG.requestJobDetails());
- }
-
@Override
public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
return
CompletableFuture.completedFuture(schedulerNG.requestJobStatus());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 6c1b79568a8..02c3c7d501a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -39,7 +39,6 @@ import
org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -183,14 +182,6 @@ public interface JobMasterGateway
*/
CompletableFuture<Void> heartbeatFromResourceManager(final ResourceID
resourceID);
- /**
- * Request the details of the executed job.
- *
- * @param timeout for the rpc call
- * @return Future details of the executed job
- */
- CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout);
-
/**
* Requests the current job status.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index c11d7b2ca86..7f4ba383e43 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -78,7 +78,6 @@ import
org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
@@ -819,12 +818,6 @@ public abstract class SchedulerBase implements
SchedulerNG, CheckpointScheduling
return executionGraph.getState();
}
- @Override
- public JobDetails requestJobDetails() {
- mainThreadExecutor.assertRunningInMainThread();
- return JobDetails.createDetailsForJob(executionGraph);
- }
-
@Override
public KvStateLocation requestKvStateLocation(final JobID jobId, final
String registrationName)
throws UnknownKvStateLocation, FlinkJobNotFoundException {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 1643dbee282..b22e6204789 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -44,7 +44,6 @@ import
org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -106,8 +105,6 @@ public interface SchedulerNG extends GlobalFailureHandler,
AutoCloseableAsync {
JobStatus requestJobStatus();
- JobDetails requestJobDetails();
-
//
------------------------------------------------------------------------------------
// Methods below do not belong to Scheduler but are included due to
historical reasons
//
------------------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 760f9e969b1..101766a9780 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -83,7 +83,6 @@ import
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
@@ -613,11 +612,6 @@ public class AdaptiveScheduler
return state.getJobStatus();
}
- @Override
- public JobDetails requestJobDetails() {
- return JobDetails.createDetailsForJob(state.getJob());
- }
-
@Override
public KvStateLocation requestKvStateLocation(JobID jobId, String
registrationName)
throws UnknownKvStateLocation, FlinkJobNotFoundException {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index c519c21f1c7..ecdc78f45a9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -48,7 +48,6 @@ import
org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -129,7 +128,7 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
@Nonnull
private final Function<ResourceID, CompletableFuture<Void>>
resourceManagerHeartbeatFunction;
- @Nonnull private final Supplier<CompletableFuture<JobDetails>>
requestJobDetailsSupplier;
+ @Nonnull private final Supplier<CompletableFuture<JobStatus>>
requestJobStatusSupplier;
@Nonnull private final Supplier<CompletableFuture<ExecutionGraphInfo>>
requestJobSupplier;
@@ -244,7 +243,7 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
CompletableFuture<Void>>
taskManagerHeartbeatFunction,
@Nonnull Function<ResourceID, CompletableFuture<Void>>
resourceManagerHeartbeatFunction,
- @Nonnull Supplier<CompletableFuture<JobDetails>>
requestJobDetailsSupplier,
+ @Nonnull Supplier<CompletableFuture<JobStatus>>
requestJobStatusSupplier,
@Nonnull Supplier<CompletableFuture<ExecutionGraphInfo>>
requestJobSupplier,
@Nonnull
Supplier<CompletableFuture<CheckpointStatsSnapshot>>
@@ -327,7 +326,7 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
this.registerTaskManagerFunction = registerTaskManagerFunction;
this.taskManagerHeartbeatFunction = taskManagerHeartbeatFunction;
this.resourceManagerHeartbeatFunction =
resourceManagerHeartbeatFunction;
- this.requestJobDetailsSupplier = requestJobDetailsSupplier;
+ this.requestJobStatusSupplier = requestJobStatusSupplier;
this.requestJobSupplier = requestJobSupplier;
this.checkpointStatsSnapshotSupplier = checkpointStatsSnapshotSupplier;
this.triggerSavepointFunction = triggerSavepointFunction;
@@ -412,14 +411,9 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
return resourceManagerHeartbeatFunction.apply(resourceID);
}
- @Override
- public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
- return requestJobDetailsSupplier.get();
- }
-
@Override
public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
- return
requestJobDetailsSupplier.get().thenApply(JobDetails::getStatus);
+ return requestJobStatusSupplier.get();
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index e462a0d23c5..6fbc367f800 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobmaster.utils;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
@@ -45,7 +46,6 @@ import
org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -118,7 +118,7 @@ public class TestingJobMasterGatewayBuilder {
(ignoredA, ignoredB) -> FutureUtils.completedVoidFuture();
private Function<ResourceID, CompletableFuture<Void>>
resourceManagerHeartbeatFunction =
ignored -> FutureUtils.completedVoidFuture();
- private Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier =
+ private Supplier<CompletableFuture<JobStatus>> requestJobStatusSupplier =
() -> FutureUtils.completedExceptionally(new
UnsupportedOperationException());
private Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier
=
() -> FutureUtils.completedExceptionally(new
UnsupportedOperationException());
@@ -280,9 +280,9 @@ public class TestingJobMasterGatewayBuilder {
return this;
}
- public TestingJobMasterGatewayBuilder setRequestJobDetailsSupplier(
- Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier)
{
- this.requestJobDetailsSupplier = requestJobDetailsSupplier;
+ public TestingJobMasterGatewayBuilder setRequestJobStatusSupplier(
+ Supplier<CompletableFuture<JobStatus>> requestJobStatusSupplier) {
+ this.requestJobStatusSupplier = requestJobStatusSupplier;
return this;
}
@@ -446,7 +446,7 @@ public class TestingJobMasterGatewayBuilder {
registerTaskManagerFunction,
taskManagerHeartbeatFunction,
resourceManagerHeartbeatFunction,
- requestJobDetailsSupplier,
+ requestJobStatusSupplier,
requestJobSupplier,
checkpointStatsSnapshotSupplier,
triggerSavepointFunction,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 9820f94db81..ad4d33edf96 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -157,12 +156,6 @@ public class TestingSchedulerNG implements SchedulerNG {
return JobStatus.CREATED;
}
- @Override
- public JobDetails requestJobDetails() {
- failOperation();
- return null;
- }
-
@Override
public KvStateLocation requestKvStateLocation(JobID jobId, String
registrationName) {
failOperation();