The JDK ArrayBlockingQueue is the slowest of the queue implementations I tested. On my machine, passing 100,000,000 objects from one thread to another takes 28 seconds with ArrayBlockingQueue, 16 seconds with CircularBuffer, and 0.1 seconds with SpscArrayQueue. In all cases the queue capacity was around 65,000 elements. I guess we should be able to pass data about 500 times faster than over the loopback interface.
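
For anyone wondering why a single-producer/single-consumer queue can be so much cheaper than a lock-based one, here is a minimal sketch of the idea. It is not the JCTools SpscArrayQueue code; the class name SpscRingSketch, the transfer helper, and the capacity values are made up for illustration. The point is that each index is written by exactly one thread, so no lock is needed.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a single-producer/single-consumer ring buffer.
// This is NOT the JCTools SpscArrayQueue implementation.
final class SpscRingSketch<E> {
  private final Object[] slots;
  private final int mask;
  private final AtomicLong head = new AtomicLong(); // next slot to read; written only by the consumer
  private final AtomicLong tail = new AtomicLong(); // next slot to write; written only by the producer

  SpscRingSketch(int capacity) {
    if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
      throw new IllegalArgumentException("capacity must be a positive power of two");
    }
    slots = new Object[capacity];
    mask = capacity - 1;
  }

  // Call from the producer thread only. Returns false when the ring is full.
  boolean offer(E e) {
    long t = tail.get();
    if (t - head.get() == slots.length) {
      return false;
    }
    slots[(int) (t & mask)] = e;
    tail.lazySet(t + 1); // ordered store: the slot write becomes visible before the new tail
    return true;
  }

  // Call from the consumer thread only. Returns null when the ring is empty.
  @SuppressWarnings("unchecked")
  E poll() {
    long h = head.get();
    if (h == tail.get()) {
      return null;
    }
    int i = (int) (h & mask);
    E e = (E) slots[i];
    slots[i] = null;     // drop the reference so it can be collected
    head.lazySet(h + 1); // hands the slot back to the producer
    return e;
  }

  // Pushes 0..count-1 through the ring from a second thread and returns the sum received.
  static long transfer(final int count, int capacity) {
    final SpscRingSketch<Long> ring = new SpscRingSketch<>(capacity);
    new Thread(() -> {
      for (long i = 0; i < count; i++) {
        while (!ring.offer(i)) {
          Thread.yield(); // ring full: let the consumer run
        }
      }
    }).start();

    long sum = 0;
    for (int received = 0; received < count; received++) {
      Long v;
      while ((v = ring.poll()) == null) {
        Thread.yield(); // ring empty: let the producer run
      }
      sum += v;
    }
    return sum;
  }

  public static void main(String[] args) {
    System.out.println(transfer(1_000_000, 1024));
  }
}
```

The sketch yields when the ring is full or empty; a real benchmark would have to decide between spinning, yielding, and sleeping, and that choice affects the numbers a lot.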

Thank you,

Vlad

On 11/14/15 14:40, Munagala Ramanath wrote:
I was curious about this, so I wrote a couple of small standalone
programs, one using the loopback interface and one using
ArrayBlockingQueue, to send byte arrays from one thread to another.

The loopback version is about 50% slower than the ArrayBlockingQueue version:

Reader: read 4000000 tuples of size 128 in 3893 ms    (ArrayBlockingQueue)
Writer: wrote 4000000 tuples of size 128 in 3893 ms

Writer: wrote 4000000 tuples of size 128 in 5783 ms    (Loopback)
Reader: read 4000000 tuples of size 128 in 5820 ms

Ram
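
The programs themselves are not included in this thread. As a rough sketch only, the queue-based one could look something like the following; the class name QueueTransferSketch, the queue capacity of 1024, and allocating a fresh array per tuple are all assumptions, not details from the message above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical reconstruction of a queue-based transfer test; names and sizes are assumptions.
final class QueueTransferSketch {
  // Sends `count` byte arrays of `size` bytes through a bounded queue
  // and returns the total number of bytes the reader received.
  static long run(final int count, final int size, int capacity) {
    final BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(capacity);

    Thread writer = new Thread(() -> {
      long start = System.nanoTime();
      try {
        for (int i = 0; i < count; i++) {
          queue.put(new byte[size]); // blocks while the queue is full
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      long ms = (System.nanoTime() - start) / 1_000_000;
      System.out.println("Writer: wrote " + count + " tuples of size " + size + " in " + ms + " ms");
    });
    writer.start();

    long bytes = 0;
    long start = System.nanoTime();
    try {
      for (int i = 0; i < count; i++) {
        bytes += queue.take().length; // blocks while the queue is empty
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    long ms = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Reader: read " + count + " tuples of size " + size + " in " + ms + " ms");
    return bytes;
  }

  public static void main(String[] args) {
    run(4_000_000, 128, 1024);
  }
}
```

Timings from a sketch like this depend heavily on the queue capacity, JIT warm-up, and allocation rate, so they should be treated as indicative at best.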

On Fri, Nov 13, 2015 at 11:19 AM, Isha Arkatkar <[email protected]> wrote:
Hi all,

    For APEX-259 (https://malhar.atlassian.net/browse/APEX-259), I am
exploring the option of passing serialized tuples from the publisher to the
buffer server through a blocking queue.

Right now, the publisher and the buffer server reside within the same
container; however, communication between the two goes through sockets. We
want to check whether we get any performance benefit by changing this
communication to a queue-based one.

This is in the exploration phase right now, but if we do see an improvement,
we may want to provide it as a pluggable option.

Please let me know your thoughts!

Thanks,
Isha
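
To make the "pluggable option" idea concrete, one possible shape is a small transport interface with a queue-backed implementation next to the existing socket path. This is only a sketch under assumptions: TupleChannel, QueueTupleChannel, TupleChannelDemo, and the "queue" selector string are hypothetical names and not part of Apex.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical transport abstraction for illustration; not an Apex interface.
interface TupleChannel {
  boolean offer(byte[] serializedTuple); // false when the channel cannot accept more data
  byte[] poll();                         // null when nothing is available
}

// In-process variant backed by a bounded queue.
final class QueueTupleChannel implements TupleChannel {
  private final BlockingQueue<byte[]> queue;

  QueueTupleChannel(int capacity) {
    queue = new ArrayBlockingQueue<>(capacity);
  }

  @Override
  public boolean offer(byte[] serializedTuple) {
    return queue.offer(serializedTuple);
  }

  @Override
  public byte[] poll() {
    return queue.poll();
  }
}

final class TupleChannelDemo {
  // Hypothetical selection by name; a socket-based variant is left out of this sketch.
  static TupleChannel create(String kind, int capacity) {
    if ("queue".equals(kind)) {
      return new QueueTupleChannel(capacity);
    }
    throw new IllegalArgumentException("unsupported channel kind: " + kind);
  }

  public static void main(String[] args) {
    TupleChannel channel = create("queue", 4);
    channel.offer(new byte[] {1, 2, 3});
    System.out.println(channel.poll().length);
  }
}
```

The non-blocking offer/poll pair is used here only to keep the interface free of checked exceptions; a blocking put/take pair would be an equally reasonable choice.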

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.stram.engine;

import java.util.concurrent.ArrayBlockingQueue;

import org.jctools.queues.SpscArrayQueue;
import org.junit.Test;

import com.datatorrent.netlet.util.CircularBuffer;

public class BlockingQueueTest
{
  private static final int COUNT = 100000000;
  @Test
  public void testBlockingQueuePerformance()
  {
    final ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(65000);
    new Thread(
      new Runnable()
      {
        @Override
        public void run()
        {
          final Object o = new byte[128];
          try {
            for (int i = 0; i < COUNT; i++) {
              blockingQueue.put(o);
            }
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    ).start();

    try {
      for (int i = 0; i < COUNT; i++) {
        blockingQueue.take();
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  @Test
  public void testSpscQueuePerformance()
  {
    final SpscArrayQueue<Object> spscArrayQueue = new SpscArrayQueue<>(65000);
    new Thread(
      new Runnable()
      {
        @Override
        public void run()
        {
          final Object o = new byte[128];
          try {
            for (int i = 0; i < COUNT; i++) {
              while (!spscArrayQueue.offer(o)) {
                Thread.sleep(10);
              }
            }
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    ).start();

    try {
      for (int i = 0; i < COUNT; i++) {
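        // poll() removes the element; peek() would leave it in the queue,
        // so the loop would finish without consuming anything.
        while (spscArrayQueue.poll() == null) {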
          Thread.sleep(10);
        }
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  @Test
  public void testCircularBufferPerformance()
  {
    final CircularBuffer<Object> circularBuffer = new CircularBuffer<Object>(65000);
    new Thread(
      new Runnable()
      {
        @Override
        public void run()
        {
          final Object o = new byte[128];
          try {
            for (int i = 0; i < COUNT; i++) {
              circularBuffer.put(o);
            }
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    ).start();

    try {
      for (int i = 0; i < COUNT; i++) {
        circularBuffer.take();
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
