Re: Remove Spilling Queue and rewrite checkpoint/recovery

2014-08-15 Thread Suraj Menon
Hi Edward, sorry to enter the discussion so late.

Bundling and unbundling of the message queue is not the Spilling Queue's
responsibility; it ended up there to stay compatible with the existing
implementation of BSP peer communication. Remember that the Spilling Queue
was implemented first of all to fix some OutOfMemory issues on the sender
side right away. The Spilling Queue gives you a byte array (ByteBuffer) holding
a batch of serialized messages. This effectively bundles the messages
in a byte array (hence the ByteArrayMessageBundle) and sends them on for
processing. The SpilledDataProcessors are implemented as a processing
pipeline built through inheritance, something like what we might use traits
for in Scala. So if we have a SpilledDataProcessor that sends this bundled
message via RPC to the peer, there is no need to write the messages to a file
and read them back. As I mentioned before, this was done to stay compatible
with the existing implementation of peer.send.
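A minimal Java sketch of the "processing pipeline through inheritance" idea described above. The class names (BufferProcessor, RpcSendProcessor, ByteCountingProcessor) and the handleSpilledBuffer method are hypothetical and are not Hama's actual SpilledDataProcessor API; the RPC send to the peer is simulated by collecting buffers in memory. It shows a subclass adding a stage and then passing the same ByteBuffer on via super, so the serialized batch never has to be written to a file and read back.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base stage: receives one buffer holding a batch of serialized messages. */
abstract class BufferProcessor {
  abstract void handleSpilledBuffer(ByteBuffer buffer);
}

/** Hypothetical terminal stage standing in for an RPC send to a peer (simulated in memory). */
class RpcSendProcessor extends BufferProcessor {
  final List<byte[]> sent = new ArrayList<>();

  @Override
  void handleSpilledBuffer(ByteBuffer buffer) {
    byte[] copy = new byte[buffer.remaining()];
    // duplicate() has its own position, so the caller's buffer position is left untouched
    buffer.duplicate().get(copy);
    sent.add(copy); // a real stage would ship these bytes to the peer as-is
  }
}

/** Hypothetical extra stage layered on by inheritance: counts bytes, then delegates upward. */
class ByteCountingProcessor extends RpcSendProcessor {
  long bytesSeen = 0;

  @Override
  void handleSpilledBuffer(ByteBuffer buffer) {
    bytesSeen += buffer.remaining();
    super.handleSpilledBuffer(buffer); // hand the same batch to the next stage
  }
}
```

Each additional stage is one more subclass that calls super.handleSpilledBuffer(...), which is roughly what stacking traits would give you in Scala.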

Also, the async checkpoint recovery code was written before the Spilling Queue.
Today we can remove the single-message writes and, before the peer sync
phase, just write the whole file to HDFS.
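To illustrate the difference between single-message writes and writing the whole buffer at once, here is a small sketch. It assumes a simple [int length][bytes] layout per message and uses a ByteArrayOutputStream in place of an HDFS output stream; the class and method names are made up for illustration. Because the batch is already serialized, the single bulk write produces exactly the same bytes as the per-message loop, just without one write call per message.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

/** Hypothetical helpers contrasting per-message checkpoint writes with one bulk write. */
final class CheckpointWriteSketch {

  /** Old style: one length-prefixed write per message (many small writes). */
  static void writeEachMessage(List<byte[]> messages, OutputStream out) throws IOException {
    DataOutputStream dos = new DataOutputStream(out);
    for (byte[] m : messages) {
      dos.writeInt(m.length);
      dos.write(m);
    }
    dos.flush(); // flush but do not close: the caller owns the stream
  }

  /** Serialize a batch once, in the same [int length][bytes] layout, into one byte array. */
  static byte[] bundle(List<byte[]> messages) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    writeEachMessage(messages, buf);
    return buf.toByteArray();
  }

  /** Proposed style: called once before the sync barrier (assumption), one bulk write. */
  static void writeWholeBundle(byte[] bundled, OutputStream out) throws IOException {
    out.write(bundled);
    out.flush();
  }
}
```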

I would say performance numbers and maintainability come first, and if you
think removing the Spilling Queue is the solution, go for it. As far as async
checkpointing is concerned, that was a first proof of concept we did,
and it is high time we moved forward from there.

Chiahung, do you have any instructions on where and how I can build the
Scala version of your code?

I am really finding it hard to dedicate time for Hama these days.

- Suraj


On Tue, Aug 12, 2014 at 7:15 AM, Edward J. Yoon edwardy...@apache.org
wrote:

 ChiaHung,

 Yes, I'm thinking similar things.

 On Tue, Aug 12, 2014 at 4:11 PM, Chia-Hung Lin cli...@googlemail.com
 wrote:
  I am currently working on this part based on the superstep API,
  similar to Superstep.java in the trunk.
 
  The checkpointer[1] saves message bundles instead of single messages.
  I'm not sure if this is what you are looking for?
 
  [1].
 https://github.com/chlin501/hama/blob/peer-comm-mech-changed/core/src/main/scala/org/apache/hama/monitor/Checkpointer.scala
 
 
 
 
  On 12 August 2014 15:04, Edward J. Yoon edwardy...@apache.org wrote:
  I think that transferring single messages one at a time is not a wise approach.
  Bundles are used to avoid network overhead and contention. So, if we
  use bundles, each processor always sends/receives bundles.
 
  BSPMessageBundle is Writable (and Iterable), and it manages the
  serialized messages as a byte array. If we write bundles when
  checkpointing or using a disk queue, it'll be simpler and faster.
 
  In the Spilling Queue case, it always requires unbundling
  and putting the messages into the queue.
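A rough sketch of a bundle that keeps its messages only as one serialized byte array, so checkpointing is a single bulk write and recovery does not deserialize anything until a consumer iterates. This is not BSPMessageBundle itself: ByteArrayBundleSketch is hypothetical, it only mirrors the write(DataOutput)/readFields(DataInput) shape of Hadoop's Writable without depending on Hadoop, and the [int length][bytes] layout is an assumption.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical bundle: messages live only in one byte array laid out as [int length][bytes]... */
final class ByteArrayBundleSketch implements Iterable<byte[]> {
  private byte[] data = new byte[0];
  private int count = 0;

  /** Append one already-serialized message (copies the array each time; fine for a sketch). */
  void add(byte[] message) {
    ByteBuffer grown = ByteBuffer.allocate(data.length + 4 + message.length);
    grown.put(data).putInt(message.length).put(message);
    data = grown.array();
    count++;
  }

  int size() {
    return count;
  }

  /** Checkpoint: a count, a length and one bulk write of the serialized messages. */
  void write(DataOutput out) throws IOException {
    out.writeInt(count);
    out.writeInt(data.length);
    out.write(data);
  }

  /** Recovery: read the bulk bytes back without decoding individual messages. */
  void readFields(DataInput in) throws IOException {
    count = in.readInt();
    data = new byte[in.readInt()];
    in.readFully(data);
  }

  /** Messages are decoded lazily, only when a consumer iterates over the bundle. */
  @Override
  public Iterator<byte[]> iterator() {
    ByteBuffer view = ByteBuffer.wrap(data);
    return new Iterator<byte[]>() {
      @Override
      public boolean hasNext() {
        return view.hasRemaining();
      }

      @Override
      public byte[] next() {
        if (!view.hasRemaining()) {
          throw new NoSuchElementException();
        }
        byte[] m = new byte[view.getInt()];
        view.get(m);
        return m;
      }
    };
  }
}
```

The point of the design is that neither write nor readFields touches individual messages, which is where the per-message serialization/deserialization overhead mentioned in this thread would otherwise come from.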
 
 
  On Tue, Aug 12, 2014 at 2:41 PM, Tommaso Teofili
  tommaso.teof...@gmail.com wrote:
  -1, can't we discuss it first? Also, it'd be helpful to be more specific
  about the problems.
  Tommaso
 
 
 
  2014-08-12 4:25 GMT+02:00 Edward J. Yoon edwardy...@apache.org:
 
  All,
 
  I'll delete the Spilling Queue and rewrite the checkpoint/recovery
  implementation (checkpointing bundles is better than checkpointing
  messages one by one). The current implementation is quite a mess :/ there
  are huge deserialization/serialization overheads.
 
  --
  Best Regards, Edward J. Yoon
  CEO at DataSayer Co., Ltd.
 
 
 
 






Re: Remove Spilling Queue and rewrite checkpoint/recovery

2014-08-15 Thread Chia-Hung Lin
The code is currently at https://github.com/chlin501/hama.git

Maven and a JDK are required to build the project.

Command for a clean build:
mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true

To run a specific test case:
mvn -DskipTests=false -Dtest=TestCaseName test


On 15 August 2014 18:21, Suraj Menon menonsur...@gmail.com wrote:



Build failed in Jenkins: Hama-Nightly-for-Hadoop-2.x #334

2014-08-15 Thread Apache Jenkins Server
See https://builds.apache.org/job/Hama-Nightly-for-Hadoop-2.x/334/

--
[...truncated 192 lines...]
at org.tmatesoft.svn.core.internal.wc.SVNErrorManager.error(SVNErrorManager.java:51)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:777)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:382)
... 31 more
Caused by: svn: E175002: OPTIONS request failed on '/repos/asf/hama/trunk'
at org.tmatesoft.svn.core.SVNErrorMessage.create(SVNErrorMessage.java:208)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:775)
... 32 more
Caused by: svn: E175002: timed out waiting for server
at org.tmatesoft.svn.core.SVNErrorMessage.create(SVNErrorMessage.java:208)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:514)
... 32 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at org.tmatesoft.svn.core.internal.util.SVNSocketConnection.run(SVNSocketConnection.java:57)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
... 4 more
java.io.IOException: remote file operation failed: https://builds.apache.org/job/Hama-Nightly-for-Hadoop-2.x/ws/ at hudson.remoting.Channel@36e407c7:ubuntu-5
at hudson.FilePath.act(FilePath.java:910)
at hudson.FilePath.act(FilePath.java:887)
at hudson.scm.SubversionSCM.checkout(SubversionSCM.java:936)
at hudson.scm.SubversionSCM.checkout(SubversionSCM.java:871)
at hudson.model.AbstractProject.checkout(AbstractProject.java:1414)
at hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:671)
at jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:88)
at hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:580)
at hudson.model.Run.execute(Run.java:1676)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.ResourceController.execute(ResourceController.java:88)
at hudson.model.Executor.run(Executor.java:231)
Caused by: java.io.IOException: Failed to check out http://svn.apache.org/repos/asf/hama/trunk
at hudson.scm.subversion.CheckoutUpdater$1.perform(CheckoutUpdater.java:110)
at hudson.scm.subversion.WorkspaceUpdater$UpdateTask.delegateTo(WorkspaceUpdater.java:161)
at hudson.scm.SubversionSCM$CheckOutTask.perform(SubversionSCM.java:1030)
at hudson.scm.SubversionSCM$CheckOutTask.invoke(SubversionSCM.java:1011)
at hudson.scm.SubversionSCM$CheckOutTask.invoke(SubversionSCM.java:987)
at hudson.FilePath$FileCallableWrapper.call(FilePath.java:2462)
at hudson.remoting.UserRequest.perform(UserRequest.java:118)
at hudson.remoting.UserRequest.perform(UserRequest.java:48)
at hudson.remoting.Request$2.run(Request.java:328)
at hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.tmatesoft.svn.core.SVNException: svn: E175002: OPTIONS /repos/asf/hama/trunk failed
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:388)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:373)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:361)
at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.performHttpRequest(DAVConnection.java:707)
at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.exchangeCapabilities(DAVConnection.java:627)
at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.open(DAVConnection.java:102)
at org.tmatesoft.svn.core.internal.io.dav.DAVRepository.openConnection(DAVRepository.java:1020)
at org.tmatesoft.svn.core.internal.io.dav.DAVRepository.getLatestRevision(DAVRepository.java:180)
at org.tmatesoft.svn.core.internal.wc16.SVNBasicDelegate.getRevisionNumber(SVNBasicDelegate.java:480)
at 

Build failed in Jenkins: Hama-Nightly-for-Hadoop-1.x #1339

2014-08-15 Thread Apache Jenkins Server
See https://builds.apache.org/job/Hama-Nightly-for-Hadoop-1.x/1339/

--
[...truncated 192 lines...]
at org.tmatesoft.svn.core.internal.wc.SVNErrorManager.error(SVNErrorManager.java:51)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:777)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:382)
... 31 more
Caused by: svn: E175002: OPTIONS request failed on '/repos/asf/hama/trunk'
at org.tmatesoft.svn.core.SVNErrorMessage.create(SVNErrorMessage.java:208)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:775)
... 32 more
Caused by: svn: E175002: timed out waiting for server
at org.tmatesoft.svn.core.SVNErrorMessage.create(SVNErrorMessage.java:208)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection._request(HTTPConnection.java:514)
... 32 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at org.tmatesoft.svn.core.internal.util.SVNSocketConnection.run(SVNSocketConnection.java:57)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
... 4 more
java.io.IOException: remote file operation failed: https://builds.apache.org/job/Hama-Nightly-for-Hadoop-1.x/ws/ at hudson.remoting.Channel@36e407c7:ubuntu-5
at hudson.FilePath.act(FilePath.java:910)
at hudson.FilePath.act(FilePath.java:887)
at hudson.scm.SubversionSCM.checkout(SubversionSCM.java:936)
at hudson.scm.SubversionSCM.checkout(SubversionSCM.java:871)
at hudson.model.AbstractProject.checkout(AbstractProject.java:1414)
at hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:671)
at jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:88)
at hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:580)
at hudson.model.Run.execute(Run.java:1676)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.ResourceController.execute(ResourceController.java:88)
at hudson.model.Executor.run(Executor.java:231)
Caused by: java.io.IOException: Failed to check out http://svn.apache.org/repos/asf/hama/trunk
at hudson.scm.subversion.CheckoutUpdater$1.perform(CheckoutUpdater.java:110)
at hudson.scm.subversion.WorkspaceUpdater$UpdateTask.delegateTo(WorkspaceUpdater.java:161)
at hudson.scm.SubversionSCM$CheckOutTask.perform(SubversionSCM.java:1030)
at hudson.scm.SubversionSCM$CheckOutTask.invoke(SubversionSCM.java:1011)
at hudson.scm.SubversionSCM$CheckOutTask.invoke(SubversionSCM.java:987)
at hudson.FilePath$FileCallableWrapper.call(FilePath.java:2462)
at hudson.remoting.UserRequest.perform(UserRequest.java:118)
at hudson.remoting.UserRequest.perform(UserRequest.java:48)
at hudson.remoting.Request$2.run(Request.java:328)
at hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.tmatesoft.svn.core.SVNException: svn: E175002: OPTIONS /repos/asf/hama/trunk failed
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:388)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:373)
at org.tmatesoft.svn.core.internal.io.dav.http.HTTPConnection.request(HTTPConnection.java:361)
at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.performHttpRequest(DAVConnection.java:707)
at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.exchangeCapabilities(DAVConnection.java:627)
at org.tmatesoft.svn.core.internal.io.dav.DAVConnection.open(DAVConnection.java:102)
at org.tmatesoft.svn.core.internal.io.dav.DAVRepository.openConnection(DAVRepository.java:1020)
at org.tmatesoft.svn.core.internal.io.dav.DAVRepository.getLatestRevision(DAVRepository.java:180)
at org.tmatesoft.svn.core.internal.wc16.SVNBasicDelegate.getRevisionNumber(SVNBasicDelegate.java:480)
at