[
https://issues.apache.org/jira/browse/FLINK-1922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517515#comment-14517515
]
ASF GitHub Bot commented on FLINK-1922:
---------------------------------------
Github user hsaputra commented on a diff in the pull request:
https://github.com/apache/flink/pull/631#discussion_r29268270
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.TimeUnit;
+
+public class TaskInputSplitProviderTest {
+
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ system = ActorSystem.create("TestActorSystem",
TestingUtils.testConfig());
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ @Test
+ public void testRequestNextInputSplitWithInvalidExecutionID() {
+
+ final JobID jobID = new JobID();
+ final JobVertexID vertexID = new JobVertexID();
+ final ExecutionAttemptID executionID = new ExecutionAttemptID();
+ final Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
+
+ final ActorRef jobManagerRef =
system.actorOf(Props.create(NullInputSplitJobManager.class));
+
+ final TaskInputSplitProvider provider = new
TaskInputSplitProvider(
+ jobManagerRef,
+ jobID,
+ vertexID,
+ executionID,
+ Thread.currentThread().getContextClassLoader(),
--- End diff --
This could be null. Would probably better to grab class loader via getClass
I suppose.
> Failed task deployment causes NPE on input split assignment
> -----------------------------------------------------------
>
> Key: FLINK-1922
> URL: https://issues.apache.org/jira/browse/FLINK-1922
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 0.9
> Reporter: Robert Metzger
> Assignee: Till Rohrmann
>
> The input split assignment code is returning {null} if the Task has failed,
> which is causing a NPE.
> We should improve our error handling / reporting in that situation.
> {code}
> 13:12:31,002 INFO org.apache.flink.yarn.ApplicationMaster$$anonfun$2$$anon$1
> - Status of job c0b47ce41e9a85a628a628a3977705ef (Flink Java Job at Tue
> Apr 21 13:10:36 UTC 2015) changed to FAILING Cannot deploy task - TaskManager
> not responding..
> ....
> 13:12:47,591 ERROR org.apache.flink.runtime.operators.RegularPactTask
> - Error in task code: CHAIN DataSource (at userMethod
> (org.apache.flink.api.java.io.AvroInputFormat)) -> FlatMap (FlatMap at
> main(UserClass.java:111)) (20/50)
> java.lang.RuntimeException: Requesting the next InputSplit failed.
> at
> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:88)
> at
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:337)
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:136)
> at
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.NullPointerException
> at java.io.ByteArrayInputStream.<init>(ByteArrayInputStream.java:106)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:301)
> at
> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:83)
> ... 4 more
> 13:12:47,595 INFO org.apache.flink.runtime.taskmanager.Task
> - CHAIN DataSource (at SomeMethod
> (org.apache.flink.api.java.io.AvroInputFormat)) -> FlatMap (FlatMap at
> main(SomeClass.java:111)) (20/50) switched to FAILED :
> java.lang.RuntimeException: Requesting the next InputSplit failed.
> at
> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:88)
> at
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:337)
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:136)
> at
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.NullPointerException
> at java.io.ByteArrayInputStream.<init>(ByteArrayInputStream.java:106)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:301)
> at
> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:83)
> ... 4 more
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)