Re: Remove Spilling Queue and rewrite checkpoint/recovery
Hi Edward, sorry to enter the discussion so late.

Bundling and unbundling of the message queue is not the Spilling Queue's responsibility; it ended up there to stay compatible with the existing implementation of BSP peer communication. Remember that the Spilling Queue was implemented to immediately remove some OutOfMemory issues on the sender side. The Spilling Queue gives you a byte array (ByteBuffer) holding a batch of serialized messages. This effectively bundles the messages into a byte array (hence ByteArrayMessageBundle) before sending them for processing.

The SpilledDataProcessors are implemented as a pipeline of processing steps chained via inheritance, something like what we might use a trait for in Scala. So if we have a SpilledDataProcessor that sends this bundled message via RPC to the peer, there is no need to write the messages to a file and read them back. As I previously mentioned, this was done to be compatible with the existing implementation of peer.send.

Also, the async checkpoint/recovery code was written before the Spilling Queue. Today we can remove the single-message write and instead, before the peer sync phase, just write the whole file to HDFS. I would say performance numbers and maintainability come first, so if you think removing the Spilling Queue is the solution, go for it. As far as async checkpointing is concerned, that was a first proof of concept and it is high time we moved forward from there.

ChiaHung, do you have instructions on where and how I can build the Scala version of your code? I am really finding it hard to dedicate time to Hama these days.

- Suraj

On Tue, Aug 12, 2014 at 7:15 AM, Edward J. Yoon edwardy...@apache.org wrote:

ChiaHung, yes, I'm thinking similar things.

On Tue, Aug 12, 2014 at 4:11 PM, Chia-Hung Lin cli...@googlemail.com wrote:

I am currently working on this part based on the superstep API, similar to the Superstep.java in trunk. The checkpointer [1] saves the bundled messages instead of single messages. Not very sure if this is what you are looking for?

[1]. https://github.com/chlin501/hama/blob/peer-comm-mech-changed/core/src/main/scala/org/apache/hama/monitor/Checkpointer.scala

On 12 August 2014 15:04, Edward J. Yoon edwardy...@apache.org wrote:

I think that transferring single messages one at a time is not a wise approach. The bundle exists to avoid network overhead and contention, so if we use it, each processor always sends and receives bundles. BSPMessageBundle is Writable (and Iterable), and it manages the serialized messages as a byte array. If we write bundles when checkpointing or when using the disk queue, it will be simpler and faster. The Spilling Queue case always requires unbundling and putting messages into a queue.

On Tue, Aug 12, 2014 at 2:41 PM, Tommaso Teofili tommaso.teof...@gmail.com wrote:

-1, can't we discuss first? Also, it'd be helpful to be more specific about the problems. Tommaso

2014-08-12 4:25 GMT+02:00 Edward J. Yoon edwardy...@apache.org:

All, I'll delete the Spilling Queue and rewrite the checkpoint/recovery implementation (checkpointing bundles is better than checkpointing all messages one by one). The current implementation is quite a mess :/ there are huge deserialization/serialization overheads.

-- Best Regards, Edward J. Yoon CEO at DataSayer Co., Ltd.
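Suraj's description of the SpilledDataProcessor chain might look roughly like the following Java sketch. This is illustrative only: the class names, signatures, and stages here are invented for the example and are not Hama's actual API; it just shows the inheritance-chained pipeline idea, where a terminal stage could stand in for an RPC send so nothing is written to disk and read back.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical chained-processor pipeline in the spirit of Hama's
// SpilledDataProcessor; names and signatures are illustrative only.
abstract class SpillProcessor {
    private final SpillProcessor next;

    protected SpillProcessor(SpillProcessor next) {
        this.next = next;
    }

    // Handle one spilled chunk of serialized messages, then delegate
    // to the next stage in the chain (if any).
    final boolean handle(ByteBuffer chunk) {
        return process(chunk.duplicate()) && (next == null || next.handle(chunk));
    }

    protected abstract boolean process(ByteBuffer chunk);
}

// Example intermediate stage: counts bytes flowing through the pipeline.
class CountingProcessor extends SpillProcessor {
    int bytesSeen = 0;

    CountingProcessor(SpillProcessor next) {
        super(next);
    }

    protected boolean process(ByteBuffer chunk) {
        bytesSeen += chunk.remaining();
        return true;
    }
}

// Terminal stage: stands in for an RPC send of the bundled bytes to the
// peer, so the bundle never touches the filesystem.
class SendProcessor extends SpillProcessor {
    final List<byte[]> sent = new ArrayList<>();

    SendProcessor() {
        super(null);
    }

    protected boolean process(ByteBuffer chunk) {
        byte[] copy = new byte[chunk.remaining()];
        chunk.get(copy);
        sent.add(copy);
        return true;
    }
}
```

Because each stage only sees an opaque byte chunk, swapping the terminal stage (RPC send, local file spill, HDFS write) leaves the rest of the chain untouched.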
Re: Remove Spilling Queue and rewrite checkpoint/recovery
Code right now is at https://github.com/chlin501/hama.git

Maven and a JDK are required to build the project.

Command for a clean build:

    mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true

To run a specific test case:

    mvn -DskipTests=false -Dtest=TestCaseName test

On 15 August 2014 18:21, Suraj Menon menonsur...@gmail.com wrote:

[quoted message snipped]
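Edward's suggestion upthread of checkpointing whole bundles rather than single messages could be sketched as below. This is a hypothetical helper, not Hama's actual checkpoint code: the point is that writing the already-serialized bundle bytes as one length-prefixed block means recovery reads the bundle back as raw bytes and never pays per-message deserialization.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical sketch, not Hama's actual checkpoint code: a bundle is
// checkpointed as one opaque, length-prefixed byte array.
class BundleCheckpoint {

    // Write the already-serialized bundle bytes with a length prefix;
    // no per-message serialization happens here.
    static void write(DataOutput out, byte[] bundleBytes) throws IOException {
        out.writeInt(bundleBytes.length);
        out.write(bundleBytes);
    }

    // Read the bundle back as raw bytes; the messages inside stay
    // serialized until a consumer actually needs them.
    static byte[] read(DataInput in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return bytes;
    }
}
```

The same stream could as easily be an HDFS output stream written before the peer sync phase, per Suraj's note above.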
Build failed in Jenkins: Hama-Nightly-for-Hadoop-2.x #334
See https://builds.apache.org/job/Hama-Nightly-for-Hadoop-2.x/334/

--
[...truncated 192 lines...]
	at org.tmatesoft.svn.core.internal.wc.SVNErrorManager.error(SVNErrorManager.java:51)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:777)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:382)
	... 31 more
Caused by: svn: E175002: OPTIONS request failed on '/repos/asf/hama/trunk'
	at org.tmatesoft.svn.core.SVNErrorMessage.create(SVNErrorMessage.java:208)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:775)
	... 32 more
Caused by: svn: E175002: timed out waiting for server
	at org.tmatesoft.svn.core.SVNErrorMessage.create(SVNErrorMessage.java:208)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:514)
	... 32 more
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at org.tmatesoft.svn.core.internal.util.SVNSocketConnection.run(SVNSocketConnection.java:57)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	... 4 more
java.io.IOException: remote file operation failed: https://builds.apache.org/job/Hama-Nightly-for-Hadoop-2.x/ws/ at hudson.remoting.Channel@36e407c7:ubuntu-5
	at hudson.FilePath.act(FilePath.java:910)
	at hudson.FilePath.act(FilePath.java:887)
	at hudson.scm.SubversionSCM.checkout(SubversionSCM.java:936)
	at hudson.scm.SubversionSCM.checkout(SubversionSCM.java:871)
	at hudson.model.AbstractProject.checkout(AbstractProject.java:1414)
	at hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:671)
	at jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:88)
	at hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:580)
	at hudson.model.Run.execute(Run.java:1676)
	at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
	at hudson.model.ResourceController.execute(ResourceController.java:88)
	at hudson.model.Executor.run(Executor.java:231)
Caused by: java.io.IOException: Failed to check out http://svn.apache.org/repos/asf/hama/trunk
	at hudson.scm.subversion.CheckoutUpdater$1.perform(CheckoutUpdater.java:110)
	at hudson.scm.subversion.WorkspaceUpdater$UpdateTask.delegateTo(WorkspaceUpdater.java:161)
	at hudson.scm.SubversionSCM$CheckOutTask.perform(SubversionSCM.java:1030)
	at hudson.scm.SubversionSCM$CheckOutTask.invoke(SubversionSCM.java:1011)
	at hudson.scm.SubversionSCM$CheckOutTask.invoke(SubversionSCM.java:987)
	at hudson.FilePath$FileCallableWrapper.call(FilePath.java:2462)
	at hudson.remoting.UserRequest.perform(UserRequest.java:118)
	at hudson.remoting.UserRequest.perform(UserRequest.java:48)
	at hudson.remoting.Request$2.run(Request.java:328)
	at hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.tmatesoft.svn.core.SVNException: svn: E175002: OPTIONS /repos/asf/hama/trunk failed
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:388)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:373)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:361)
	at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.performHttpRequest(DAVConnection.java:707)
	at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.exchangeCapabilities(DAVConnection.java:627)
	at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.open(DAVConnection.java:102)
	at org.tmatesoft.svn.core.internal.io.dav.DAVRepository.openConnection(DAVRepository.java:1020)
	at org.tmatesoft.svn.core.internal.io.dav.DAVRepository.getLatestRevision(DAVRepository.java:180)
	at org.tmatesoft.svn.core.internal.wc16.SVNBasicDelegate.getRevisionNumber(SVNBasicDelegate.java:480)
	at
Build failed in Jenkins: Hama-Nightly-for-Hadoop-1.x #1339
See https://builds.apache.org/job/Hama-Nightly-for-Hadoop-1.x/1339/

--
[...truncated 192 lines...]
	at org.tmatesoft.svn.core.internal.wc.SVNErrorManager.error(SVNErrorManager.java:51)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:777)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:382)
	... 31 more
Caused by: svn: E175002: OPTIONS request failed on '/repos/asf/hama/trunk'
	at org.tmatesoft.svn.core.SVNErrorMessage.create(SVNErrorMessage.java:208)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:775)
	... 32 more
Caused by: svn: E175002: timed out waiting for server
	at org.tmatesoft.svn.core.SVNErrorMessage.create(SVNErrorMessage.java:208)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:514)
	... 32 more
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at org.tmatesoft.svn.core.internal.util.SVNSocketConnection.run(SVNSocketConnection.java:57)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	... 4 more
java.io.IOException: remote file operation failed: https://builds.apache.org/job/Hama-Nightly-for-Hadoop-1.x/ws/ at hudson.remoting.Channel@36e407c7:ubuntu-5
	at hudson.FilePath.act(FilePath.java:910)
	at hudson.FilePath.act(FilePath.java:887)
	at hudson.scm.SubversionSCM.checkout(SubversionSCM.java:936)
	at hudson.scm.SubversionSCM.checkout(SubversionSCM.java:871)
	at hudson.model.AbstractProject.checkout(AbstractProject.java:1414)
	at hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:671)
	at jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:88)
	at hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:580)
	at hudson.model.Run.execute(Run.java:1676)
	at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
	at hudson.model.ResourceController.execute(ResourceController.java:88)
	at hudson.model.Executor.run(Executor.java:231)
Caused by: java.io.IOException: Failed to check out http://svn.apache.org/repos/asf/hama/trunk
	at hudson.scm.subversion.CheckoutUpdater$1.perform(CheckoutUpdater.java:110)
	at hudson.scm.subversion.WorkspaceUpdater$UpdateTask.delegateTo(WorkspaceUpdater.java:161)
	at hudson.scm.SubversionSCM$CheckOutTask.perform(SubversionSCM.java:1030)
	at hudson.scm.SubversionSCM$CheckOutTask.invoke(SubversionSCM.java:1011)
	at hudson.scm.SubversionSCM$CheckOutTask.invoke(SubversionSCM.java:987)
	at hudson.FilePath$FileCallableWrapper.call(FilePath.java:2462)
	at hudson.remoting.UserRequest.perform(UserRequest.java:118)
	at hudson.remoting.UserRequest.perform(UserRequest.java:48)
	at hudson.remoting.Request$2.run(Request.java:328)
	at hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.tmatesoft.svn.core.SVNException: svn: E175002: OPTIONS /repos/asf/hama/trunk failed
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:388)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:373)
	at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:361)
	at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.performHttpRequest(DAVConnection.java:707)
	at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.exchangeCapabilities(DAVConnection.java:627)
	at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.open(DAVConnection.java:102)
	at org.tmatesoft.svn.core.internal.io.dav.DAVRepository.openConnection(DAVRepository.java:1020)
	at org.tmatesoft.svn.core.internal.io.dav.DAVRepository.getLatestRevision(DAVRepository.java:180)
	at org.tmatesoft.svn.core.internal.wc16.SVNBasicDelegate.getRevisionNumber(SVNBasicDelegate.java:480)
	at