[ https://issues.apache.org/jira/browse/HAMA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13478342#comment-13478342 ]
Thomas Jungblut commented on HAMA-559:
--------------------------------------
Good work, looks solid.
Errors:
- messageClass_ is never initialized, which causes NPEs
- the directories in front of the filename need to be checked for existence
(they need to be mkdir'd or deleted)
- in the thread pool shutdown, use shutdownNow
- SpillWriteIndexStatus waits forever in the main thread (so the process never
finishes), see the thread dump:
{noformat}
"main" prio=6 tid=0x0000000001d7a000 nid=0x1408 in Object.wait() [0x000000000213e000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x000000004031e190> (a org.apache.hama.bsp.message.SpillingBuffer$SpillWriteIndexStatus)
    at java.lang.Object.wait(Object.java:503)
    at org.apache.hama.bsp.message.SpillingBuffer$SpillWriteIndexStatus.startSpilling(SpillingBuffer.java:243)
    - locked <0x000000004031e190> (a org.apache.hama.bsp.message.SpillingBuffer$SpillWriteIndexStatus)
    at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.incBytesWritten(SpillingBuffer.java:368)
    - locked <0x000000004031dfa0> (a org.apache.hama.bsp.message.SpillingBuffer$SpillingStream)
    at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.write(SpillingBuffer.java:398)
    at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.write(SpillingBuffer.java:349)
    at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.write(SpillingBuffer.java:344)
    at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
    at org.apache.hadoop.io.IntWritable.write(IntWritable.java:42)
    at org.apache.hama.bsp.message.SpillingQueue.add(SpillingQueue.java:97)
    at org.apache.hama.bsp.message.SpillingQueue.add(SpillingQueue.java:1)
    at de.jungblut.benchmark.SpillBenchmark.timeSpill(SpillBenchmark.java:55)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at com.google.caliper.SimpleBenchmark$1.run(SimpleBenchmark.java:125)
    at com.google.caliper.TimeMeasurer.measureReps(TimeMeasurer.java:184)
    at com.google.caliper.TimeMeasurer.warmUp(TimeMeasurer.java:63)
    at com.google.caliper.TimeMeasurer.run(TimeMeasurer.java:127)
    at com.google.caliper.InProcessRunner.run(InProcessRunner.java:74)
    at com.google.caliper.InProcessRunner.run(InProcessRunner.java:49)
    at com.google.caliper.InProcessRunner.main(InProcessRunner.java:103)
{noformat}
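To illustrate the kind of fix I have in mind for the hang: a minimal sketch of a guarded wait, assuming the status object only has to signal "spill finished" to the writer. The class and method names here (SpillStatusSketch, markSpillStarted, markSpillDone, awaitSpillDone) are made up for the example and are not the ones in the patch. The waiter re-checks its condition in a loop, the spill thread calls notifyAll after changing the state, and the wait is bounded so a lost signal cannot block the process forever. The pool is shut down with shutdownNow.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a spill status object; not the class from the patch. */
final class SpillStatusSketch {
  private boolean spillInProgress = false;

  /** Called by the writer before it hands a full buffer to the spill thread. */
  synchronized void markSpillStarted() {
    spillInProgress = true;
  }

  /** Called by the spill thread once the buffer has been written out. */
  synchronized void markSpillDone() {
    spillInProgress = false;
    notifyAll(); // wake all waiters; they re-check the flag themselves
  }

  /**
   * Waits until the spill is done or the timeout elapses.
   *
   * @return true if the spill finished, false if the timeout was hit
   */
  synchronized boolean awaitSpillDone(long timeoutMillis) throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (spillInProgress) { // loop guards against spurious wakeups
      long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        return false;
      }
      // wait(0) would block forever, so always wait at least 1 ms
      wait(Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
    }
    return true;
  }

  /** Small driver: one spill task on a pool, writer waits with a bound. */
  static boolean demo() {
    final SpillStatusSketch status = new SpillStatusSketch();
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      status.markSpillStarted();
      pool.submit(new Runnable() {
        public void run() {
          status.markSpillDone(); // the real spill work would happen before this
        }
      });
      return status.awaitSpillDone(5000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdownNow(); // interrupts workers instead of waiting for them
    }
  }
}
```

Whether a timeout is the right behaviour for the real buffer is open; the point is only that the wait condition is re-checked and that the pool cannot keep the JVM alive.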
Just a few suggestions:
- SpillIterator could extend Guava's AbstractIterator, which looks cleaner and
does not implement remove either.
- everything that involves byte counts or buffer sizes in the init and
configuration should be long instead of int
- Instead of "java.io.tmpdir" we should use the same directory as the disk
queue, and instead of a random hash we should use the task id
- In the buffer we should stick to the normal Java conventions and not use _ as
a suffix.
- Some annotations are missing, some parameter names are autogenerated, and a
few javadocs are empty; nothing special
- There are some sysouts (better to use the logs)
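On the long-vs-int point, a small sketch of why it matters. The class and method names are invented for the example and are not from the patch. Multiplying two ints that together reach 2 GiB wraps around silently, while widening to long first gives the real byte count.

```java
/** Hypothetical helper that only demonstrates the overflow; not code from the patch. */
final class BufferSizeSketch {

  /** int arithmetic: wraps around silently once the total reaches 2 GiB. */
  static int totalBytesInt(int bufferCount, int bufferSizeBytes) {
    return bufferCount * bufferSizeBytes;
  }

  /** long arithmetic: widen one operand before multiplying. */
  static long totalBytesLong(int bufferCount, int bufferSizeBytes) {
    return (long) bufferCount * bufferSizeBytes;
  }
}
```

With 4 buffers of 512 MiB each, totalBytesInt returns Integer.MIN_VALUE while totalBytesLong returns 2147483648. On the configuration side, Hadoop's Configuration already offers getLong(name, defaultValue), so reading the sizes as long should not need extra plumbing.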
In case of synchronized receives from RPC, we can use Apurv's segmentation into
multiple buffers to increase throughput and reduce locking.
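To make the idea concrete, here is a generic lock-striping sketch. It is not Apurv's actual design, which is not shown here; the class name SegmentedBufferSketch and the choice to pick a segment by thread id are assumptions made for the example. Each receiver thread locks only one segment, so writers that land on different segments do not contend.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical segmented buffer; illustrates lock striping only. */
final class SegmentedBufferSketch<M> {
  private final List<Queue<M>> segments = new ArrayList<Queue<M>>();

  SegmentedBufferSketch(int segmentCount) {
    if (segmentCount <= 0) {
      throw new IllegalArgumentException("segmentCount must be positive");
    }
    for (int i = 0; i < segmentCount; i++) {
      segments.add(new ArrayDeque<M>());
    }
  }

  /** Adds a message while holding only the lock of the chosen segment. */
  void add(M message) {
    // Assumption: spreading by thread id is good enough for a sketch
    int index = (int) (Thread.currentThread().getId() % segments.size());
    Queue<M> segment = segments.get(index);
    synchronized (segment) {
      segment.add(message);
    }
  }

  /** Sums the segment sizes, locking one segment at a time. */
  int size() {
    int total = 0;
    for (Queue<M> segment : segments) {
      synchronized (segment) {
        total += segment.size();
      }
    }
    return total;
  }
}
```

Note that ordering across segments is lost with this approach, so it only fits if the queue does not have to preserve the global arrival order.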
I benchmarked with Caliper against the disk queue (raw inserts only; polls can
be benchmarked later on):
https://gist.github.com/3907709
Result (biased: I commented out the part that locked the main thread and does
the actual spill):
{noformat}
 0% Scenario{vm=java, trial=0, benchmark=Spill, size=10, type=SPILLING_QUEUE} 2876,07 ns; s=40,55 ns @ 10 trials
 8% Scenario{vm=java, trial=0, benchmark=Spill, size=100, type=SPILLING_QUEUE} 9316,85 ns; s=78,40 ns @ 3 trials
17% Scenario{vm=java, trial=0, benchmark=Spill, size=1000, type=SPILLING_QUEUE} 70874,27 ns; s=221,37 ns @ 3 trials
25% Scenario{vm=java, trial=0, benchmark=Spill, size=10000, type=SPILLING_QUEUE} 684388,29 ns; s=1840,57 ns @ 3 trials
33% Scenario{vm=java, trial=0, benchmark=Spill, size=100000, type=SPILLING_QUEUE} 6910806,76 ns; s=9592,61 ns @ 3 trials
42% Scenario{vm=java, trial=0, benchmark=Spill, size=1000000, type=SPILLING_QUEUE} 69555994,62 ns; s=296212,42 ns @ 3 trials
50% Scenario{vm=java, trial=0, benchmark=Spill, size=10, type=DISK_QUEUE} 23428,43 ns; s=203,91 ns @ 4 trials
58% Scenario{vm=java, trial=0, benchmark=Spill, size=100, type=DISK_QUEUE} 203707,18 ns; s=1880,11 ns @ 5 trials
67% Scenario{vm=java, trial=0, benchmark=Spill, size=1000, type=DISK_QUEUE} 2085204,12 ns; s=20188,91 ns @ 3 trials
75% Scenario{vm=java, trial=0, benchmark=Spill, size=10000, type=DISK_QUEUE} 20315540,28 ns; s=1130985,60 ns @ 10 trials
83% Scenario{vm=java, trial=0, benchmark=Spill, size=100000, type=DISK_QUEUE} 196685169,60 ns; s=1888695,70 ns @ 6 trials
92% Scenario{vm=java, trial=0, benchmark=Spill, size=1000000, type=DISK_QUEUE} 2093004128,00 ns; s=1216169,56 ns @ 3 trials

          type    size         us linear runtime
SPILLING_QUEUE      10       2,88 =
SPILLING_QUEUE     100       9,32 =
SPILLING_QUEUE    1000      70,87 =
SPILLING_QUEUE   10000     684,39 =
SPILLING_QUEUE  100000    6910,81 =
SPILLING_QUEUE 1000000   69555,99 =
    DISK_QUEUE      10      23,43 =
    DISK_QUEUE     100     203,71 =
    DISK_QUEUE    1000    2085,20 =
    DISK_QUEUE   10000   20315,54 =
    DISK_QUEUE  100000  196685,17 ==
    DISK_QUEUE 1000000 2093004,13 ==============================

vm: java
trial: 0
benchmark: Spill
{noformat}
TODO: profiling :)
> Add a spilling message queue
> ----------------------------
>
> Key: HAMA-559
> URL: https://issues.apache.org/jira/browse/HAMA-559
> Project: Hama
> Issue Type: Sub-task
> Components: bsp core
> Affects Versions: 0.5.0
> Reporter: Thomas Jungblut
> Assignee: Suraj Menon
> Priority: Minor
> Fix For: 0.6.0
>
> Attachments: HAMA-559.patch-v1
>
>
> After HAMA-521 is done, we can add a spilling queue which just holds the
> messages in RAM that fit into the heap space. The rest can be flushed to disk.
> We may call this a HybridQueue or something like that.
> The benefits should be that we don't have to flush to disk so often and get
> faster. However, we may have more GC, so it is not always faster overall.
> The requirements for this queue also include:
> - The message object once written to the queue (after returning from the
> write call) could be modified, but the changes should not be reflected in the
> messages stored in the queue.
> - For now let's implement a queue that does not support concurrent reading
> and writing. This feature is needed when we implement asynchronous
> communication.
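
The first requirement above (later changes to a message must not show up in the queue) can be met by serializing on add, which is what the thread dump suggests the patch already does via IntWritable.write. A minimal sketch of that idea, using invented classes (MutableIntMessage, CopyOnAddQueueSketch) instead of the Hadoop Writable types so that it stays self-contained:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical mutable message; stands in for a Writable. */
final class MutableIntMessage {
  int value;

  MutableIntMessage(int value) {
    this.value = value;
  }
}

/** Hypothetical queue that stores serialized bytes instead of object references. */
final class CopyOnAddQueueSketch {
  private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  private final DataOutputStream out = new DataOutputStream(bytes);
  private int count = 0;

  /** Serializes the current state; the caller may reuse the object afterwards. */
  void add(MutableIntMessage message) throws IOException {
    out.writeInt(message.value);
    count++;
  }

  /** Reads all stored values back in insertion order and empties the queue. */
  int[] drain() throws IOException {
    out.flush();
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    int[] values = new int[count];
    for (int i = 0; i < values.length; i++) {
      values[i] = in.readInt();
    }
    bytes.reset();
    count = 0;
    return values;
  }
}
```

Adding a message with value 1, changing the same object to 99 and adding it again drains as 1 followed by 99, so the first entry is unaffected by the later change.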