[ 
https://issues.apache.org/jira/browse/SPARK-19354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Devaraj K updated SPARK-19354:
------------------------------
    Description: 
When speculation is enabled, multiple attempts can run for the same task if 
the first attempt progresses slowly. As soon as one attempt succeeds, the 
other attempts are killed. While they are being killed, those attempts are 
marked as FAILED because of the error below. We need to handle this error and 
mark the attempt as KILLED instead of FAILED.
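
The handling requested above could be sketched roughly as follows. This is 
only an illustration in plain Java, not Spark's actual executor code: the 
class TaskOutcomeClassifier, the Outcome enum and the killRequested flag are 
hypothetical names. It assumes the executor knows whether a kill was requested 
for the attempt, and treats an exception whose cause chain contains an 
interrupt-related exception as a consequence of that kill.

```java
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;

// Hypothetical helper, not part of Spark: decides how to report a task
// attempt that ended with an exception.
final class TaskOutcomeClassifier {
    enum Outcome { KILLED, FAILED }

    // Walks the cause chain (bounded, in case of an unusually deep chain)
    // looking for an exception that is raised when a thread is interrupted.
    static boolean causedByInterrupt(Throwable error) {
        int depth = 0;
        for (Throwable c = error; c != null && depth < 32; c = c.getCause(), depth++) {
            if (c instanceof InterruptedException
                    || c instanceof ClosedByInterruptException) {
                return true;
            }
        }
        return false;
    }

    // Reports KILLED only when a kill was requested AND the error looks like
    // a side effect of the interrupt; everything else stays FAILED.
    static Outcome classify(Throwable error, boolean killRequested) {
        return (killRequested && causedByInterrupt(error))
                ? Outcome.KILLED
                : Outcome.FAILED;
    }

    public static void main(String[] args) {
        // Same shape as the reported error: an IOException wrapping
        // ClosedByInterruptException.
        Throwable wrapped = new IOException(
                "Failed on local exception", new ClosedByInterruptException());
        System.out.println(classify(wrapped, true));
        System.out.println(classify(wrapped, false));
    }
}
```

Requiring both conditions keeps genuine I/O failures reported as FAILED even 
when a kill request happens to arrive at about the same time.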


||93 ||214 ||1 (speculative) ||FAILED ||ANY ||1 / xx.xx.xx.x2 stdout stderr ||2017/01/24 10:30:44 ||0.2 s ||||||0.0 B / 0 ||8.0 KB / 400 ||java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "node2/xx.xx.xx.x2"; destination host is: "node1":9000; +details||

{code:xml}
17/01/23 23:54:32 INFO Executor: Executor is trying to kill task 93.1 in stage 1.0 (TID 214)
17/01/23 23:54:32 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
17/01/23 23:54:32 ERROR Executor: Exception in task 93.1 in stage 1.0 (TID 214)
java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "stobdtserver3/10.224.54.70"; destination host is: "stobdtserver2":9000;
        at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
        at org.apache.hadoop.ipc.Client.call(Client.java:1479)
        at org.apache.hadoop.ipc.Client.call(Client.java:1412)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
        at com.sun.proxy.$Proxy17.create(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy18.create(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1648)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
        at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
        at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:804)
        at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
        at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1133)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1124)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
        at org.apache.spark.scheduler.Task.run(Task.scala:114)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659)
        at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
        at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:614)
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:712)
        at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
        at org.apache.hadoop.ipc.Client.call(Client.java:1451)
        ... 31 more
17/01/23 23:54:33 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
{code}
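
The "Caused by" part of the trace is the exception that an interruptible NIO 
channel raises when the thread using it is interrupted, which is consistent 
with the kill request logged just before the error. The following 
self-contained Java sketch reproduces only that mechanism, outside Spark and 
Hadoop. It uses a FileChannel on a temporary file instead of the socket 
channel seen in the trace, and the class and method names are made up for 
illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: shows that a blocking operation on an interruptible
// channel fails with ClosedByInterruptException when the calling thread's
// interrupt status is set, and that the channel is closed as a side effect.
final class InterruptedChannelDemo {

    // Returns true if the read was rejected with ClosedByInterruptException
    // and the channel ended up closed.
    static boolean interruptedReadClosesChannel() throws IOException {
        Path tmp = Files.createTempFile("interrupt-demo", ".bin");
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
            // Stand-in for the interrupt delivered when a task is killed.
            Thread.currentThread().interrupt();
            try {
                ch.read(ByteBuffer.allocate(16));
                return false; // not expected: the read should not succeed
            } catch (ClosedByInterruptException e) {
                return !ch.isOpen();
            } finally {
                // Clear the interrupt status so later code is unaffected.
                Thread.interrupted();
            }
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(interruptedReadClosesChannel());
    }
}
```

In the reported trace the same exception surfaces from a socket connect and is 
then wrapped in an IOException by NetUtils.wrapException, so code that wants 
to recognise it has to look at the cause rather than only at the top-level 
exception type.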

> Killed tasks are getting marked as FAILED
> -----------------------------------------
>
>                 Key: SPARK-19354
>                 URL: https://issues.apache.org/jira/browse/SPARK-19354
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler, Spark Core
>            Reporter: Devaraj K
>            Priority: Minor


