Exception in using updateStateByKey

2015-04-27 Thread Sea
Hi, all:
I use the updateStateByKey function in Spark Streaming, and I need to store the states
for one minute. I set spark.cleaner.ttl to 120 and the batch duration to 2 seconds,
but it throws this exception:




Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:51)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1499)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1448)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1428)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1402)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:468)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:269)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59566)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

    at org.apache.hadoop.ipc.Client.call(Client.java:1347)
    at org.apache.hadoop.ipc.Client.call(Client.java:1300)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
    at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:188)
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)



Why?


My code is:


from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 2)  # 2-second batch duration
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topic: 1})
kvs.window(60, 2).map(lambda x: analyzeMessage(x[1])) \
    .filter(lambda x: x[1] is not None).updateStateByKey(updateStateFunc) \
    .filter(lambda x: x[1]['isExisted'] != 1) \
    .foreachRDD(lambda rdd: rdd.foreachPartition(insertIntoDb))
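
For reference, spark.cleaner.ttl goes on the SparkConf before the SparkContext is created; a minimal sketch of my setup (the app name is a placeholder):

from pyspark import SparkConf, SparkContext

# spark.cleaner.ttl (in seconds) enables Spark 1.x's periodic metadata/RDD
# cleaner; "stateful-stream" is a placeholder app name.
conf = SparkConf().setAppName("stateful-stream").set("spark.cleaner.ttl", "120")
sc = SparkContext(conf=conf)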

Re: Exception in using updateStateByKey

2015-04-27 Thread Ted Yu
Which hadoop release are you using?

Can you check the hdfs audit log to see who deleted spark/ck/hdfsaudit/
receivedData/0/log-1430139541443-1430139601443, and when?

Cheers
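
Something like this rough sketch would surface the delete entry (the audit log path is an assumption; it varies by install):

# Hypothetical scan of the NameNode audit log for the missing WAL file;
# adjust the log path for your installation.
needle = 'log-1430139541443-1430139601443'
with open('/var/log/hadoop-hdfs/hdfs-audit.log') as f:
    for line in f:
        if needle in line:
            print(line.rstrip())  # cmd=delete entries carry ugi= (user) and a timestamp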






Re: Exception in using updateStateByKey

2015-04-27 Thread Sea
I set it to 240, and it happens again when 240 seconds is reached.




------ Original Message ------
From: 261810726 <261810...@qq.com>
Date: Mon, Apr 27, 2015 10:24
To: Ted Yu <yuzhih...@gmail.com>
Subject: Re: Exception in using updateStateByKey



Yes, I can make it larger, but I also want to know whether there is a formula
to estimate it.




------ Original Message ------
From: Ted Yu <yuzhih...@gmail.com>
Date: Mon, Apr 27, 2015 10:20
To: Sea <261810...@qq.com>
Subject: Re: Exception in using updateStateByKey



Can you make the value for spark.cleaner.ttl larger?

Cheers


On Mon, Apr 27, 2015 at 7:13 AM, Sea 261810...@qq.com wrote:
my hadoop version is 2.2.0, and the hdfs-audit.log is too large. The problem is
that when the checkpoint info is deleted (it depends on 'spark.cleaner.ttl'),
it throws this exception.
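
updateStateByKey requires a checkpoint directory on HDFS, and the receivedData WAL files under it are what went missing; a minimal sketch of that setup (the directory is inferred from the stack trace above):

# Checkpointing setup assumed from the trace: "spark/ck/hdfsaudit" is the
# directory whose receivedData WAL files the TTL cleaner removed.
ssc.checkpoint('spark/ck/hdfsaudit')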
 



------ Original Message ------
From: Ted Yu <yuzhih...@gmail.com>
Date: Mon, Apr 27, 2015 9:55
To: Sea <261810...@qq.com>
Cc: user <user@spark.apache.org>
Subject: Re: Exception in using updateStateByKey



Which hadoop release are you using?

Can you check the hdfs audit log to see who deleted
spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443, and when?


Cheers



Re: Exception in using updateStateByKey

2015-04-27 Thread Sea
Maybe I found the solution: do not set 'spark.cleaner.ttl'; just use the
'remember' function in StreamingContext to set the rememberDuration.
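
A minimal sketch of what I mean (in the Python API remember() takes seconds; 120 here is just an example value):

# Instead of spark.cleaner.ttl, tell the StreamingContext how long each
# DStream must remember the RDDs it generated (and the data behind them).
ssc.remember(120)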



