Hey Thomas, 

It is a 15-node cluster; I included the cluster summary below. If I run on 1
partition there are no issues, other than it backs up a bit, but as soon as I move to
multiple partitions I get those errors. We are using an
AbstractFileInputOperator to read the file off HDFS, so I started with an
Emit Batch Size of 5, which gets us around 250 tuples/sec, and that
fails with anything but 1 partition. On a single partition I worked it up to
25 and didn't see the issues; it was just kind of laggy.
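
In case it helps, here is roughly how we wire up the input side (a minimal
sketch only; the application/operator names and the HDFS path are placeholders,
and I'm quoting the emitBatchSize/partitionCount property names from memory, so
they may not match our actual application exactly):

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.api.annotation.ApplicationAnnotation;
    import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator;

    // Sketch of the file-reading side of the DAG; downstream parsing and the
    // HDHT writer are omitted. Names and paths are illustrative.
    @ApplicationAnnotation(name = "AccountHistorySketch")
    public class AccountHistorySketch implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        FileLineInputOperator reader = dag.addOperator("reader", new FileLineInputOperator());
        reader.setDirectory("/user/vault8/input");  // placeholder input directory
        reader.setEmitBatchSize(5);                 // throttled down; ~250 tuples/sec in our runs
        reader.setPartitionCount(1);                // 1 partition is fine; >1 triggers the errors
      }
    }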

Thanks,
Alex


Summary
Security is off.
Safemode is off.
8,670,027 files and directories, 8,835,720 blocks = 17,505,747 total filesystem object(s).
Heap Memory used 3.34 GB of 4.83 GB Heap Memory. Max Heap Memory is 4.83 GB.
Non Heap Memory used 71.04 MB of 99.31 MB Committed Non Heap Memory. Max Non Heap Memory is 130 MB.
Configured Capacity: 249.97 TB
DFS Used: 164.71 TB
Non DFS Used: 2.07 TB
DFS Remaining: 83.19 TB
DFS Used%: 65.89%
DFS Remaining%: 33.28%
Block Pool Used: 164.71 TB
Block Pool Used%: 65.89%
DataNodes usages% (Min/Median/Max/stdDev): 43.61% / 68.03% / 71.35% / 6.91%
Live Nodes: 14 (Decommissioned: 0)
Dead Nodes: 0 (Decommissioned: 0)
Decommissioning Nodes: 0
Total Datanode Volume Failures: 0 (0 B)
Number of Under-Replicated Blocks: 0
Number of Blocks Pending Deletion: 94
Block Deletion Start Time: 3/11/2016, 1:23:59 PM
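
For context on the bucket/WAL relationship Thomas asks about below: each HDHT
bucket keeps its own write-ahead log under the store path (the failing file
/user/vault8/citadel_data/hdht/702/_WAL-0 is presumably bucket 702's WAL), and
we cap the bucket count at 1000 by hashing the account id. A rough sketch of
that keying logic (plain Java with no Malhar dependencies; the class, method,
and field names are made up for the example):

    // Illustrative sketch of how account records map to HDHT buckets and their
    // WAL paths. The real application does this through the HDHT writer operator.
    public class BucketKeying
    {
      static final int MAX_BUCKETS = 1000;          // our configured cap
      static final String HDHT_BASE = "/user/vault8/citadel_data/hdht";

      /** Hash the account id into one of at most MAX_BUCKETS buckets. */
      static long bucketKeyFor(String accountId)
      {
        return Math.floorMod(accountId.hashCode(), MAX_BUCKETS);
      }

      /** Each bucket keeps its own write-ahead log under the store path. */
      static String walPathFor(long bucketKey)
      {
        return HDHT_BASE + "/" + bucketKey + "/_WAL-0";
      }

      public static void main(String[] args)
      {
        long bucket = bucketKeyFor("account-12345");
        // With ~7 slices written per tuple at ~250 tuples/sec, many of the
        // 1000 buckets (and therefore many open WAL files) get touched quickly.
        System.out.println("bucket " + bucket + " -> " + walPathFor(bucket));
      }
    }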





On 4/3/16, 12:33 AM, "Thomas Weise" <[email protected]> wrote:

>Alex,
>
>The throughput is very low. Every bucket will have a WAL file open. It
>could be an issue with HDFS. How large is the cluster? Have you tried to
>run it with a small set of partitions?
>
>Thanks,
>Thomas
>
>
>On Sat, Apr 2, 2016 at 4:22 PM, McCullough, Alex <
>[email protected]> wrote:
>
>> Hello All,
>>
>> I work at Capital One and have been working on an application with many of
>> the other Capital One folks active within the Apex community.
>>
>> We have run into an issue with HDHT that I was hoping to get some help
>> with. We are processing customer account data from a flat file and storing
>> lists of historical field aggregates in HDHT. When the DAG first starts
>> with multiple partitions of the HDHT operators running, we get errors
>> straight out of the gate; it essentially grinds the whole thing to a stop
>> and it has real trouble recovering. If I knock it down to a single
>> partition the errors do not occur.
>>
>> Could this be an issue with simultaneously trying to create so many
>> directories and WALs? We have set a max of 1000 buckets for HDHT, each
>> account has ~7 slices created/updated per tuple processed representing
>> lists of historical field values, and the DAG is processing ~250
>> tuples/sec. Let me know if there is any other info that might be helpful, I
>> have also included the two exceptions that get thrown.
>>
>> Thanks,
>> Alex
>>
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
>> No lease on /user/vault8/citadel_data/hdht/702/_WAL-0 (inode 77399727):
>> File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_586890524_21,
>> pendingcreates: 30]
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3602)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3399)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3255)
>> at
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:676)
>> at
>> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:212)
>> at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:483)
>> at
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>> at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:415)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1472)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1403)
>> at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>> at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
>> at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399)
>> at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
>> at com.sun.proxy.$Proxy15.addBlock(Unknown Source)
>> at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1674)
>> at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1471)
>> at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:668)
>> 2016-04-02 19:03:15,496 WARN org.apache.hadoop.hdfs.DFSClient: Error while
>> syncing
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
>> No lease on /user/vault8/citadel_data/hdht/702/_WAL-0 (inode 77399727):
>> File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_586890524_21,
>> pendingcreates: 30]
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3602)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3399)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3255)
>> at
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:676)
>> at
>> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:212)
>> at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:483)
>> at
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>> at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:415)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1472)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1403)
>> at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>> at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
>> at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399)
>> at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
>> at com.sun.proxy.$Proxy15.addBlock(Unknown Source)
>> at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1674)
>> at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1471)
>> at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:668)
>>
>>
>> SECOND ERROR
>>
>> java.lang.RuntimeException: Failed to flush WAL
>> at com.datatorrent.contrib.hdht.HDHTWriter.endWindow(HDHTWriter.java:555)
>> at
>> com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:145)
>> at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:86)
>> at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:54)
>> at com.datatorrent.stram.stream.OiOStream.put(OiOStream.java:67)
>> at com.datatorrent.stram.stream.MuxStream.put(MuxStream.java:120)
>> at
>> com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:154)
>> at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:86)
>> at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:54)
>> at com.datatorrent.stram.stream.OiOStream.put(OiOStream.java:67)
>> at com.datatorrent.stram.stream.MuxStream.put(MuxStream.java:120)
>> at
>> com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:154)
>> at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:351)
>> at
>> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
>> Caused by:
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
>> No lease on /user/vault8/citadel_data/hdht/702/_WAL-0 (inode 77399727):
>> File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_586890524_21,
>> pendingcreates: 30]
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3602)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3399)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3255)
>> at
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:676)
>> at
>> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:212)
>> at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:483)
>> at
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>> at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:415)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1472)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1403)
>> at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>> at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
>> at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399)
>> at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
>> at com.sun.proxy.$Proxy15.addBlock(Unknown Source)
>> at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1674)
>> at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1471)
>> at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:668)
>>