[ https://issues.apache.org/jira/browse/HAMA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13478342#comment-13478342 ]

Thomas Jungblut commented on HAMA-559:
--------------------------------------

Good work, looks solid. 

Errors:

- messageClass_ is not initialized, which leads to NPEs
- the parent directories of the spill file need to be checked for existence (and created with mkdirs, or deleted)
- in the thread pool shutdown, use shutdownNow()
- SpillWriteIndexStatus waits forever in the main thread (so the process never finishes); see the thread dump:

{noformat}
"main" prio=6 tid=0x0000000001d7a000 nid=0x1408 in Object.wait() [0x000000000213e000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000004031e190> (a org.apache.hama.bsp.message.SpillingBuffer$SpillWriteIndexStatus)
        at java.lang.Object.wait(Object.java:503)
        at org.apache.hama.bsp.message.SpillingBuffer$SpillWriteIndexStatus.startSpilling(SpillingBuffer.java:243)
        - locked <0x000000004031e190> (a org.apache.hama.bsp.message.SpillingBuffer$SpillWriteIndexStatus)
        at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.incBytesWritten(SpillingBuffer.java:368)
        - locked <0x000000004031dfa0> (a org.apache.hama.bsp.message.SpillingBuffer$SpillingStream)
        at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.write(SpillingBuffer.java:398)
        at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.write(SpillingBuffer.java:349)
        at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.write(SpillingBuffer.java:344)
        at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
        at org.apache.hadoop.io.IntWritable.write(IntWritable.java:42)
        at org.apache.hama.bsp.message.SpillingQueue.add(SpillingQueue.java:97)
        at org.apache.hama.bsp.message.SpillingQueue.add(SpillingQueue.java:1)
        at de.jungblut.benchmark.SpillBenchmark.timeSpill(SpillBenchmark.java:55)
        at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at com.google.caliper.SimpleBenchmark$1.run(SimpleBenchmark.java:125)
        at com.google.caliper.TimeMeasurer.measureReps(TimeMeasurer.java:184)
        at com.google.caliper.TimeMeasurer.warmUp(TimeMeasurer.java:63)
        at com.google.caliper.TimeMeasurer.run(TimeMeasurer.java:127)
        at com.google.caliper.InProcessRunner.run(InProcessRunner.java:74)
        at com.google.caliper.InProcessRunner.run(InProcessRunner.java:49)
        at com.google.caliper.InProcessRunner.main(InProcessRunner.java:103)

{noformat}
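The dump shows the writer parked in Object.wait() inside startSpilling, which is consistent with a wait() that is never matched by a notify(), or a waited-on condition that is not re-checked. A minimal sketch of the usual guarded-wait pattern follows, assuming a single background spill thread. SpillStatus, awaitIdle, beginSpill, spillDone and SpillStatusDemo are hypothetical names for illustration, not the patch's API; the demo also shuts its pool down with shutdownNow().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for SpillWriteIndexStatus: tracks whether a spill is in flight.
final class SpillStatus {
    private boolean spilling = false;

    // Blocks until no spill is running or the timeout expires. The flag is checked under
    // the lock before waiting (so an earlier notification is not lost) and re-checked in
    // a loop (so spurious wakeups are harmless).
    synchronized boolean awaitIdle(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (spilling) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;                     // report a timeout instead of hanging forever
            }
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return true;
    }

    // Called by the writer before handing a full buffer to the spill thread.
    synchronized boolean beginSpill(long timeoutMillis) throws InterruptedException {
        if (!awaitIdle(timeoutMillis)) {
            return false;
        }
        spilling = true;
        return true;
    }

    // Called by the spill thread once the buffer is on disk; wakes any waiting writer.
    synchronized void spillDone() {
        spilling = false;
        notifyAll();
    }
}

// Usage: a writer that hands buffers to one spill thread and never waits unboundedly.
final class SpillStatusDemo {
    static int runSpills(int spills) throws InterruptedException {
        SpillStatus status = new SpillStatus();
        AtomicInteger written = new AtomicInteger();
        ExecutorService spiller = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < spills; i++) {
                if (!status.beginSpill(5_000)) {
                    throw new IllegalStateException("previous spill did not finish in time");
                }
                spiller.execute(() -> {
                    try {
                        written.incrementAndGet(); // placeholder for writing the buffer to disk
                    } finally {
                        status.spillDone();        // always signal, even if the write fails
                    }
                });
            }
            if (!status.awaitIdle(5_000)) {
                throw new IllegalStateException("last spill did not finish in time");
            }
        } finally {
            spiller.shutdownNow();                 // attempts to stop running tasks, drops queued ones
        }
        return written.get();
    }
}
```

The timeout turns a silent hang into an exception with a message, which is easier to diagnose than a thread dump.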

Just a few suggestions:

- SpillIterator could extend Guava's AbstractIterator; that looks cleaner and also takes care of remove() (it throws UnsupportedOperationException).
- Everything involving bytes or buffer sizes in the initialization and configuration should be long instead of int.
- Instead of "java.io.tmpdir", we should use the same directory as the DiskQueue, and use the task ID instead of a random hash.
- In the buffer, we should stick to normal Java naming conventions and not use _ as a suffix.
- Some annotations are missing, some parameter names are auto-generated, and there are a few empty javadocs; nothing serious.
- There are some System.out calls (better to use the logger).
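Guava's AbstractIterator reduces an iterator to one computeNext() method that either returns the next element or calls endOfData(); hasNext()/next() bookkeeping is handled by the base class. So that this sketch runs without Guava on the classpath, it uses a small hypothetical stand-in with the same contract (ComputingIterator); with Guava available, SpillIterator would extend com.google.common.collect.AbstractIterator directly. IntSpillIterator reading raw ints until EOF is an assumption for illustration; the real spill file holds serialized Writables.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in mirroring the contract of Guava's AbstractIterator.
abstract class ComputingIterator<T> implements Iterator<T> {
    private T next;
    private boolean ready;   // `next` holds an element that has not been returned yet
    private boolean done;    // endOfData() has been called

    // Return the next element, or `return endOfData();` when there is none.
    protected abstract T computeNext();

    protected final T endOfData() {
        done = true;
        return null;
    }

    @Override
    public final boolean hasNext() {
        if (!ready && !done) {
            next = computeNext();
            ready = !done;
        }
        return ready;
    }

    @Override
    public final T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        ready = false;
        T result = next;
        next = null;
        return result;
    }
    // remove() is inherited from Iterator's default, which throws UnsupportedOperationException.
}

// Hypothetical spill iterator: reads ints written with DataOutputStream.writeInt until EOF.
final class IntSpillIterator extends ComputingIterator<Integer> {
    private final DataInputStream in;

    IntSpillIterator(DataInputStream in) {
        this.in = in;
    }

    @Override
    protected Integer computeNext() {
        try {
            return in.readInt();
        } catch (EOFException eof) {
            return endOfData();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The subclass only describes how to read one record and how to recognize the end; the state machine lives in the base class.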

For synchronized receives from RPC, we can use Apurv's segmentation into multiple buffers to increase throughput and reduce locking.
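One way to read the segmentation idea is lock striping: split the receive buffer into N independently locked segments and pick a segment per sender, so concurrent RPC receivers mostly contend on different locks. This is a hypothetical sketch under that assumption, not Apurv's actual design; SegmentedBuffer and hashing by sender name are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical segmented (lock-striped) receive buffer.
final class SegmentedBuffer<M> {
    // Built once in the constructor and never resized, so the final field safely publishes it.
    private final List<List<M>> segments = new ArrayList<>();

    SegmentedBuffer(int segmentCount) {
        if (segmentCount <= 0) {
            throw new IllegalArgumentException("segmentCount must be positive");
        }
        for (int i = 0; i < segmentCount; i++) {
            segments.add(new ArrayList<>());
        }
    }

    // Each sender maps to one segment; only that segment's monitor is taken.
    void add(String sender, M message) {
        List<M> segment = segments.get(Math.floorMod(sender.hashCode(), segments.size()));
        synchronized (segment) {
            segment.add(message);
        }
    }

    // Moves all buffered messages out, locking one segment at a time.
    List<M> drain() {
        List<M> all = new ArrayList<>();
        for (List<M> segment : segments) {
            synchronized (segment) {
                all.addAll(segment);
                segment.clear();
            }
        }
        return all;
    }
}
```

Ordering is only preserved per segment, which should be acceptable if message order between senders does not matter within a superstep.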

I benchmarked with Caliper against the DiskQueue (raw inserts only; polls can be benchmarked later on):

https://gist.github.com/3907709

Results (biased: I commented out the part that locks the main thread and performs the actual spill):

{noformat}

 0% Scenario{vm=java, trial=0, benchmark=Spill, size=10, type=SPILLING_QUEUE} 2876,07 ns; s=40,55 ns @ 10 trials
 8% Scenario{vm=java, trial=0, benchmark=Spill, size=100, type=SPILLING_QUEUE} 9316,85 ns; s=78,40 ns @ 3 trials
17% Scenario{vm=java, trial=0, benchmark=Spill, size=1000, type=SPILLING_QUEUE} 70874,27 ns; s=221,37 ns @ 3 trials
25% Scenario{vm=java, trial=0, benchmark=Spill, size=10000, type=SPILLING_QUEUE} 684388,29 ns; s=1840,57 ns @ 3 trials
33% Scenario{vm=java, trial=0, benchmark=Spill, size=100000, type=SPILLING_QUEUE} 6910806,76 ns; s=9592,61 ns @ 3 trials
42% Scenario{vm=java, trial=0, benchmark=Spill, size=1000000, type=SPILLING_QUEUE} 69555994,62 ns; s=296212,42 ns @ 3 trials
50% Scenario{vm=java, trial=0, benchmark=Spill, size=10, type=DISK_QUEUE} 23428,43 ns; s=203,91 ns @ 4 trials
58% Scenario{vm=java, trial=0, benchmark=Spill, size=100, type=DISK_QUEUE} 203707,18 ns; s=1880,11 ns @ 5 trials
67% Scenario{vm=java, trial=0, benchmark=Spill, size=1000, type=DISK_QUEUE} 2085204,12 ns; s=20188,91 ns @ 3 trials
75% Scenario{vm=java, trial=0, benchmark=Spill, size=10000, type=DISK_QUEUE} 20315540,28 ns; s=1130985,60 ns @ 10 trials
83% Scenario{vm=java, trial=0, benchmark=Spill, size=100000, type=DISK_QUEUE} 196685169,60 ns; s=1888695,70 ns @ 6 trials
92% Scenario{vm=java, trial=0, benchmark=Spill, size=1000000, type=DISK_QUEUE} 2093004128,00 ns; s=1216169,56 ns @ 3 trials

          type    size         us linear runtime
SPILLING_QUEUE      10       2,88 =
SPILLING_QUEUE     100       9,32 =
SPILLING_QUEUE    1000      70,87 =
SPILLING_QUEUE   10000     684,39 =
SPILLING_QUEUE  100000    6910,81 =
SPILLING_QUEUE 1000000   69555,99 =
    DISK_QUEUE      10      23,43 =
    DISK_QUEUE     100     203,71 =
    DISK_QUEUE    1000    2085,20 =
    DISK_QUEUE   10000   20315,54 =
    DISK_QUEUE  100000  196685,17 ==
    DISK_QUEUE 1000000 2093004,13 ==============================

vm: java
trial: 0
benchmark: Spill

{noformat}
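To read the table per message: the "us" column is microseconds with a decimal comma, so dividing by the number of inserts gives the average cost of one insert, and dividing the two queues' times gives the speedup. A small helper doing that arithmetic (SpillBenchMath is a hypothetical name; the inputs are copied from the 1,000,000-message rows, and the bias noted above still applies because the spilling queue's real disk writes were disabled):

```java
import java.util.Locale;

// Per-insert cost and speedup derived from the Caliper table (inputs in microseconds).
final class SpillBenchMath {
    // Average cost of one insert in nanoseconds: total us * 1000 / number of inserts.
    static double nanosPerInsert(double totalMicros, long inserts) {
        return totalMicros * 1_000.0 / inserts;
    }

    // How many times faster the spilling queue is than the disk queue at the same size.
    static double speedup(double diskQueueMicros, double spillingQueueMicros) {
        return diskQueueMicros / spillingQueueMicros;
    }

    public static void main(String[] args) {
        long n = 1_000_000L;
        // Locale.ROOT forces '.' as the decimal separator regardless of the JVM's locale.
        System.out.printf(Locale.ROOT, "SPILLING_QUEUE: %.1f ns/insert%n", nanosPerInsert(69_555.99, n));
        System.out.printf(Locale.ROOT, "DISK_QUEUE:     %.1f ns/insert%n", nanosPerInsert(2_093_004.13, n));
        System.out.printf(Locale.ROOT, "speedup:        %.1fx%n", speedup(2_093_004.13, 69_555.99));
    }
}
```

For 1,000,000 inserts this comes to roughly 69.6 ns versus 2093.0 ns per insert, about a 30x difference; at size 10 the ratio is only about 8x (23.43 / 2.88), so the gap widens as the queue grows.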



TODO: profiling :)
                
> Add a spilling message queue
> ----------------------------
>
>                 Key: HAMA-559
>                 URL: https://issues.apache.org/jira/browse/HAMA-559
>             Project: Hama
>          Issue Type: Sub-task
>          Components: bsp core
>    Affects Versions: 0.5.0
>            Reporter: Thomas Jungblut
>            Assignee: Suraj Menon
>            Priority: Minor
>             Fix For: 0.6.0
>
>         Attachments: HAMA-559.patch-v1
>
>
> After HAMA-521 is done, we can add a spilling queue which just holds the 
> messages in RAM that fit into the heap space. The rest can be flushed to disk.
> We may call this a HybridQueue or something like that.
> The benefit should be that we don't have to flush to disk as often, which makes
> it faster. However, we may see more GC, so it is not necessarily faster overall.
> The requirements for this queue also include:
> - Once a message object has been written to the queue (i.e. after the write
> call returns), it may be modified, but those changes should not be reflected in
> the message stored in the queue.
> - For now, let's implement a queue that does not support concurrent reading
> and writing. That feature will be needed once we implement asynchronous
> communication.
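The two requirements fit a simple write-then-read design: serialize each message into bytes at add() time (so the caller can reuse and modify the object afterwards), keep the first memoryLimitBytes of those bytes on the heap, and append the rest to a spill file that is only read back once writing has finished. This is a hypothetical sketch: Msg stands in for Hadoop's Writable, IntMsg and HybridQueueSketch are made-up names and not the patch's SpillingQueue, and it folds in the review points about long buffer sizes, naming the file after the task ID, and creating missing parent directories.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for Hadoop's Writable, so the sketch has no Hadoop dependency.
interface Msg {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

final class IntMsg implements Msg {
    int value;
    IntMsg() {}
    IntMsg(int value) { this.value = value; }
    public void write(DataOutput out) throws IOException { out.writeInt(value); }
    public void readFields(DataInput in) throws IOException { value = in.readInt(); }
}

// Write-then-read hybrid queue: heap first, then a spill file (no concurrent read/write).
final class HybridQueueSketch<M extends Msg> {
    private final long memoryLimitBytes;                  // long, not int, for byte sizes
    private final Path spillFile;
    private final ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private final DataOutputStream memoryOut = new DataOutputStream(memory);
    private DataOutputStream diskOut;                     // opened lazily on the first spill
    private long inMemory;
    private long onDisk;

    HybridQueueSketch(Path queueDir, String taskId, long memoryLimitBytes) {
        this.spillFile = queueDir.resolve(taskId + ".spill"); // task ID instead of a random hash
        this.memoryLimitBytes = memoryLimitBytes;
    }

    boolean hasSpilled() {
        return diskOut != null;
    }

    // Serializes immediately, so later changes to msg are not seen by the queue.
    // Soft limit: once the heap part is full, all further messages go to disk (keeps FIFO simple).
    void add(M msg) throws IOException {
        if (diskOut == null && memory.size() < memoryLimitBytes) {
            msg.write(memoryOut);
            inMemory++;
            return;
        }
        if (diskOut == null) {
            Files.createDirectories(spillFile.getParent()); // parent dirs may not exist yet
            diskOut = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(spillFile)));
        }
        msg.write(diskOut);
        onDisk++;
    }

    // Delivers every message in insertion order, then resets the queue and deletes the file.
    void drain(Supplier<M> factory, Consumer<M> sink) throws IOException {
        readInto(new DataInputStream(new ByteArrayInputStream(memory.toByteArray())), inMemory, factory, sink);
        if (diskOut != null) {
            diskOut.close();                              // flushes the buffered tail of the file
            diskOut = null;
            try (DataInputStream in = new DataInputStream(new BufferedInputStream(Files.newInputStream(spillFile)))) {
                readInto(in, onDisk, factory, sink);
            }
            Files.delete(spillFile);
        }
        memory.reset();
        inMemory = 0;
        onDisk = 0;
    }

    private void readInto(DataInputStream in, long n, Supplier<M> factory, Consumer<M> sink) throws IOException {
        for (long i = 0; i < n; i++) {
            M m = factory.get();
            m.readFields(in);
            sink.accept(m);
        }
    }
}
```

Because reads only start after writes are done, no locking is needed; concurrent read/write for asynchronous communication would need a different structure.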
