[ 
https://issues.apache.org/jira/browse/FLINK-9427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489119#comment-16489119
 ] 

ASF GitHub Bot commented on FLINK-9427:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190611399
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 ---
    @@ -1483,6 +1485,216 @@ public void 
testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
                }
        }
     
    +   /**
    +    * Tests that we ignore slot requests if the TaskExecutor is not
    +    * registered at a ResourceManager.
    +    */
    +   @Test
    +   public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
    +           final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +           final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
    +
    +           final TaskExecutor taskExecutor = 
createTaskExecutor(taskManagerServices);
    +
    +           taskExecutor.start();
    +
    +           try {
    +                   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
    +
    +                   final CompletableFuture<RegistrationResponse> 
registrationFuture = new CompletableFuture<>();
    +                   final CompletableFuture<ResourceID> 
taskExecutorResourceIdFuture = new CompletableFuture<>();
    +
    +                   
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5
 -> {
    +                
taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    +                return registrationFuture;
    +            });
    +
    +                   
rpc.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
    +                   
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
 testingResourceManagerGateway.getFencingToken().toUUID());
    +
    +                   final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    +
    +                   final ResourceID resourceId = 
taskExecutorResourceIdFuture.get();
    +
    +                   final SlotID slotId = new SlotID(resourceId, 0);
    +                   final CompletableFuture<Acknowledge> 
slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new 
AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), 
timeout);
    +
    +                   try {
    +                           slotRequestResponse.get();
    +                           fail("We should not be able to request slots 
before the TaskExecutor is registered at the ResourceManager.");
    +                   } catch (ExecutionException ee) {
    +                           
assertThat(ExceptionUtils.stripExecutionException(ee), 
instanceOf(TaskManagerException.class));
    +                   }
    +           } finally {
    +                   RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
    +           }
    +   }
    +
    +   /**
    +    * Tests that the TaskExecutor tries to reconnect to a ResourceManager 
from which it
    +    * was explicitly disconnected.
    +    */
    +   @Test
    +   public void testReconnectionAttemptIfExplicitlyDisconnected() throws 
Exception {
    +           final long heartbeatInterval = 1000L;
    +           final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +           final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
    +           final TaskExecutor taskExecutor = new TaskExecutor(
    +                   rpc,
    +                   
TaskManagerConfiguration.fromConfiguration(configuration),
    +                   haServices,
    +                   new TaskManagerServicesBuilder()
    +                           .setTaskSlotTable(taskSlotTable)
    +                           .setTaskManagerLocation(taskManagerLocation)
    +                           .build(),
    +                   new HeartbeatServices(heartbeatInterval, 1000L),
    +                   
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
    +                   dummyBlobCacheService,
    +                   testingFatalErrorHandler);
    +
    +           taskExecutor.start();
    +
    +           try {
    +                   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
    +                   final ClusterInformation clusterInformation = new 
ClusterInformation("foobar", 1234);
    +                   final CompletableFuture<RegistrationResponse> 
registrationResponseFuture = CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 
heartbeatInterval, clusterInformation));
    +                   final BlockingQueue<ResourceID> registrationQueue = new 
ArrayBlockingQueue<>(1);
    +
    +                   
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5
 -> {
    +                
registrationQueue.offer(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    +                return registrationResponseFuture;
    +            });
    +                   
rpc.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
    +
    +                   
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
 testingResourceManagerGateway.getFencingToken().toUUID());
    +
    +                   final ResourceID firstRegistrationAttempt = 
registrationQueue.take();
    +
    +                   assertThat(firstRegistrationAttempt, 
equalTo(taskManagerLocation.getResourceID()));
    +
    +                   final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    +
    +                   assertThat(registrationQueue.isEmpty(), is(true));
    --- End diff --
    
    Good point. Will change it.


> Cannot download from BlobServer, because the server address is unknown.
> -----------------------------------------------------------------------
>
>                 Key: FLINK-9427
>                 URL: https://issues.apache.org/jira/browse/FLINK-9427
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Piotr Nowojski
>            Assignee: Till Rohrmann
>            Priority: Blocker
>             Fix For: 1.5.0
>
>         Attachments: failure
>
>
> Setup: a 6 + 1 node EMR cluster with m4.4xlarge instances.
> Job submission fails in most cases (but not all of them):
> {noformat}
> [hadoop@ip-172-31-28-17 flink-1.5.0]$ HADOOP_CONF_DIR=/etc/hadoop/conf 
> ./bin/flink run -m yarn-cluster -p 80 -yn 80 examples/batch/WordCount.jar 
> --input hdfs:///user/hadoop/enwiki-latest-abstract.xml --output 
> hdfs:///user/hadoop/output
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/hadoop/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-05-23 15:07:46,062 INFO  org.apache.hadoop.yarn.client.RMProxy           
>               - Connecting to ResourceManager at 
> ip-172-31-28-17.eu-central-1.compute.internal/172.31.28.17:8032
> 2018-05-23 15:07:46,179 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>               - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-05-23 15:07:46,179 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>               - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-05-23 15:07:46,339 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster 
> specification: ClusterSpecification{masterMemoryMB=1024, 
> taskManagerMemoryMB=4096, numberTaskManagers=80, slotsPerTaskManager=1}
> 2018-05-23 15:07:46,596 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The 
> configuration directory ('/home/hadoop/flink-1.5.0/conf') contains both LOG4J 
> and Logback configuration files. Please delete or rename one of them.
> 2018-05-23 15:07:47,318 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting 
> application master application_1526561166266_0049
> 2018-05-23 15:07:47,336 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted 
> application application_1526561166266_0049
> 2018-05-23 15:07:47,337 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for 
> the cluster to be allocated
> 2018-05-23 15:07:47,338 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying 
> cluster, current state ACCEPTED
> 2018-05-23 15:07:51,101 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN 
> application has been deployed successfully.
> Starting execution of program
> ------------------------------------------------------------
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: 
> java.io.IOException: Cannot download from BlobServer, because the server 
> address is unknown.
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:264)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452)
> at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:86)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
> Caused by: java.io.IOException: Cannot download from BlobServer, because the 
> server address is unknown.
> at 
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:192)
> at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206)
> at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
> at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:863)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
> at java.lang.Thread.run(Thread.java:748)
> {noformat}
> See the attached full application log.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to