This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5351203ff6cee3a0a509fcc586ca947f34df1e0c
Author: Till Rohrmann <[email protected]>
AuthorDate: Tue Jan 5 11:17:37 2021 +0100

    [hotfix][tests] Remove ProgrammedSlotProvider
    
    Removing the ProgrammedSlotProvider since it is no longer used.
---
 .../executiongraph/ProgrammedSlotProvider.java     | 163 ---------------------
 1 file changed, 163 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
deleted file mode 100644
index 91825c3..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A slot provider where one can pre-set the slot futures for tasks based on 
vertex ID and subtask
- * index.
- */
-class ProgrammedSlotProvider implements Scheduler {
-
-    private final Map<JobVertexID, CompletableFuture<LogicalSlot>[]> 
slotFutures = new HashMap<>();
-
-    private final Map<JobVertexID, CompletableFuture<Boolean>[]> 
slotFutureRequested =
-            new HashMap<>();
-
-    private final Set<SlotRequestId> slotRequests = new HashSet<>();
-
-    private final Set<SlotRequestId> canceledSlotRequests = new HashSet<>();
-
-    private final int parallelism;
-
-    public ProgrammedSlotProvider(int parallelism) {
-        checkArgument(parallelism > 0);
-        this.parallelism = parallelism;
-    }
-
-    public void addSlot(
-            JobVertexID vertex, int subtaskIndex, 
CompletableFuture<LogicalSlot> future) {
-        checkNotNull(vertex);
-        checkNotNull(future);
-        checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
-
-        CompletableFuture<LogicalSlot>[] futures = slotFutures.get(vertex);
-        CompletableFuture<Boolean>[] requestedFutures = 
slotFutureRequested.get(vertex);
-
-        if (futures == null) {
-            @SuppressWarnings("unchecked")
-            CompletableFuture<LogicalSlot>[] newArray =
-                    (CompletableFuture<LogicalSlot>[]) new 
CompletableFuture<?>[parallelism];
-            futures = newArray;
-            slotFutures.put(vertex, futures);
-
-            requestedFutures = new CompletableFuture[parallelism];
-            slotFutureRequested.put(vertex, requestedFutures);
-        }
-
-        futures[subtaskIndex] = future;
-        requestedFutures[subtaskIndex] = new CompletableFuture<>();
-    }
-
-    public void addSlots(JobVertexID vertex, CompletableFuture<LogicalSlot>[] 
futures) {
-        checkNotNull(vertex);
-        checkNotNull(futures);
-        checkArgument(futures.length == parallelism);
-
-        slotFutures.put(vertex, futures);
-
-        CompletableFuture<Boolean>[] requestedFutures = new 
CompletableFuture[futures.length];
-
-        for (int i = 0; i < futures.length; i++) {
-            requestedFutures[i] = new CompletableFuture<>();
-        }
-
-        slotFutureRequested.put(vertex, requestedFutures);
-    }
-
-    public CompletableFuture<Boolean> getSlotRequestedFuture(
-            JobVertexID jobVertexId, int subtaskIndex) {
-        return slotFutureRequested.get(jobVertexId)[subtaskIndex];
-    }
-
-    public Set<SlotRequestId> getSlotRequests() {
-        return Collections.unmodifiableSet(slotRequests);
-    }
-
-    public Set<SlotRequestId> getCanceledSlotRequests() {
-        return Collections.unmodifiableSet(canceledSlotRequests);
-    }
-
-    @Override
-    public CompletableFuture<LogicalSlot> allocateSlot(
-            SlotRequestId slotRequestId,
-            ScheduledUnit task,
-            SlotProfile slotProfile,
-            Time allocationTimeout) {
-        final JobVertexID vertexId = task.getJobVertexId();
-        final int subtask = task.getSubtaskIndex();
-
-        CompletableFuture<LogicalSlot>[] forTask = slotFutures.get(vertexId);
-        if (forTask != null) {
-            CompletableFuture<LogicalSlot> future = forTask[subtask];
-            if (future != null) {
-                slotFutureRequested.get(vertexId)[subtask].complete(true);
-                slotRequests.add(slotRequestId);
-
-                return future;
-            }
-        }
-
-        throw new IllegalArgumentException(
-                "No registered slot future for task " + vertexId + " (" + 
subtask + ')');
-    }
-
-    @Override
-    public void cancelSlotRequest(
-            SlotRequestId slotRequestId,
-            @Nullable SlotSharingGroupId slotSharingGroupId,
-            Throwable cause) {
-        canceledSlotRequests.add(slotRequestId);
-    }
-
-    @Override
-    public void start(@Nonnull ComponentMainThreadExecutor mainThreadExecutor) 
{}
-
-    @Override
-    public boolean requiresPreviousExecutionGraphAllocations() {
-        return false;
-    }
-
-    @Override
-    public void returnLogicalSlot(LogicalSlot logicalSlot) {
-        throw new UnsupportedOperationException();
-    }
-}

Reply via email to