Repository: flink Updated Branches: refs/heads/flip-6 9da76dcfd -> 928800569
http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java new file mode 100644 index 0000000..66d8102 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.exceptions; + +/** + * Exception indicating that the slot allocation on the task manager failed. + */ +public class SlotAllocationException extends TaskManagerException { + + private static final long serialVersionUID = -4764932098204266773L; + + public SlotAllocationException(String message) { + super(message); + } + + public SlotAllocationException(String message, Throwable cause) { + super(message, cause); + } + + public SlotAllocationException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 42cb919..88123b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -70,10 +70,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { /** Interface for slot actions, such as freeing them or timing them out */ private SlotActions slotActions; - - /** The timeout for allocated slots */ - private Time slotTimeout; - + /** Whether the table has been started */ private boolean started; @@ -104,7 +101,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { slotsPerJob = new HashMap<>(4); slotActions = null; - slotTimeout = null; started = false; } @@ -112,11 +108,9 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { * Start the task slot table with the given slot actions and slot timeout value. * * @param initialSlotActions to use for slot actions - * @param initialSlotTimeout to use for slot timeouts */ - public void start(SlotActions initialSlotActions, Time initialSlotTimeout) { + public void start(SlotActions initialSlotActions) { this.slotActions = Preconditions.checkNotNull(initialSlotActions); - this.slotTimeout = Preconditions.checkNotNull(initialSlotTimeout); timerService.start(this); @@ -129,7 +123,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { public void stop() { started = false; timerService.stop(); - slotTimeout = null; slotActions = null; } @@ -144,9 +137,10 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { * @param index of the task slot to allocate * @param jobId to allocate the task slot for * @param allocationId identifying the allocation + * @param slotTimeout until the slot times out * @return True if the task slot could be allocated; otherwise false */ - public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) { + public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) { checkInit(); TaskSlot taskSlot = taskSlots.get(index); @@ -180,7 +174,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { * * @param allocationId to identify the task slot to mark as active * @throws SlotNotFoundException if the slot could not be found for the given allocation id - * @return True if the slot could be marked active + * @return True if the slot could be marked active; otherwise false */ public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException { checkInit(); @@ -190,6 +184,8 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { if (taskSlot != null) { if (taskSlot.markActive()) { // unregister a potential timeout + LOG.info("Activate slot {}.", allocationId); + timerService.unregisterTimeout(allocationId); return true; @@ -206,10 +202,11 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { * then a {@link SlotNotFoundException} is thrown. * * @param allocationId to identify the task slot to mark as inactive + * @param slotTimeout until the slot times out * @throws SlotNotFoundException if the slot could not be found for the given allocation id * @return True if the slot could be marked inactive */ - public boolean markSlotInactive(AllocationID allocationId) throws SlotNotFoundException { + public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException { checkInit(); TaskSlot taskSlot = getTaskSlot(allocationId); @@ -253,6 +250,12 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { */ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { checkInit(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Free slot {}.", allocationId, cause); + } else { + LOG.info("Free slot {}.", allocationId); + } TaskSlot taskSlot = getTaskSlot(allocationId); @@ -322,8 +325,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { * @return True if the given task slot is allocated for the given job and allocation id */ public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) { - checkInit(); - TaskSlot taskSlot = taskSlots.get(index); return taskSlot.isAllocated(jobId, allocationId); @@ -336,7 +337,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { * @param allocationId identifying the allocation * @return True if there exists a task slot which is active for the given job and allocation id. */ - public boolean existActiveSlot(JobID jobId, AllocationID allocationId) { + public boolean existsActiveSlot(JobID jobId, AllocationID allocationId) { TaskSlot taskSlot = getTaskSlot(allocationId); if (taskSlot != null) { @@ -431,6 +432,8 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { * @return The removed task if there is any for the given execution attempt id; otherwise null */ public Task removeTask(ExecutionAttemptID executionAttemptID) { + checkInit(); + TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID); if (taskSlotMapping != null) { @@ -481,6 +484,8 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { @Override public void notifyTimeout(AllocationID key, UUID ticket) { + checkInit(); + if (slotActions != null) { slotActions.timeoutSlot(key, ticket); } @@ -493,9 +498,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { private TaskSlot getTaskSlot(AllocationID allocationId) { Preconditions.checkNotNull(allocationId); - TaskSlot taskSlot = allocationIDTaskSlotMap.get(allocationId); - - return taskSlot; + return allocationIDTaskSlotMap.get(allocationId); } private void checkInit() { http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index b67737d..90a829c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -278,7 +278,7 @@ public class Task implements Runnable, TaskActions { this.jobId = checkNotNull(tdd.getJobID()); this.vertexId = checkNotNull(tdd.getVertexID()); this.executionId = checkNotNull(tdd.getExecutionId()); - this.allocationId = checkNotNull(tdd.getAllocationID()); + this.allocationId = checkNotNull(tdd.getAllocationId()); this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks(); this.jobConfiguration = checkNotNull(tdd.getJobConfiguration()); this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration()); http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index 39ea176..993fd19 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -29,6 +29,7 @@ import java.util.List; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -44,6 +45,7 @@ public class TaskDeploymentDescriptorTest { public void testSerialization() { try { final JobID jobID = new JobID(); + final AllocationID allocationId = new AllocationID(); final JobVertexID vertexID = new JobVertexID(); final ExecutionAttemptID execId = new ExecutionAttemptID(); final String jobName = "job name"; @@ -61,7 +63,7 @@ public class TaskDeploymentDescriptorTest { final List<URL> requiredClasspaths = new ArrayList<URL>(0); final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig()); - final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, jobName, vertexID, execId, + final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, allocationId, jobName, vertexID, execId, executionConfig, taskName, numberOfKeyGroups, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber, jobConfiguration, taskConfiguration, invokableClass.getName(), producedResults, inputGates, requiredJars, requiredClasspaths, 47); @@ -69,12 +71,14 @@ public class TaskDeploymentDescriptorTest { final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig); assertFalse(orig.getJobID() == copy.getJobID()); + assertFalse(orig.getAllocationId() == copy.getAllocationId()); assertFalse(orig.getVertexID() == copy.getVertexID()); assertFalse(orig.getTaskName() == copy.getTaskName()); assertFalse(orig.getJobConfiguration() == copy.getJobConfiguration()); assertFalse(orig.getTaskConfiguration() == copy.getTaskConfiguration()); assertEquals(orig.getJobID(), copy.getJobID()); + assertEquals(orig.getAllocationId(), copy.getAllocationId()); assertEquals(orig.getVertexID(), copy.getVertexID()); assertEquals(orig.getTaskName(), copy.getTaskName()); assertEquals(orig.getNumberOfKeyGroups(), copy.getNumberOfKeyGroups()); http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index a255027..38e372d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) throws Exception { LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID); if (service != null) { return service; http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java index 49f2268..073aeac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -75,7 +76,8 @@ public class TaskManagerGroupTest extends TestLogger { final ExecutionAttemptID execution21 = new ExecutionAttemptID(); TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( - jid1, + jid1, + new AllocationID(), jobName1, vertex11, execution11, @@ -91,6 +93,7 @@ public class TaskManagerGroupTest extends TestLogger { TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( jid1, + new AllocationID(), jobName1, vertex12, execution12, @@ -106,6 +109,7 @@ public class TaskManagerGroupTest extends TestLogger { TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor( jid2, + new AllocationID(), jobName2, vertex21, execution21, @@ -121,6 +125,7 @@ public class TaskManagerGroupTest extends TestLogger { TaskDeploymentDescriptor tdd4 = new TaskDeploymentDescriptor( jid1, + new AllocationID(), jobName1, vertex13, execution13, @@ -192,6 +197,7 @@ public class TaskManagerGroupTest extends TestLogger { TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( jid1, + new AllocationID(), jobName1, vertex11, execution11, @@ -207,6 +213,7 @@ public class TaskManagerGroupTest extends TestLogger { TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( jid1, + new AllocationID(), jobName1, vertex12, execution12, @@ -222,6 +229,7 @@ public class TaskManagerGroupTest extends TestLogger { TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor( jid2, + new AllocationID(), jobName2, vertex21, execution21, http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 558d3c2..948c129 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -68,7 +68,7 @@ public class SlotManagerTest { taskExecutorRegistration = Mockito.mock(TaskExecutorRegistration.class); TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class); Mockito.when(taskExecutorRegistration.getTaskExecutorGateway()).thenReturn(gateway); - Mockito.when(gateway.requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class))) + Mockito.when(gateway.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class))) .thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>()); } http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 24d959e..86cd1f8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -144,7 +144,7 @@ public class SlotProtocolTest extends TestLogger { Mockito .when( taskExecutorGateway - .requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class))) + .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class))) .thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>()); testRpcService.registerGateway(tmAddress, taskExecutorGateway); @@ -161,7 +161,7 @@ public class SlotProtocolTest extends TestLogger { // 4) Slot becomes available and TaskExecutor gets a SlotRequest verify(taskExecutorGateway, timeout(5000)) - .requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class)); + .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class)); } /** @@ -189,7 +189,7 @@ public class SlotProtocolTest extends TestLogger { TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); Mockito.when( taskExecutorGateway - .requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class))) + .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class))) .thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>()); testRpcService.registerGateway(tmAddress, taskExecutorGateway); @@ -240,7 +240,7 @@ public class SlotProtocolTest extends TestLogger { // 4) a SlotRequest is routed to the TaskExecutor verify(taskExecutorGateway, timeout(5000)) - .requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class)); + .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class)); } private static TestingLeaderElectionService configureHA( http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index e7143ae..bbde331 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -287,17 +288,27 @@ public class RpcCompletenessTest extends TestLogger { if (!futureClass.equals(RpcCompletenessTest.futureClass)) { return false; } else { - Class<?> valueClass = ReflectionUtil.getTemplateType1(gatewayMethod.getGenericReturnType()); + ReflectionUtil.FullTypeInfo fullValueTypeInfo = ReflectionUtil.getFullTemplateType(gatewayMethod.getGenericReturnType(), 0); if (endpointMethod.getReturnType().equals(futureClass)) { - Class<?> rpcEndpointValueClass = ReflectionUtil.getTemplateType1(endpointMethod.getGenericReturnType()); + ReflectionUtil.FullTypeInfo fullRpcEndpointValueTypeInfo = ReflectionUtil.getFullTemplateType(endpointMethod.getGenericReturnType(), 0); // check if we have the same future value types - if (valueClass != null && rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) { - return false; + if (fullValueTypeInfo != null && fullRpcEndpointValueTypeInfo != null) { + Iterator<Class<?>> valueClasses = fullValueTypeInfo.getClazzIterator(); + Iterator<Class<?>> rpcClasses = fullRpcEndpointValueTypeInfo.getClazzIterator(); + + while (valueClasses.hasNext() && rpcClasses.hasNext()) { + if (!checkType(valueClasses.next(), rpcClasses.next())) { + return false; + } + } + + // both should be empty + return !valueClasses.hasNext() && !rpcClasses.hasNext(); } } else { - if (valueClass != null && !checkType(valueClass, endpointMethod.getReturnType())) { + if (fullValueTypeInfo != null && !checkType(fullValueTypeInfo.getClazz(), endpointMethod.getReturnType())) { return false; } } @@ -342,16 +353,16 @@ public class RpcCompletenessTest extends TestLogger { if (method.getReturnType().equals(Void.TYPE)) { builder.append("void").append(" "); } else if (method.getReturnType().equals(futureClass)) { - Class<?> valueClass = ReflectionUtil.getTemplateType1(method.getGenericReturnType()); + ReflectionUtil.FullTypeInfo fullTypeInfo = ReflectionUtil.getFullTemplateType(method.getGenericReturnType(), 0); builder .append(futureClass.getSimpleName()) .append("<") - .append(valueClass != null ? valueClass.getSimpleName() : "") + .append(fullTypeInfo != null ? fullTypeInfo.toString() : "") .append(">"); - if (valueClass != null) { - builder.append("/").append(valueClass.getSimpleName()); + if (fullTypeInfo != null) { + builder.append("/").append(fullTypeInfo); } builder.append(" "); http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index baae251..23c6833 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -18,21 +18,45 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.CompletableFuture; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.NonHaServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; @@ -40,19 +64,42 @@ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; +import org.apache.flink.runtime.taskexecutor.slot.TimerService; +import org.apache.flink.runtime.taskmanager.CheckpointResponder; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; +import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.Matchers; import org.powermock.api.mockito.PowerMockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import static org.hamcrest.Matchers.contains; public class TaskExecutorTest extends TestLogger { + @Rule + public TestName name = new TestName(); + + @Test public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception { final ResourceID resourceID = ResourceID.generate(); @@ -85,6 +132,8 @@ public class TaskExecutorTest extends TestLogger { mock(BroadcastVariableManager.class), mock(FileCache.class), mock(TaskSlotTable.class), + mock(JobManagerTable.class), + mock(JobLeaderService.class), mock(FatalErrorHandler.class)); taskManager.start(); @@ -142,6 +191,8 @@ public class TaskExecutorTest extends TestLogger { mock(BroadcastVariableManager.class), mock(FileCache.class), mock(TaskSlotTable.class), + mock(JobManagerTable.class), + mock(JobLeaderService.class), mock(FatalErrorHandler.class)); taskManager.start(); @@ -173,17 +224,385 @@ public class TaskExecutorTest extends TestLogger { } /** + * Tests that we can submit a task to the TaskManager given that we've allocated a slot there. + */ + @Test(timeout = 1000L) + public void testTaskSubmission() throws Exception { + final Configuration configuration = new Configuration(); + + final TestingSerialRpcService rpc = new TestingSerialRpcService(); + final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + final JobID jobId = new JobID(); + final AllocationID allocationId = new AllocationID(); + final UUID jobManagerLeaderId = UUID.randomUUID(); + final JobVertexID jobVertexId = new JobVertexID(); + + final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( + jobId, + allocationId, + name.getMethodName(), + jobVertexId, + new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "test task", + 1, + 0, + 1, + 0, + configuration, + configuration, + TestInvokable.class.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + 0); + + final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class); + when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader()); + + final JobManagerConnection jobManagerConnection = new JobManagerConnection( + mock(JobMasterGateway.class), + jobManagerLeaderId, + mock(TaskManagerActions.class), + mock(CheckpointResponder.class), + libraryCacheManager, + mock(ResultPartitionConsumableNotifier.class), + mock(PartitionStateChecker.class)); + + final JobManagerTable jobManagerTable = new JobManagerTable(); + jobManagerTable.put(jobId, jobManagerConnection); + + final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class); + when(taskSlotTable.existsActiveSlot(eq(jobId), eq(allocationId))).thenReturn(true); + when(taskSlotTable.addTask(any(Task.class))).thenReturn(true); + + final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); + + when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class)); + + final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class); + + when(taskManagerMetricGroup.addTaskForJob(eq(tdd))).thenReturn(mock(TaskMetricGroup.class)); + + final HighAvailabilityServices haServices = mock(HighAvailabilityServices.class); + when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class)); + + try { + + TaskExecutor taskManager = new TaskExecutor( + taskManagerConfiguration, + mock(TaskManagerLocation.class), + rpc, + mock(MemoryManager.class), + mock(IOManager.class), + networkEnvironment, + haServices, + mock(MetricRegistry.class), + taskManagerMetricGroup, + mock(BroadcastVariableManager.class), + mock(FileCache.class), + taskSlotTable, + jobManagerTable, + mock(JobLeaderService.class), + mock(FatalErrorHandler.class)); + + taskManager.start(); + + taskManager.submitTask(tdd, jobManagerLeaderId); + + Future<Boolean> completionFuture = TestInvokable.completableFuture; + + completionFuture.get(); + + } finally { + rpc.stopService(); + } + } + + /** + * Test invokable which completes the given future when executed. + */ + public static class TestInvokable extends AbstractInvokable { + + static final CompletableFuture<Boolean> completableFuture = new FlinkCompletableFuture<>(); + + @Override + public void invoke() throws Exception { + completableFuture.complete(true); + } + } + + /** + * Tests that a TaskManager detects a job leader for which has reserved slots. Upon detecting + * the job leader, it will offer all reserved slots to the JobManager. + */ + @Test + public void testJobLeaderDetection() throws TestingFatalErrorHandler.TestingException, SlotAllocationException { + final JobID jobId = new JobID(); + + final TestingSerialRpcService rpc = new TestingSerialRpcService(); + final Configuration configuration = new Configuration(); + final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + final ResourceID resourceId = new ResourceID("foobar"); + final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234); + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + final TimerService<AllocationID> timerService = mock(TimerService.class); + final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService); + final JobManagerTable jobManagerTable = new JobManagerTable(); + final JobLeaderService jobLeaderService = new JobLeaderService(resourceId); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + + final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(); + final TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(); + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService); + haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService); + + final String resourceManagerAddress = "rm"; + final UUID resourceManagerLeaderId = UUID.randomUUID(); + + final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); + final InstanceID registrationId = new InstanceID(); + + when(resourceManagerGateway.registerTaskExecutor( + eq(resourceManagerLeaderId), + any(String.class), + eq(resourceId), + any(SlotReport.class), + any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L))); + + final String jobManagerAddress = "jm"; + final UUID jobManagerLeaderId = UUID.randomUUID(); + final ResourceID jmResourceId = new ResourceID(jobManagerAddress); + final int blobPort = 42; + + final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); + + when(jobMasterGateway.registerTaskManager( + any(String.class), + eq(resourceId), + eq(jobManagerLeaderId), + any(Time.class) + )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); + when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); + + rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); + rpc.registerGateway(jobManagerAddress, jobMasterGateway); + + final AllocationID allocationId = new AllocationID(); + final SlotID slotId = new SlotID(resourceId, 0); + + try { + TaskExecutor taskManager = new TaskExecutor( + taskManagerConfiguration, + taskManagerLocation, + rpc, + mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices, + mock(MetricRegistry.class), + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), + taskSlotTable, + jobManagerTable, + jobLeaderService, + testingFatalErrorHandler); + + taskManager.start(); + + // tell the task manager about the rm leader + resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId); + + // request slots from the task manager under the given allocation id + TMSlotRequestReply reply = taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId); + + // this is hopefully successful :-) + assertTrue(reply instanceof TMSlotRequestRegistered); + + // now inform the task manager about the new job leader + jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId); + + // the job leader should get the allocation id offered + verify(jobMasterGateway).offerSlots((Iterable<AllocationID>)Matchers.argThat(contains(allocationId)), eq(jobManagerLeaderId), any(Time.class)); + } finally { + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowException(); + + rpc.stopService(); + } + } + + /** + * Tests that accepted slots go into state assigned and the others are returned to the resource + * manager. + */ + @Test + public void testSlotAcceptance() throws Exception { + final JobID jobId = new JobID(); + + final TestingSerialRpcService rpc = new TestingSerialRpcService(); + final Configuration configuration = new Configuration(); + final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + final ResourceID resourceId = new ResourceID("foobar"); + final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234); + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + final TimerService<AllocationID> timerService = mock(TimerService.class); + final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService); + final JobManagerTable jobManagerTable = new JobManagerTable(); + final JobLeaderService jobLeaderService = new JobLeaderService(resourceId); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + + final String resourceManagerAddress = "rm"; + final UUID resourceManagerLeaderId = UUID.randomUUID(); + + final String jobManagerAddress = "jm"; + final UUID jobManagerLeaderId = UUID.randomUUID(); + + final LeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId); + final LeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId); + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService); + haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService); + + final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); + final InstanceID registrationId = new InstanceID(); + + when(resourceManagerGateway.registerTaskExecutor( + eq(resourceManagerLeaderId), + any(String.class), + eq(resourceId), + any(SlotReport.class), + any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L))); + + final ResourceID jmResourceId = new ResourceID(jobManagerAddress); + final int blobPort = 42; + + final AllocationID allocationId1 = new AllocationID(); + final AllocationID allocationId2 = new AllocationID(); + + final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); + + when(jobMasterGateway.registerTaskManager( + any(String.class), + eq(resourceId), + eq(jobManagerLeaderId), + any(Time.class) + )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); + when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); + + when(jobMasterGateway.offerSlots(any(Iterable.class), eq(jobManagerLeaderId), any(Time.class))) + .thenReturn(FlinkCompletableFuture.completed((Iterable<AllocationID>)Collections.singleton(allocationId1))); + + rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); + rpc.registerGateway(jobManagerAddress, jobMasterGateway); + + try { + TaskExecutor taskManager = new TaskExecutor( + taskManagerConfiguration, + taskManagerLocation, + rpc, + mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices, + mock(MetricRegistry.class), + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), + taskSlotTable, + jobManagerTable, + jobLeaderService, + testingFatalErrorHandler); + + taskManager.start(); + + taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L)); + taskSlotTable.allocateSlot(1, jobId, allocationId2, Time.milliseconds(10000L)); + + // we have to add the job after the TaskExecutor, because otherwise the service has not + // been properly started. + jobLeaderService.addJob(jobId, jobManagerAddress); + + verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1))); + + assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1)); + assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2)); + assertTrue(taskSlotTable.isSlotFree(1)); + } finally { + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowException(); + + rpc.stopService(); + } + } + + private static class TestingFatalErrorHandler implements FatalErrorHandler { + private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); + private final AtomicReference<Throwable> atomicThrowable; + + public TestingFatalErrorHandler() { + atomicThrowable = new AtomicReference<>(null); + } + + public void rethrowException() throws TestingException { + Throwable throwable = atomicThrowable.get(); + + if (throwable != null) { + throw new TestingException(throwable); + } + } + + public boolean hasExceptionOccurred() { + return atomicThrowable.get() != null; + } + + public Throwable getException() { + return atomicThrowable.get(); + } + + @Override + public void onFatalError(Throwable exception) { + LOG.error("OnFatalError:", exception); + atomicThrowable.compareAndSet(null, exception); + } + + //------------------------------------------------------------------ + // static utility classes + //------------------------------------------------------------------ + + private static final class TestingException extends Exception { + public TestingException(String message) { + super(message); + } + + public TestingException(String message, Throwable cause) { + super(message, cause); + } + + public TestingException(Throwable cause) { + super(cause); + } + + private static final long serialVersionUID = -4648195335470914498L; + } + } + + /** * Tests that all allocation requests for slots are ignored if the slot has been reported as * free by the TaskExecutor but this report hasn't been confirmed by the ResourceManager. * * This is essential for the correctness of the state of the ResourceManager. */ + @Ignore @Test - public void testRejectAllocationRequestsForOutOfSyncSlots() { + public void testRejectAllocationRequestsForOutOfSyncSlots() throws SlotAllocationException { final ResourceID resourceID = ResourceID.generate(); final String address1 = "/resource/manager/address/one"; final UUID leaderId = UUID.randomUUID(); + final JobID jobId = new JobID(); + final String jobManagerAddress = "foobar"; final TestingSerialRpcService rpc = new TestingSerialRpcService(); try { @@ -215,6 +634,8 @@ public class TaskExecutorTest extends TestLogger { mock(BroadcastVariableManager.class), mock(FileCache.class), mock(TaskSlotTable.class), + mock(JobManagerTable.class), + mock(JobLeaderService.class), mock(FatalErrorHandler.class)); taskManager.start(); @@ -232,14 +653,14 @@ public class TaskExecutorTest extends TestLogger { // test that allocating a slot works final SlotID slotID = new SlotID(resourceID, 0); - TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, new AllocationID(), leaderId); + TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId); assertTrue(tmSlotRequestReply instanceof TMSlotRequestRegistered); + // TODO: Figure out the concrete allocation behaviour between RM and TM. Maybe we don't need the SlotID... // test that we can't allocate slots which are blacklisted due to pending confirmation of the RM final SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1); - taskManager.addUnconfirmedFreeSlotNotification(unconfirmedFreeSlotID); TMSlotRequestReply tmSlotRequestReply2 = - taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId); + taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId); assertTrue(tmSlotRequestReply2 instanceof TMSlotRequestRejected); // re-register @@ -250,7 +671,7 @@ public class TaskExecutorTest extends TestLogger { // now we should be successful because the slots status has been synced // test that we can't allocate slots which are blacklisted due to pending confirmation of the RM TMSlotRequestReply tmSlotRequestReply3 = - taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId); + taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId); assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered); } http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index eb7f3c5..e027d78 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -159,7 +160,7 @@ public class TaskAsyncCallTest { .thenReturn(mock(TaskKvStateRegistry.class)); TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), + new JobID(), new AllocationID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), new SerializedValue<>(new ExecutionConfig()), "Test Task", 1, 0, 1, 0, new Configuration(), new Configuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index d4efd24..d1909fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; @@ -163,12 +164,13 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid = new ExecutionAttemptID(); final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig()); - final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig, - "TestTask", 7, 2, 7, 0, new Configuration(), new Configuration(), - TestInvokableCorrect.class.getName(), - Collections.<ResultPartitionDeploymentDescriptor>emptyList(), - Collections.<InputGateDeploymentDescriptor>emptyList(), - new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); + final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, new AllocationID(), + "TestJob", vid, eid, executionConfig, + "TestTask", 7, 2, 7, 0, new Configuration(), new Configuration(), + TestInvokableCorrect.class.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); new Within(d) { @@ -265,7 +267,7 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid2 = new ExecutionAttemptID(); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( - jid1, "TestJob1", vid1, eid1, + jid1, new AllocationID(), "TestJob1", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "TestTask1", 5, 1, 5, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), @@ -274,7 +276,7 @@ public class TaskManagerTest extends TestLogger { new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( - jid2, "TestJob2", vid2, eid2, + jid2, new AllocationID(), "TestJob2", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "TestTask2", 7, 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), @@ -404,13 +406,13 @@ public class TaskManagerTest extends TestLogger { final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig()); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, "TestJob", vid1, eid1, executionConfig, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, new AllocationID(), "TestJob", vid1, eid1, executionConfig, "TestTask1", 5, 1, 5, 0, new Configuration(), new Configuration(), StoppableInvokable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, "TestJob", vid2, eid2, executionConfig, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, new AllocationID(), "TestJob", vid2, eid2, executionConfig, "TestTask2", 7, 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), @@ -531,7 +533,7 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid2 = new ExecutionAttemptID(); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( - jid, "TestJob", vid1, eid1, + jid, new AllocationID(), "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), @@ -540,7 +542,7 @@ public class TaskManagerTest extends TestLogger { new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( - jid, "TestJob", vid2, eid2, + jid, new AllocationID(), "TestJob", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "Receiver", 7, 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), @@ -636,7 +638,7 @@ public class TaskManagerTest extends TestLogger { ); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( - jid, "TestJob", vid1, eid1, + jid, new AllocationID(), "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), @@ -644,7 +646,7 @@ public class TaskManagerTest extends TestLogger { Collections.<URL>emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( - jid, "TestJob", vid2, eid2, + jid, new AllocationID(), "TestJob", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "Receiver", 7, 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), @@ -781,7 +783,7 @@ public class TaskManagerTest extends TestLogger { ); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( - jid, "TestJob", vid1, eid1, + jid, new AllocationID(), "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), @@ -789,7 +791,7 @@ public class TaskManagerTest extends TestLogger { new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( - jid, "TestJob", vid2, eid2, + jid, new AllocationID(), "TestJob", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "Receiver", 7, 2, 7, 0, new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(), @@ -929,7 +931,7 @@ public class TaskManagerTest extends TestLogger { new InputGateDeploymentDescriptor(resultId, 0, icdd); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - jid, "TestJob", vid, eid, + jid, new AllocationID(), "TestJob", vid, eid, new SerializedValue<>(new ExecutionConfig()), "Receiver", 1, 0, 1, 0, new Configuration(), new Configuration(), @@ -1022,7 +1024,7 @@ public class TaskManagerTest extends TestLogger { new InputGateDeploymentDescriptor(resultId, 0, icdd); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - jid, "TestJob", vid, eid, + jid, new AllocationID(), "TestJob", vid, eid, new SerializedValue<>(new ExecutionConfig()), "Receiver", 1, 0, 1, 0, new Configuration(), new Configuration(), @@ -1097,24 +1099,25 @@ public class TaskManagerTest extends TestLogger { // Single blocking task final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - new JobID(), - "Job", - new JobVertexID(), - new ExecutionAttemptID(), - new SerializedValue<>(new ExecutionConfig()), - "Task", - 1, - 0, - 1, - 0, - new Configuration(), - new Configuration(), - Tasks.BlockingNoOpInvokable.class.getName(), - Collections.<ResultPartitionDeploymentDescriptor>emptyList(), - Collections.<InputGateDeploymentDescriptor>emptyList(), - Collections.<BlobKey>emptyList(), - Collections.<URL>emptyList(), - 0); + new JobID(), + new AllocationID(), + "Job", + new JobVertexID(), + new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "Task", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + Tasks.BlockingNoOpInvokable.class.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + 0); // Submit the task new Within(d) { http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index 5d3eb3a..7d466f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -69,7 +69,7 @@ public class TaskStopTest { when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class)); when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class)); when(tddMock.getInvokableClassName()).thenReturn("className"); - when(tddMock.getAllocationID()).thenReturn(mock(AllocationID.class)); + when(tddMock.getAllocationId()).thenReturn(mock(AllocationID.class)); task = new Task( tddMock, http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 50fc181..c5a9f2d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -665,16 +666,18 @@ public class TaskTest { } return new TaskDeploymentDescriptor( - new JobID(), "Test Job", new JobVertexID(), new ExecutionAttemptID(), - execConfig, - "Test Task", 1, 0, 1, 0, - new Configuration(), new Configuration(), - invokable.getName(), - Collections.<ResultPartitionDeploymentDescriptor>emptyList(), - Collections.<InputGateDeploymentDescriptor>emptyList(), - Collections.<BlobKey>emptyList(), - Collections.<URL>emptyList(), - 0); + new JobID(), + new AllocationID(), + "Test Job", new JobVertexID(), new ExecutionAttemptID(), + execConfig, + "Test Task", 1, 0, 1, 0, + new Configuration(), new Configuration(), + invokable.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + 0); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index fb1b3b3..e84f594 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -129,24 +130,25 @@ public class InterruptSensitiveRestoreTest { List<Collection<OperatorStateHandle>> partitionableOperatorState = Collections.emptyList(); return new TaskDeploymentDescriptor( - new JobID(), - "test job name", - new JobVertexID(), - new ExecutionAttemptID(), - new SerializedValue<>(new ExecutionConfig()), - "test task name", - 1, 0, 1, 0, - new Configuration(), - taskConfig, - SourceStreamTask.class.getName(), - Collections.<ResultPartitionDeploymentDescriptor>emptyList(), - Collections.<InputGateDeploymentDescriptor>emptyList(), - Collections.<BlobKey>emptyList(), - Collections.<URL>emptyList(), - 0, - operatorState, - keyGroupState, - partitionableOperatorState); + new JobID(), + new AllocationID(), + "test job name", + new JobVertexID(), + new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "test task name", + 1, 0, 1, 0, + new Configuration(), + taskConfig, + SourceStreamTask.class.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + 0, + operatorState, + keyGroupState, + partitionableOperatorState); } private static Task createTask(TaskDeploymentDescriptor tdd) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 9548595..abbf10c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -220,17 +221,19 @@ public class StreamTaskTest { .thenReturn(mock(TaskKvStateRegistry.class)); TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), - new SerializedValue<>(new ExecutionConfig()), - "Test Task", 1, 0, 1, 0, - new Configuration(), - taskConfig.getConfiguration(), - invokable.getName(), - Collections.<ResultPartitionDeploymentDescriptor>emptyList(), - Collections.<InputGateDeploymentDescriptor>emptyList(), - Collections.<BlobKey>emptyList(), - Collections.<URL>emptyList(), - 0); + new JobID(), + new AllocationID(), + "Job Name", new JobVertexID(), new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "Test Task", 1, 0, 1, 0, + new Configuration(), + taskConfig.getConfiguration(), + invokable.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + 0); return new Task( tdd,