[
https://issues.apache.org/jira/browse/SAMZA-245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073800#comment-14073800
]
Chris Riccomini commented on SAMZA-245:
---------------------------------------
Something has been bugging me. Why is it that using an ArrayList in
BlockingEnvelopeMap, and converting it to an ArrayDeque in SystemConsumers is
faster than just using an ArrayDeque in BlockingEnvelopeMap, and using it
directly in SystemConsumers?
I wrote a little test to see how LinkedList, ArrayList, and ArrayDeque perform
on the methods that BlockingEnvelopeMap and SystemConsumers call:
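{code}
// Imports needed by the methods below (which belong inside a class).
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public static LinkedBlockingQueue<Integer> loadBlockingQueue(int operations) {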
Integer integer = new Integer(0);
LinkedBlockingQueue<Integer> linkedBlockingQueue = new
LinkedBlockingQueue<Integer>();
for (int operation = 0; operation < operations; ++operation) {
linkedBlockingQueue.add(integer);
}
return linkedBlockingQueue;
}
public static void main(String[] args) throws InterruptedException {
int OPERATIONS = 10000000;
int LOOPS = 3;
Integer integer = new Integer(0);
LinkedBlockingQueue<Integer> linkedBlockingQueue =
loadBlockingQueue(OPERATIONS);
for (int loop = 0; loop < LOOPS; ++loop) {
System.out.println("--------- ITERATION " + loop + " ---------");
// ----
long start = System.currentTimeMillis();
List<Integer> list = new LinkedList<Integer>();
System.out.println("list(): " + (System.currentTimeMillis() - start) +
"ms");
start = System.currentTimeMillis();
for (int operation = 0; operation < OPERATIONS; ++operation) {
list.add(integer);
}
System.out.println("list.add: " + (System.currentTimeMillis() - start) +
"ms");
// ----
start = System.currentTimeMillis();
for (int operation = 0; operation < OPERATIONS; ++operation) {
list.size();
}
System.out.println("list.size: " + (System.currentTimeMillis() - start) +
"ms");
// ----
start = System.currentTimeMillis();
for (int operation = 0; operation < OPERATIONS; ++operation) {
list.remove(0);
}
System.out.println("list.remove: " + (System.currentTimeMillis() - start)
+ "ms");
// ----
start = System.currentTimeMillis();
linkedBlockingQueue.drainTo(list);
System.out.println("list.drainTo: " + (System.currentTimeMillis() -
start) + "ms");
linkedBlockingQueue = loadBlockingQueue(OPERATIONS);
// ----
start = System.currentTimeMillis();
ArrayList<Integer> arrayList = new ArrayList<Integer>(OPERATIONS);
System.out.println("arrayList(): " + (System.currentTimeMillis() - start)
+ "ms");
start = System.currentTimeMillis();
for (int operation = 0; operation < OPERATIONS; ++operation) {
arrayList.add(integer);
}
System.out.println("arrayList.add: " + (System.currentTimeMillis() -
start) + "ms");
// ----
start = System.currentTimeMillis();
for (int operation = 0; operation < OPERATIONS; ++operation) {
arrayList.size();
}
System.out.println("arrayList.size: " + (System.currentTimeMillis() -
start) + "ms");
// ----
start = System.currentTimeMillis();
// for (int operation = 0; operation < OPERATIONS; ++operation) {
// arrayList.remove(0);
// }
System.out.println("arrayList.remove: DOES NOT FINISH");
// ----
arrayList = new ArrayList<Integer>(OPERATIONS);
start = System.currentTimeMillis();
linkedBlockingQueue.drainTo(arrayList);
System.out.println("arrayList.drainTo: " + (System.currentTimeMillis() -
start) + "ms");
linkedBlockingQueue = loadBlockingQueue(OPERATIONS);
// ----
start = System.currentTimeMillis();
new ArrayDeque<Integer>(arrayList);
System.out.println("new q(arrayList): " + (System.currentTimeMillis() -
start) + "ms");
// ----
start = System.currentTimeMillis();
Queue<Integer> q = new ArrayDeque<Integer>(OPERATIONS);
System.out.println("q(): " + (System.currentTimeMillis() - start) + "ms");
start = System.currentTimeMillis();
for (int operation = 0; operation < OPERATIONS; ++operation) {
q.add(integer);
}
System.out.println("q.add: " + (System.currentTimeMillis() - start) +
"ms");
// ----
start = System.currentTimeMillis();
for (int operation = 0; operation < OPERATIONS; ++operation) {
q.size();
}
System.out.println("q.size: " + (System.currentTimeMillis() - start) +
"ms");
// ----
start = System.currentTimeMillis();
for (int operation = 0; operation < OPERATIONS; ++operation) {
q.remove();
}
System.out.println("q.remove: " + (System.currentTimeMillis() - start) +
"ms");
// ----
start = System.currentTimeMillis();
for (int operation = 0; operation < OPERATIONS; ++operation) {
q.offer(integer);
}
System.out.println("q.offer: " + (System.currentTimeMillis() - start) +
"ms");
// ----
start = System.currentTimeMillis();
for (int operation = 0; operation < OPERATIONS; ++operation) {
q.poll();
}
System.out.println("q.poll: " + (System.currentTimeMillis() - start) +
"ms");
// ----
start = System.currentTimeMillis();
linkedBlockingQueue.drainTo(q);
System.out.println("q.drainTo: " + (System.currentTimeMillis() - start) +
"ms");
linkedBlockingQueue = loadBlockingQueue(OPERATIONS);
}
}
{code}
Running this code shows:
{noformat}
--------- ITERATION 0 ---------
list(): 0ms
list.add: 1797ms
list.size: 4ms
list.remove: 82ms
list.drainTo: 1915ms
arrayList(): 2422ms
arrayList.add: 37ms
arrayList.size: 5ms
arrayList.remove: DOES NOT FINISH
arrayList.drainTo: 95ms
new q(arrayList): 75ms
q(): 79ms
q.add: 38ms
q.size: 5ms
q.remove: 22ms
q.offer: 48ms
q.poll: 20ms
q.drainTo: 113ms
--------- ITERATION 1 ---------
list(): 0ms
list.add: 1387ms
list.size: 0ms
list.remove: 61ms
list.drainTo: 1761ms
arrayList(): 539ms
arrayList.add: 46ms
arrayList.size: 0ms
arrayList.remove: DOES NOT FINISH
arrayList.drainTo: 100ms
new q(arrayList): 98ms
q(): 63ms
q.add: 46ms
q.size: 0ms
q.remove: 28ms
q.offer: 48ms
q.poll: 28ms
q.drainTo: 102ms
--------- ITERATION 2 ---------
list(): 0ms
list.add: 1360ms
list.size: 0ms
list.remove: 65ms
list.drainTo: 1778ms
arrayList(): 533ms
arrayList.add: 47ms
arrayList.size: 0ms
arrayList.remove: DOES NOT FINISH
arrayList.drainTo: 123ms
new q(arrayList): 93ms
q(): 78ms
q.add: 44ms
q.size: 0ms
q.remove: 29ms
q.offer: 51ms
q.poll: 31ms
q.drainTo: 122ms
{noformat}
As you can see, the ArrayDeque is faster than or on par with everything else on
almost every method. These results are
[backed up|http://stackoverflow.com/questions/6129805/what-is-the-fastest-java-collection-with-the-basic-functionality-of-a-queue]
by others as well.
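The per-method timings above do not measure the two paths from the opening question end to end. A minimal sketch that does, assuming the same LinkedBlockingQueue source as the test above (the class name DrainPathComparison and its helper methods are hypothetical, not part of Samza):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical harness; not part of Samza.
class DrainPathComparison {
  // Fills a blocking queue with `count` references to the same Integer.
  static LinkedBlockingQueue<Integer> load(int count) {
    LinkedBlockingQueue<Integer> source = new LinkedBlockingQueue<Integer>();
    Integer value = Integer.valueOf(0);
    for (int i = 0; i < count; ++i) {
      source.add(value);
    }
    return source;
  }

  // Path A: drain into a pre-sized ArrayList, then copy into an ArrayDeque.
  static Queue<Integer> drainViaArrayList(LinkedBlockingQueue<Integer> source) {
    List<Integer> drained = new ArrayList<Integer>(source.size());
    source.drainTo(drained);
    return new ArrayDeque<Integer>(drained);
  }

  // Path B: drain straight into a pre-sized ArrayDeque.
  static Queue<Integer> drainDirect(LinkedBlockingQueue<Integer> source) {
    Queue<Integer> drained = new ArrayDeque<Integer>(source.size());
    source.drainTo(drained);
    return drained;
  }

  public static void main(String[] args) {
    int count = 1000000;
    // Several loops so that later iterations run on warmed-up code.
    for (int loop = 0; loop < 5; ++loop) {
      LinkedBlockingQueue<Integer> a = load(count);
      long start = System.nanoTime();
      Queue<Integer> viaList = drainViaArrayList(a);
      long viaListMicros = (System.nanoTime() - start) / 1000;

      LinkedBlockingQueue<Integer> b = load(count);
      start = System.nanoTime();
      Queue<Integer> direct = drainDirect(b);
      long directMicros = (System.nanoTime() - start) / 1000;

      System.out.println("loop " + loop + ": via ArrayList " + viaListMicros
          + "us (" + viaList.size() + " items), direct " + directMicros
          + "us (" + direct.size() + " items)");
    }
  }
}
```

Like the test above, this is a rough wall-clock measurement, so JIT warm-up and garbage collection can skew individual iterations.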
Yet, switching SystemConsumer.poll to return a Queue, and using ArrayDeque in
BlockingEnvelopeMap exhibits two odd behaviors:
# There is a "startup time" where the first few hundred thousand messages are
processed rather slowly (0-5s).
# The total messages/sec is the same as, or slightly lower (~5%) than, with the
ArrayList.
I added a bunch of counters to BlockingEnvelopeMap so we can understand what's
happening inside BlockingEnvelopeMap in this performance test:
{code}
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String
metricsGroupName) {
// ...
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.err.println("add: " + addCalls + ", drain: " + drainToCalls
+ ", timeouts: " + timeoutCalls + ", blocking calls: " + timeRemainingCalls
+ ", block on outstanding calls: " + blockOnOutstandingCalls
+ ", polls: " + pollCalls
+ ", average queue size: " + (totalQueueItems / (float) pollsPerSsp)
+ ", ssp-queue-polls: " + sspQueuePolls + ", got messages: " + gotMessages);
}
});
}
// ...
int addCalls = 0;
int drainToCalls = 0;
int totalQueueItems = 0;
int pollCalls = 0;
int timeoutCalls = 0;
int timeRemainingCalls = 0;
int blockOnOutstandingCalls = 0;
int sspQueuePolls = 0;
int gotMessages = 0;
public Map<SystemStreamPartition, Queue<IncomingMessageEnvelope>>
poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws
InterruptedException {
long stopTime = clock.currentTimeMillis() + timeout;
Map<SystemStreamPartition, Queue<IncomingMessageEnvelope>> messagesToReturn
= new HashMap<SystemStreamPartition, Queue<IncomingMessageEnvelope>>();
metrics.incPoll();
++pollCalls;
for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
BlockingQueue<IncomingMessageEnvelope> queue =
bufferedMessages.get(systemStreamPartition);
Queue<IncomingMessageEnvelope> outgoingList = new
ArrayDeque<IncomingMessageEnvelope>(queue.size());
totalQueueItems += queue.size();
++sspQueuePolls;
if (queue.size() > 0) {
queue.drainTo(outgoingList);
++drainToCalls;
} else if (timeout != 0) {
IncomingMessageEnvelope envelope = null;
++timeoutCalls;
// How long we can legally block (if timeout > 0)
long timeRemaining = stopTime - clock.currentTimeMillis();
if (timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES) {
++blockOnOutstandingCalls;
// Block until we get at least one message, or until we catch up to
// the head of the stream.
while (envelope == null && !isAtHead(systemStreamPartition)) {
metrics.incBlockingPoll(systemStreamPartition);
envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
}
} else if (timeout > 0 && timeRemaining > 0) {
++timeRemainingCalls;
// Block until we get at least one message.
metrics.incBlockingTimeoutPoll(systemStreamPartition);
envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
}
// If we got a message, add it.
if (envelope != null) {
outgoingList.add(envelope);
++addCalls;
// Drain any remaining messages without blocking.
queue.drainTo(outgoingList);
++drainToCalls;
}
}
if (outgoingList.size() > 0) {
messagesToReturn.put(systemStreamPartition, outgoingList);
}
}
if (messagesToReturn.size() > 0) {
++gotMessages;
}
return messagesToReturn;
}
{code}
Running with the Queue/ArrayDeque implementation shows:
{noformat}
add: 0, drain: 4034, timeouts: 2895, blocking calls: 0, block on outstanding
calls: 0, polls: 43, average queue size: 1708.5803, ssp-queue-polls: 11683, got
messages: 43
{noformat}
Some interesting stats to note:
# Every time SystemConsumers polled BlockingEnvelopeMap, there were messages
available to process.
# SystemConsumers only polled BlockingEnvelopeMap 43 times over an 18s
execution window (43/18≈2.39 times/sec). The rest of the time, there were no
empty SSPs in SystemConsumers, so the entire BlockingEnvelopeMap.poll logic was
skipped.
# No blocking polls were made.
# Add was never called (because a blocking poll was never made).
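The arithmetic behind these observations can be sketched from the counter values printed above. The class and method names below are hypothetical; the formulas assume the branch structure of the instrumented poll method shown earlier, where drainToCalls is incremented once in the non-empty branch and once more after each successful blocking poll (counted by addCalls).

```java
// Hypothetical helper; derives rates from the counters printed by the shutdown hook.
class PollCounterStats {
  // Rate of BlockingEnvelopeMap.poll calls over the run.
  static double pollsPerSecond(int pollCalls, double runtimeSeconds) {
    return pollCalls / runtimeSeconds;
  }

  // Average number of per-SSP queue checks made inside each poll call.
  static double sspChecksPerPoll(int sspQueuePolls, int pollCalls) {
    return sspQueuePolls / (double) pollCalls;
  }

  // Per-SSP checks that neither drained a non-empty queue nor entered the
  // timeout branch, i.e. the queue was empty and timeout == 0.
  static int emptyNonBlockingChecks(int sspQueuePolls, int drainToCalls,
      int addCalls, int timeoutCalls) {
    // Drains that followed a blocking poll are paired with an add call.
    int nonEmptyDrains = drainToCalls - addCalls;
    return sspQueuePolls - nonEmptyDrains - timeoutCalls;
  }

  public static void main(String[] args) {
    // Counter values from the run above; 18s is the reported execution window.
    int polls = 43, sspQueuePolls = 11683, drains = 4034, adds = 0, timeouts = 2895;
    System.out.println("polls/sec: " + pollsPerSecond(polls, 18.0));
    System.out.println("ssp checks per poll: " + sspChecksPerPoll(sspQueuePolls, polls));
    System.out.println("empty checks with timeout == 0: "
        + emptyNonBlockingChecks(sspQueuePolls, drains, adds, timeouts));
  }
}
```

With these inputs, 11683 - 4034 - 2895 = 4754 of the per-SSP checks found an empty queue while timeout was 0, so they returned without draining or blocking.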
Based on these metrics, only drainTo and size are called on the data structure,
and in my little test ArrayDeque is roughly on par with ArrayList on both. Yet
the ArrayList in BlockingEnvelopeMap leads to faster performance in
TestSamzaContainerPerformance.
I also adjusted the test runtime, number of messages the MockSystemConsumer was
emitting, log verbosity, and number of MockSystemConsumer threads. In all
cases, the results were the same: ArrayList was slightly faster.
I must be missing something, but I don't know what. I think a better use of
time right now is to investigate speeding up the OffsetManager.update call
in TaskInstance.process, which takes over half of the CPU time in the main
thread. I'll back off the ArrayList/ArrayDeque investigation for now.
> Improve SystemConsumers performance
> -----------------------------------
>
> Key: SAMZA-245
> URL: https://issues.apache.org/jira/browse/SAMZA-245
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.8.0
>
> Attachments: SAMZA-245-1.patch, SAMZA-245-3.patch, SAMZA-245-4.patch,
> SAMZA-245-5.patch, SAMZA-245-6.patch, SAMZA-245-7.patch, SAMZA-245.0.patch,
> org.apache.samza.test.performance.TestSamzaContainerPerformance.SAMZA-245-3.html,
> org.apache.samza.test.performance.TestSamzaContainerPerformance.master.html
>
>
> As part of SAMZA-220, a more radical patch was proposed. This patch appears
> to improve SystemConsumers' performance pretty significantly, while also
> reducing its complexity. The decision was made to move this change into the
> 0.8.0 release, rather than the 0.7.0 release, since it's a fairly risky
> change.
> This ticket is to explore updating SystemConsumers to eliminate almost all
> loops in order to increase performance in the Samza container.