[
https://issues.apache.org/jira/browse/FLINK-9427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489121#comment-16489121
]
ASF GitHub Bot commented on FLINK-9427:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6067#discussion_r190611660
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
---
@@ -1483,6 +1485,216 @@ public void
testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
}
}
+ /**
+ * Tests that we ignore slot requests if the TaskExecutor is not
+ * registered at a ResourceManager.
+ */
+ @Test
+ public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
+ final TaskSlotTable taskSlotTable = new
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+ final TaskManagerServices taskManagerServices = new
TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
+
+ final TaskExecutor taskExecutor =
createTaskExecutor(taskManagerServices);
+
+ taskExecutor.start();
+
+ try {
+ final TestingResourceManagerGateway
testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+ final CompletableFuture<RegistrationResponse>
registrationFuture = new CompletableFuture<>();
+ final CompletableFuture<ResourceID>
taskExecutorResourceIdFuture = new CompletableFuture<>();
+
+
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5
-> {
+
taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
+ return registrationFuture;
+ });
+
+
rpc.registerGateway(testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway);
+
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway.getFencingToken().toUUID());
+
+ final TaskExecutorGateway taskExecutorGateway =
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+ final ResourceID resourceId =
taskExecutorResourceIdFuture.get();
+
+ final SlotID slotId = new SlotID(resourceId, 0);
+ final CompletableFuture<Acknowledge>
slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new
AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(),
timeout);
+
+ try {
+ slotRequestResponse.get();
+ fail("We should not be able to request slots
before the TaskExecutor is registered at the ResourceManager.");
+ } catch (ExecutionException ee) {
+
assertThat(ExceptionUtils.stripExecutionException(ee),
instanceOf(TaskManagerException.class));
+ }
+ } finally {
+ RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+ }
+ }
+
+ /**
+ * Tests that the TaskExecutor tries to reconnect to a ResourceManager
from which it
+ * was explicitly disconnected.
+ */
+ @Test
+ public void testReconnectionAttemptIfExplicitlyDisconnected() throws
Exception {
+ final long heartbeatInterval = 1000L;
+ final TaskSlotTable taskSlotTable = new
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+ final TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ rpc,
+
TaskManagerConfiguration.fromConfiguration(configuration),
+ haServices,
+ new TaskManagerServicesBuilder()
+ .setTaskSlotTable(taskSlotTable)
+ .setTaskManagerLocation(taskManagerLocation)
+ .build(),
+ new HeartbeatServices(heartbeatInterval, 1000L),
+
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+ dummyBlobCacheService,
+ testingFatalErrorHandler);
+
+ taskExecutor.start();
+
+ try {
+ final TestingResourceManagerGateway
testingResourceManagerGateway = new TestingResourceManagerGateway();
+ final ClusterInformation clusterInformation = new
ClusterInformation("foobar", 1234);
+ final CompletableFuture<RegistrationResponse>
registrationResponseFuture = CompletableFuture.completedFuture(new
TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(),
heartbeatInterval, clusterInformation));
+ final BlockingQueue<ResourceID> registrationQueue = new
ArrayBlockingQueue<>(1);
+
+
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5
-> {
+
registrationQueue.offer(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
--- End diff --
Arrg, the same.
> Cannot download from BlobServer, because the server address is unknown.
> -----------------------------------------------------------------------
>
> Key: FLINK-9427
> URL: https://issues.apache.org/jira/browse/FLINK-9427
> Project: Flink
> Issue Type: Bug
> Components: TaskManager
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Piotr Nowojski
> Assignee: Till Rohrmann
> Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: failure
>
>
> Setup: 6 + 1 nodes EMR cluster with m4.4xlarge instances
> Job submission fails in most cases (but not all of them):
> {noformat}
> [hadoop@ip-172-31-28-17 flink-1.5.0]$ HADOOP_CONF_DIR=/etc/hadoop/conf
> ./bin/flink run -m yarn-cluster -p 80 -yn 80 examples/batch/WordCount.jar
> --input hdfs:///user/hadoop/enwiki-latest-abstract.xml --output
> hdfs:///user/hadoop/output
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/home/hadoop/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-05-23 15:07:46,062 INFO org.apache.hadoop.yarn.client.RMProxy
> - Connecting to ResourceManager at
> ip-172-31-28-17.eu-central-1.compute.internal/172.31.28.17:8032
> 2018-05-23 15:07:46,179 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli
> - No path for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-05-23 15:07:46,179 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli
> - No path for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-05-23 15:07:46,339 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster
> specification: ClusterSpecification{masterMemoryMB=1024,
> taskManagerMemoryMB=4096, numberTaskManagers=80, slotsPerTaskManager=1}
> 2018-05-23 15:07:46,596 WARN
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The
> configuration directory ('/home/hadoop/flink-1.5.0/conf') contains both LOG4J
> and Logback configuration files. Please delete or rename one of them.
> 2018-05-23 15:07:47,318 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting
> application master application_1526561166266_0049
> 2018-05-23 15:07:47,336 INFO
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
> application application_1526561166266_0049
> 2018-05-23 15:07:47,337 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for
> the cluster to be allocated
> 2018-05-23 15:07:47,338 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying
> cluster, current state ACCEPTED
> 2018-05-23 15:07:51,101 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN
> application has been deployed successfully.
> Starting execution of program
> ------------------------------------------------------------
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException:
> java.io.IOException: Cannot download from BlobServer, because the server
> address is unknown.
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:264)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:86)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
> Caused by: java.io.IOException: Cannot download from BlobServer, because the
> server address is unknown.
> at
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:192)
> at
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206)
> at
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
> at
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:863)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
> at java.lang.Thread.run(Thread.java:748){noformat}
> See attached full application log
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)