http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
new file mode 100644
index 0000000..10c6b24
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.memory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.common.memory.ConcurrentFramePool;
+import org.apache.asterix.common.memory.FrameAction;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class ConcurrentFramePoolUnitTest extends TestCase {
+
+    private static final int DEFAULT_FRAME_SIZE = 32768;
+    private static final int NUM_FRAMES = 2048;
+    private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * 
NUM_FRAMES;
+    private static final int NUM_THREADS = 8;
+    private static final int MAX_SIZE = 52;
+    private static final double RELEASE_PROBABILITY = 0.20;
+    private volatile static HyracksDataException cause = null;
+
+    public ConcurrentFramePoolUnitTest(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(ConcurrentFramePoolUnitTest.class);
+    }
+
+    @org.junit.Test
+    public void testMemoryManager() {
+        AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+        
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+        ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
+                DEFAULT_FRAME_SIZE);
+        int i = 0;
+        while (fmm.get() != null) {
+            i++;
+        }
+        Assert.assertEquals(i, NUM_FRAMES);
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testConcurrentMemoryManager() {
+        try {
+            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
+            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
+            FixedSizeAllocator[] runners = new FixedSizeAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new 
FixedSizeAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int i = 0;
+            for (FixedSizeAllocator allocator : runners) {
+                i += allocator.getAllocated();
+            }
+            Assert.assertEquals(NUM_FRAMES, i);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
+            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
+            Random random = new Random();
+            int i = 0;
+            int req;
+            while (true) {
+                req = random.nextInt(MAX_SIZE) + 1;
+                if (req == 1) {
+                    if (fmm.get() != null) {
+                        i += 1;
+                    } else {
+                        break;
+                    }
+                } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
+                    i += req;
+                } else {
+                    break;
+                }
+            }
+
+            Assert.assertEquals(i <= NUM_FRAMES, true);
+            Assert.assertEquals(i + req > NUM_FRAMES, true);
+            Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testConcurrentVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
+            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
+
+            VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new 
VarSizeAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int allocated = 0;
+            for (int i = 0; i < threads.length; i++) {
+                if (runners[i].cause() != null) {
+                    runners[i].cause().printStackTrace();
+                    Assert.fail(runners[i].cause().getMessage());
+                }
+                allocated += runners[i].getAllocated();
+            }
+            Assert.assertEquals(allocated <= NUM_FRAMES, true);
+            for (int i = 0; i < threads.length; i++) {
+                Assert.assertEquals(allocated + runners[i].getLastReq() > 
NUM_FRAMES, true);
+            }
+            Assert.assertEquals(allocated + fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testAcquireReleaseMemoryManager() throws HyracksDataException {
+        AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+        
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+        ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
+                DEFAULT_FRAME_SIZE);
+        Random random = new Random();
+        ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+        while (true) {
+            if (random.nextDouble() < RELEASE_PROBABILITY) {
+                if (!stack.isEmpty()) {
+                    fmm.release(stack.pop());
+                }
+            } else {
+                ByteBuffer buffer = fmm.get();
+                if (buffer == null) {
+                    break;
+                } else {
+                    stack.push(buffer);
+                }
+            }
+        }
+        Assert.assertEquals(stack.size(), NUM_FRAMES);
+        Assert.assertEquals(fmm.remaining(), 0);
+        for (ByteBuffer buffer : stack) {
+            fmm.release(buffer);
+        }
+        stack.clear();
+        Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testConcurrentAcquireReleaseMemoryManager() {
+        try {
+            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
+            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
+            FixedSizeGoodAllocator[] runners = new 
FixedSizeGoodAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new 
FixedSizeGoodAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int i = 0;
+            for (FixedSizeGoodAllocator allocator : runners) {
+                i += allocator.getAllocated();
+            }
+            Assert.assertEquals(NUM_FRAMES, i);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testAcquireReleaseVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
+            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
+            Random random = new Random();
+            ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+            int i = 0;
+            int req;
+            while (true) {
+                // release
+                if (random.nextDouble() < RELEASE_PROBABILITY) {
+                    if (!stack.isEmpty()) {
+                        ByteBuffer buffer = stack.pop();
+                        i -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
+                        fmm.release(buffer);
+                    }
+                } else {
+                    // acquire
+                    req = random.nextInt(MAX_SIZE) + 1;
+                    if (req == 1) {
+                        ByteBuffer buffer = fmm.get();
+                        if (buffer != null) {
+                            stack.push(buffer);
+                            i += 1;
+                        } else {
+                            break;
+                        }
+                    } else {
+                        ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE);
+                        if (buffer != null) {
+                            stack.push(buffer);
+                            i += req;
+                        } else {
+                            break;
+                        }
+                    }
+                }
+            }
+
+            Assert.assertEquals(i <= NUM_FRAMES, true);
+            Assert.assertEquals(i + req > NUM_FRAMES, true);
+            Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    @org.junit.Test
+    public void testConcurrentAcquireReleaseVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
+            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
+            VarSizeGoodAllocator[] runners = new 
VarSizeGoodAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new 
VarSizeGoodAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int i = 0;
+            for (VarSizeGoodAllocator allocator : runners) {
+                if (allocator.cause() != null) {
+                    allocator.cause().printStackTrace();
+                    Assert.fail(allocator.cause().getMessage());
+                }
+                i += allocator.getAllocated();
+            }
+            Assert.assertEquals(NUM_FRAMES, i + fmm.remaining());
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    @org.junit.Test
+    public void testFixedSizeSubscribtion() {
+        try {
+            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
+            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
+            int i = 0;
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            LinkedBlockingDeque<ByteBuffer> buffers = new 
LinkedBlockingDeque<>();
+            FrameAction frameAction = new FrameAction();
+            frameAction.setFrame(buffer);
+            while (!fmm.subscribe(frameAction)) {
+                buffers.put(frameAction.retrieve());
+                i++;
+            }
+            // One subscriber.
+            // Check that all frames have been consumed
+            Assert.assertEquals(i, NUM_FRAMES);
+            // Release a frame (That will be handed out to the subscriber)
+            fmm.release(buffers.take());
+            // Check that all frames have been consumed (since the released 
frame have been handed to the consumer)
+            Assert.assertEquals(0, fmm.remaining());
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    @org.junit.Test
+    public void testLargerThanBudgetRequests() {
+        HyracksDataException hde = null;
+        try {
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+            fmm.get(32 * DEFAULT_FRAME_SIZE);
+        } catch (HyracksDataException e) {
+            hde = e;
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNotNull(hde);
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testLargerThanBudgetSubscribe() {
+        HyracksDataException hde = null;
+        try {
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 32);
+            FrameAction frameAction = new FrameAction();
+            frameAction.setFrame(buffer);
+            fmm.subscribe(frameAction);
+        } catch (HyracksDataException e) {
+            hde = e;
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNotNull(hde);
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testgetWhileSubscribersExist() {
+        try {
+            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
+            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
+            int i = 0;
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            LinkedBlockingDeque<ByteBuffer> buffers = new 
LinkedBlockingDeque<>();
+            FrameAction frameAction = new FrameAction();
+            frameAction.setFrame(buffer);
+            while (!fmm.subscribe(frameAction)) {
+                buffers.put(frameAction.retrieve());
+                i++;
+            }
+            // One subscriber.
+            // Check that all frames have been consumed
+            Assert.assertEquals(i, NUM_FRAMES);
+            // Release a frame (That will be handed out to the subscriber)
+            fmm.release(buffers.take());
+            // Check that all frames have been consumed (since the released 
frame have been handed to the consumer)
+            Assert.assertEquals(fmm.remaining(), 0);
+            buffers.put(frameAction.retrieve());
+            // Create another subscriber that takes frames of double the size
+            ByteBuffer bufferTimes2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 
2);
+            LinkedBlockingDeque<ByteBuffer> buffersTimes2 = new 
LinkedBlockingDeque<>();
+            FrameAction frameActionTimes2 = new FrameAction();
+            frameActionTimes2.setFrame(bufferTimes2);
+            Assert.assertEquals(true, fmm.subscribe(frameActionTimes2));
+            // release a small one
+            fmm.release(buffers.take());
+            Assert.assertEquals(fmm.remaining(), 1);
+            // Check that a small get fails
+            Assert.assertEquals(null, fmm.get());
+            // release another small one
+            fmm.release(buffers.take());
+            // Check that no small frames exists in the pool since subscriber 
request was satisfied
+            Assert.assertEquals(fmm.remaining(), 0);
+            buffersTimes2.add(frameActionTimes2.retrieve());
+            fmm.release(buffers);
+            fmm.release(bufferTimes2);
+            Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    /*
+     * Runnables used for unit tests
+     */
+    private class FixedSizeAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private int allocated = 0;
+
+        public FixedSizeAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return allocated;
+        }
+
+        @Override
+        public void run() {
+            while (fmm.get() != null) {
+                allocated++;
+            }
+        }
+    }
+
+    private class FixedSizeGoodAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+        private final Random random = new Random();
+
+        public FixedSizeGoodAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return stack.size();
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                if (random.nextDouble() < RELEASE_PROBABILITY) {
+                    if (!stack.isEmpty()) {
+                        try {
+                            fmm.release(stack.pop());
+                        } catch (HyracksDataException e) {
+                            e.printStackTrace();
+                            cause = e;
+                        }
+                    }
+                } else {
+                    ByteBuffer buffer = fmm.get();
+                    if (buffer == null) {
+                        break;
+                    } else {
+                        stack.push(buffer);
+                    }
+                }
+            }
+        }
+    }
+
+    private class VarSizeGoodAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private int allocated = 0;
+        private int req = 0;
+        private final Random random = new Random();
+        private Throwable cause;
+        private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+
+        public VarSizeGoodAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return allocated;
+        }
+
+        public Throwable cause() {
+            return cause;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    if (random.nextDouble() < RELEASE_PROBABILITY) {
+                        if (!stack.isEmpty()) {
+                            ByteBuffer buffer = stack.pop();
+                            allocated -= (buffer.capacity() / 
DEFAULT_FRAME_SIZE);
+                            fmm.release(buffer);
+                        }
+                    } else {
+                        req = random.nextInt(MAX_SIZE) + 1;
+                        if (req == 1) {
+                            ByteBuffer buffer = fmm.get();
+                            if (buffer != null) {
+                                stack.push(buffer);
+                                allocated += 1;
+                            } else {
+                                break;
+                            }
+                        } else {
+                            ByteBuffer buffer = fmm.get(req * 
DEFAULT_FRAME_SIZE);
+                            if (buffer != null) {
+                                stack.push(buffer);
+                                allocated += req;
+                            } else {
+                                break;
+                            }
+                        }
+                    }
+                }
+            } catch (Throwable th) {
+                this.cause = th;
+            }
+        }
+    }
+
+    private class VarSizeAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private int allocated = 0;
+        private int req = 0;
+        private final Random random = new Random();
+        private Throwable cause;
+
+        public VarSizeAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return allocated;
+        }
+
+        public int getLastReq() {
+            return req;
+        }
+
+        public Throwable cause() {
+            return cause;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    req = random.nextInt(MAX_SIZE) + 1;
+                    if (req == 1) {
+                        if (fmm.get() != null) {
+                            allocated += 1;
+                        } else {
+                            break;
+                        }
+                    } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
+                        allocated += req;
+                    } else {
+                        break;
+                    }
+                }
+            } catch (Throwable th) {
+                this.cause = th;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index d4e3641..cd04515 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -24,8 +24,8 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.ConcurrentFramePool;
-import org.apache.asterix.active.FrameAction;
+import org.apache.asterix.common.memory.ConcurrentFramePool;
+import org.apache.asterix.common.memory.FrameAction;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.util.FeedUtils.Mode;
@@ -73,11 +73,12 @@ public class FeedRuntimeInputHandler extends 
AbstractUnaryInputUnaryOutputOperat
             throws HyracksDataException {
         this.writer = writer;
 
-        this.spiller =
-                fpa.spillToDiskOnCongestion() ? new FrameSpiller(ctx,
+        this.spiller = fpa.spillToDiskOnCongestion()
+                ? new FrameSpiller(ctx,
                         connectionId.getFeedId() + "_" + 
connectionId.getDatasetName() + "_"
                                 + runtimeId.getRuntimeName() + "_" + 
runtimeId.getPartition(),
-                        fpa.getMaxSpillOnDisk()) : null;
+                        fpa.getMaxSpillOnDisk())
+                : null;
         this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
         this.fpa = fpa;
         this.framePool = framePool;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
deleted file mode 100644
index 0f6a2ea..0000000
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
+++ /dev/null
@@ -1,610 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.test;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Random;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.asterix.active.ConcurrentFramePool;
-import org.apache.asterix.active.FrameAction;
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.junit.Assert;
-import org.mockito.Mockito;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-public class ConcurrentFramePoolUnitTest extends TestCase {
-
-    private static final int DEFAULT_FRAME_SIZE = 32768;
-    private static final int NUM_FRAMES = 2048;
-    private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * 
NUM_FRAMES;
-    private static final int NUM_THREADS = 8;
-    private static final int MAX_SIZE = 52;
-    private static final double RELEASE_PROBABILITY = 0.20;
-    private volatile static HyracksDataException cause = null;
-
-    public ConcurrentFramePoolUnitTest(String testName) {
-        super(testName);
-    }
-
-    /**
-     * @return the suite of tests being tested
-     */
-    public static Test suite() {
-        return new TestSuite(ConcurrentFramePoolUnitTest.class);
-    }
-
-    @org.junit.Test
-    public void testMemoryManager() {
-        AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
-        
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-        ConcurrentFramePool fmm =
-                new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-        int i = 0;
-        while (fmm.get() != null) {
-            i++;
-        }
-        Assert.assertEquals(i, NUM_FRAMES);
-        Assert.assertNull(cause);
-    }
-
-    @org.junit.Test
-    public void testConcurrentMemoryManager() {
-        try {
-            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
-            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            FixedSizeAllocator[] runners = new FixedSizeAllocator[NUM_THREADS];
-            Thread[] threads = new Thread[NUM_THREADS];
-            Arrays.parallelSetAll(runners, (int i) -> new 
FixedSizeAllocator(fmm));
-            for (int i = 0; i < threads.length; i++) {
-                threads[i] = new Thread(runners[i]);
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].start();
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].join();
-            }
-            int i = 0;
-            for (FixedSizeAllocator allocator : runners) {
-                i += allocator.getAllocated();
-            }
-            Assert.assertEquals(NUM_FRAMES, i);
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-        Assert.assertNull(cause);
-    }
-
-    @org.junit.Test
-    public void testVarSizeMemoryManager() {
-        try {
-            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
-            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            Random random = new Random();
-            int i = 0;
-            int req;
-            while (true) {
-                req = random.nextInt(MAX_SIZE) + 1;
-                if (req == 1) {
-                    if (fmm.get() != null) {
-                        i += 1;
-                    } else {
-                        break;
-                    }
-                } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
-                    i += req;
-                } else {
-                    break;
-                }
-            }
-
-            Assert.assertEquals(i <= NUM_FRAMES, true);
-            Assert.assertEquals(i + req > NUM_FRAMES, true);
-            Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-        Assert.assertNull(cause);
-    }
-
-    @org.junit.Test
-    public void testConcurrentVarSizeMemoryManager() {
-        try {
-            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
-            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-
-            VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS];
-            Thread[] threads = new Thread[NUM_THREADS];
-            Arrays.parallelSetAll(runners, (int i) -> new 
VarSizeAllocator(fmm));
-            for (int i = 0; i < threads.length; i++) {
-                threads[i] = new Thread(runners[i]);
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].start();
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].join();
-            }
-            int allocated = 0;
-            for (int i = 0; i < threads.length; i++) {
-                if (runners[i].cause() != null) {
-                    runners[i].cause().printStackTrace();
-                    Assert.fail(runners[i].cause().getMessage());
-                }
-                allocated += runners[i].getAllocated();
-            }
-            Assert.assertEquals(allocated <= NUM_FRAMES, true);
-            for (int i = 0; i < threads.length; i++) {
-                Assert.assertEquals(allocated + runners[i].getLastReq() > 
NUM_FRAMES, true);
-            }
-            Assert.assertEquals(allocated + fmm.remaining(), NUM_FRAMES);
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-        Assert.assertNull(cause);
-    }
-
-    @org.junit.Test
-    public void testAcquireReleaseMemoryManager() throws HyracksDataException {
-        AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
-        
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-        ConcurrentFramePool fmm =
-                new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-        Random random = new Random();
-        ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
-        while (true) {
-            if (random.nextDouble() < RELEASE_PROBABILITY) {
-                if (!stack.isEmpty()) {
-                    fmm.release(stack.pop());
-                }
-            } else {
-                ByteBuffer buffer = fmm.get();
-                if (buffer == null) {
-                    break;
-                } else {
-                    stack.push(buffer);
-                }
-            }
-        }
-        Assert.assertEquals(stack.size(), NUM_FRAMES);
-        Assert.assertEquals(fmm.remaining(), 0);
-        for (ByteBuffer buffer : stack) {
-            fmm.release(buffer);
-        }
-        stack.clear();
-        Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
-        Assert.assertNull(cause);
-    }
-
-    @org.junit.Test
-    public void testConcurrentAcquireReleaseMemoryManager() {
-        try {
-            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
-            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            FixedSizeGoodAllocator[] runners = new 
FixedSizeGoodAllocator[NUM_THREADS];
-            Thread[] threads = new Thread[NUM_THREADS];
-            Arrays.parallelSetAll(runners, (int i) -> new 
FixedSizeGoodAllocator(fmm));
-            for (int i = 0; i < threads.length; i++) {
-                threads[i] = new Thread(runners[i]);
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].start();
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].join();
-            }
-            int i = 0;
-            for (FixedSizeGoodAllocator allocator : runners) {
-                i += allocator.getAllocated();
-            }
-            Assert.assertEquals(NUM_FRAMES, i);
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-        Assert.assertNull(cause);
-    }
-
-    @org.junit.Test
-    public void testAcquireReleaseVarSizeMemoryManager() {
-        try {
-            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
-            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            Random random = new Random();
-            ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
-            int i = 0;
-            int req;
-            while (true) {
-                // release
-                if (random.nextDouble() < RELEASE_PROBABILITY) {
-                    if (!stack.isEmpty()) {
-                        ByteBuffer buffer = stack.pop();
-                        i -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
-                        fmm.release(buffer);
-                    }
-                } else {
-                    // acquire
-                    req = random.nextInt(MAX_SIZE) + 1;
-                    if (req == 1) {
-                        ByteBuffer buffer = fmm.get();
-                        if (buffer != null) {
-                            stack.push(buffer);
-                            i += 1;
-                        } else {
-                            break;
-                        }
-                    } else {
-                        ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE);
-                        if (buffer != null) {
-                            stack.push(buffer);
-                            i += req;
-                        } else {
-                            break;
-                        }
-                    }
-                }
-            }
-
-            Assert.assertEquals(i <= NUM_FRAMES, true);
-            Assert.assertEquals(i + req > NUM_FRAMES, true);
-            Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        } finally {
-            Assert.assertNull(cause);
-        }
-    }
-
-    @org.junit.Test
-    public void testConcurrentAcquireReleaseVarSizeMemoryManager() {
-        try {
-            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
-            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            VarSizeGoodAllocator[] runners = new 
VarSizeGoodAllocator[NUM_THREADS];
-            Thread[] threads = new Thread[NUM_THREADS];
-            Arrays.parallelSetAll(runners, (int i) -> new 
VarSizeGoodAllocator(fmm));
-            for (int i = 0; i < threads.length; i++) {
-                threads[i] = new Thread(runners[i]);
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].start();
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].join();
-            }
-            int i = 0;
-            for (VarSizeGoodAllocator allocator : runners) {
-                if (allocator.cause() != null) {
-                    allocator.cause().printStackTrace();
-                    Assert.fail(allocator.cause().getMessage());
-                }
-                i += allocator.getAllocated();
-            }
-            Assert.assertEquals(NUM_FRAMES, i + fmm.remaining());
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        } finally {
-            Assert.assertNull(cause);
-        }
-    }
-
-    @org.junit.Test
-    public void testFixedSizeSubscribtion() {
-        try {
-            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
-            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            int i = 0;
-            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
-            LinkedBlockingDeque<ByteBuffer> buffers = new 
LinkedBlockingDeque<>();
-            FrameAction frameAction = new FrameAction();
-            frameAction.setFrame(buffer);
-            while (!fmm.subscribe(frameAction)) {
-                buffers.put(frameAction.retrieve());
-                i++;
-            }
-            // One subscriber.
-            // Check that all frames have been consumed
-            Assert.assertEquals(i, NUM_FRAMES);
-            // Release a frame (That will be handed out to the subscriber)
-            fmm.release(buffers.take());
-            // Check that all frames have been consumed (since the released 
frame have been handed to the consumer)
-            Assert.assertEquals(0, fmm.remaining());
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        } finally {
-            Assert.assertNull(cause);
-        }
-    }
-
-    @org.junit.Test
-    public void testLargerThanBudgetRequests() {
-        HyracksDataException hde = null;
-        try {
-            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
-            fmm.get(32 * DEFAULT_FRAME_SIZE);
-        } catch (HyracksDataException e) {
-            hde = e;
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-        Assert.assertNotNull(hde);
-        Assert.assertNull(cause);
-    }
-
-    @org.junit.Test
-    public void testLargerThanBudgetSubscribe() {
-        HyracksDataException hde = null;
-        try {
-            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
-            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 32);
-            FrameAction frameAction = new FrameAction();
-            frameAction.setFrame(buffer);
-            fmm.subscribe(frameAction);
-        } catch (HyracksDataException e) {
-            hde = e;
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-        Assert.assertNotNull(hde);
-        Assert.assertNull(cause);
-    }
-
-    @org.junit.Test
-    public void testgetWhileSubscribersExist() {
-        try {
-            AsterixFeedProperties afp = 
Mockito.mock(AsterixFeedProperties.class);
-            
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            int i = 0;
-            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
-            LinkedBlockingDeque<ByteBuffer> buffers = new 
LinkedBlockingDeque<>();
-            FrameAction frameAction = new FrameAction();
-            frameAction.setFrame(buffer);
-            while (!fmm.subscribe(frameAction)) {
-                buffers.put(frameAction.retrieve());
-                i++;
-            }
-            // One subscriber.
-            // Check that all frames have been consumed
-            Assert.assertEquals(i, NUM_FRAMES);
-            // Release a frame (That will be handed out to the subscriber)
-            fmm.release(buffers.take());
-            // Check that all frames have been consumed (since the released 
frame have been handed to the consumer)
-            Assert.assertEquals(fmm.remaining(), 0);
-            buffers.put(frameAction.retrieve());
-            // Create another subscriber that takes frames of double the size
-            ByteBuffer bufferTimes2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 
2);
-            LinkedBlockingDeque<ByteBuffer> buffersTimes2 = new 
LinkedBlockingDeque<>();
-            FrameAction frameActionTimes2 = new FrameAction();
-            frameActionTimes2.setFrame(bufferTimes2);
-            Assert.assertEquals(true, fmm.subscribe(frameActionTimes2));
-            // release a small one
-            fmm.release(buffers.take());
-            Assert.assertEquals(fmm.remaining(), 1);
-            // Check that a small get fails
-            Assert.assertEquals(null, fmm.get());
-            // release another small one
-            fmm.release(buffers.take());
-            // Check that no small frames exists in the pool since subscriber 
request was satisfied
-            Assert.assertEquals(fmm.remaining(), 0);
-            buffersTimes2.add(frameActionTimes2.retrieve());
-            fmm.release(buffers);
-            fmm.release(bufferTimes2);
-            Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        } finally {
-            Assert.assertNull(cause);
-        }
-    }
-
-    /*
-     * Runnables used for unit tests
-     */
-    private class FixedSizeAllocator implements Runnable {
-        private final ConcurrentFramePool fmm;
-        private int allocated = 0;
-
-        public FixedSizeAllocator(ConcurrentFramePool fmm) {
-            this.fmm = fmm;
-        }
-
-        public int getAllocated() {
-            return allocated;
-        }
-
-        @Override
-        public void run() {
-            while (fmm.get() != null) {
-                allocated++;
-            }
-        }
-    }
-
-    private class FixedSizeGoodAllocator implements Runnable {
-        private final ConcurrentFramePool fmm;
-        private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
-        private final Random random = new Random();
-
-        public FixedSizeGoodAllocator(ConcurrentFramePool fmm) {
-            this.fmm = fmm;
-        }
-
-        public int getAllocated() {
-            return stack.size();
-        }
-
-        @Override
-        public void run() {
-            while (true) {
-                if (random.nextDouble() < RELEASE_PROBABILITY) {
-                    if (!stack.isEmpty()) {
-                        try {
-                            fmm.release(stack.pop());
-                        } catch (HyracksDataException e) {
-                            e.printStackTrace();
-                            cause = e;
-                        }
-                    }
-                } else {
-                    ByteBuffer buffer = fmm.get();
-                    if (buffer == null) {
-                        break;
-                    } else {
-                        stack.push(buffer);
-                    }
-                }
-            }
-        }
-    }
-
-    private class VarSizeGoodAllocator implements Runnable {
-        private final ConcurrentFramePool fmm;
-        private int allocated = 0;
-        private int req = 0;
-        private final Random random = new Random();
-        private Throwable cause;
-        private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
-
-        public VarSizeGoodAllocator(ConcurrentFramePool fmm) {
-            this.fmm = fmm;
-        }
-
-        public int getAllocated() {
-            return allocated;
-        }
-
-        public Throwable cause() {
-            return cause;
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    if (random.nextDouble() < RELEASE_PROBABILITY) {
-                        if (!stack.isEmpty()) {
-                            ByteBuffer buffer = stack.pop();
-                            allocated -= (buffer.capacity() / 
DEFAULT_FRAME_SIZE);
-                            fmm.release(buffer);
-                        }
-                    } else {
-                        req = random.nextInt(MAX_SIZE) + 1;
-                        if (req == 1) {
-                            ByteBuffer buffer = fmm.get();
-                            if (buffer != null) {
-                                stack.push(buffer);
-                                allocated += 1;
-                            } else {
-                                break;
-                            }
-                        } else {
-                            ByteBuffer buffer = fmm.get(req * 
DEFAULT_FRAME_SIZE);
-                            if (buffer != null) {
-                                stack.push(buffer);
-                                allocated += req;
-                            } else {
-                                break;
-                            }
-                        }
-                    }
-                }
-            } catch (Throwable th) {
-                this.cause = th;
-            }
-        }
-    }
-
-    private class VarSizeAllocator implements Runnable {
-        private final ConcurrentFramePool fmm;
-        private int allocated = 0;
-        private int req = 0;
-        private final Random random = new Random();
-        private Throwable cause;
-
-        public VarSizeAllocator(ConcurrentFramePool fmm) {
-            this.fmm = fmm;
-        }
-
-        public int getAllocated() {
-            return allocated;
-        }
-
-        public int getLastReq() {
-            return req;
-        }
-
-        public Throwable cause() {
-            return cause;
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    req = random.nextInt(MAX_SIZE) + 1;
-                    if (req == 1) {
-                        if (fmm.get() != null) {
-                            allocated += 1;
-                        } else {
-                            break;
-                        }
-                    } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
-                        allocated += req;
-                    } else {
-                        break;
-                    }
-                }
-            } catch (Throwable th) {
-                this.cause = th;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index e643206..171d271 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -26,8 +26,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.ConcurrentFramePool;
 import org.apache.asterix.active.EntityId;
+import org.apache.asterix.common.memory.ConcurrentFramePool;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -114,11 +114,11 @@ public class InputHandlerTest extends TestCase {
             Random random = new Random();
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // No spill, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, false, NUM_FRAMES * 
DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, 
NUM_FRAMES * DEFAULT_FRAME_SIZE,
+                    DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer =
-                    FrameWriterTestUtils.create(Collections.emptyList(), 
Collections.emptyList(), false);
+            TestFrameWriter writer = 
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
+                    false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
0, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
@@ -156,11 +156,11 @@ public class InputHandlerTest extends TestCase {
             int numRounds = 10;
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // No spill, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, false, NUM_FRAMES * 
DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, 
NUM_FRAMES * DEFAULT_FRAME_SIZE,
+                    DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer =
-                    FrameWriterTestUtils.create(Collections.emptyList(), 
Collections.emptyList(), false);
+            TestFrameWriter writer = 
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
+                    false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
0, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
@@ -205,14 +205,14 @@ public class InputHandlerTest extends TestCase {
             int totalMinFrames = 0;
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // Spill budget = Memory budget, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * 
numberOfSpillFrames, DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, 
DEFAULT_FRAME_SIZE * numberOfSpillFrames,
+                    DISCARD_ALLOWANCE);
             // Non-Active Writer
             TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool =
-                    new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * 
DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
numberOfMemoryFrames * DEFAULT_FRAME_SIZE,
+                    DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
             handler.open();
             ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
@@ -275,8 +275,8 @@ public class InputHandlerTest extends TestCase {
             Assert.assertEquals(0, handler.getNumDiscarded());
             // We can only discard one frame
             double numDiscarded = 0;
-            boolean nextShouldDiscard =
-                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= 
fpa.getMaxFractionDiscard();
+            boolean nextShouldDiscard = ((numDiscarded + 1.0) / 
(handler.getTotal() + 1.0)) <= fpa
+                    .getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(buffer5);
                 numDiscarded++;
@@ -315,14 +315,14 @@ public class InputHandlerTest extends TestCase {
             int numberOfSpillFrames = 50;
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // Spill budget = Memory budget, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * 
numberOfSpillFrames, DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, 
DEFAULT_FRAME_SIZE * numberOfSpillFrames,
+                    DISCARD_ALLOWANCE);
             // Non-Active Writer
             TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool =
-                    new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * 
DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
numberOfMemoryFrames * DEFAULT_FRAME_SIZE,
+                    DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
             handler.open();
             VSizeFrame frame = new VSizeFrame(ctx);
@@ -345,8 +345,8 @@ public class InputHandlerTest extends TestCase {
             Assert.assertEquals(0, handler.getNumDiscarded());
             // We can only discard one frame
             double numDiscarded = 0;
-            boolean nextShouldDiscard =
-                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= 
fpa.getMaxFractionDiscard();
+            boolean nextShouldDiscard = ((numDiscarded + 1.0) / 
(handler.getTotal() + 1.0)) <= fpa
+                    .getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(frame.getBuffer());
                 numDiscarded++;
@@ -394,8 +394,8 @@ public class InputHandlerTest extends TestCase {
             TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool =
-                    new ConcurrentFramePool(NODE_ID, discardTestFrames * 
DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
discardTestFrames * DEFAULT_FRAME_SIZE,
+                    DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
             handler.open();
             // add NUM_FRAMES times
@@ -411,8 +411,8 @@ public class InputHandlerTest extends TestCase {
             }
             // Next call should NOT block but should discard.
             double numDiscarded = 0.0;
-            boolean nextShouldDiscard =
-                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= 
fpa.getMaxFractionDiscard();
+            boolean nextShouldDiscard = ((numDiscarded + 1.0) / 
(handler.getTotal() + 1.0)) <= fpa
+                    .getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(buffer);
                 numDiscarded++;
@@ -456,8 +456,8 @@ public class InputHandlerTest extends TestCase {
             TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool =
-                    new ConcurrentFramePool(NODE_ID, discardTestFrames * 
DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
discardTestFrames * DEFAULT_FRAME_SIZE,
+                    DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
             handler.open();
             VSizeFrame frame = new VSizeFrame(ctx);
@@ -467,8 +467,8 @@ public class InputHandlerTest extends TestCase {
             }
             // Next 5 calls call should NOT block but should discard.
             double numDiscarded = 0.0;
-            boolean nextShouldDiscard =
-                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= 
fpa.getMaxFractionDiscard();
+            boolean nextShouldDiscard = ((numDiscarded + 1.0) / 
(handler.getTotal() + 1.0)) <= fpa
+                    .getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(frame.getBuffer());
                 numDiscarded++;
@@ -507,8 +507,8 @@ public class InputHandlerTest extends TestCase {
         try {
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // Spill budget = Memory budget, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * 
NUM_FRAMES, DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, 
DEFAULT_FRAME_SIZE * NUM_FRAMES,
+                    DISCARD_ALLOWANCE);
             // Non-Active Writer
             TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
@@ -554,8 +554,8 @@ public class InputHandlerTest extends TestCase {
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 
0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer =
-                    FrameWriterTestUtils.create(Collections.emptyList(), 
Collections.emptyList(), false);
+            TestFrameWriter writer = 
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
+                    false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
@@ -595,8 +595,8 @@ public class InputHandlerTest extends TestCase {
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 
0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer =
-                    FrameWriterTestUtils.create(Collections.emptyList(), 
Collections.emptyList(), false);
+            TestFrameWriter writer = 
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
+                    false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
@@ -683,8 +683,8 @@ public class InputHandlerTest extends TestCase {
                 Random random = new Random();
                 IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
                 // Spill budget = Memory budget, No discard
-                FeedPolicyAccessor fpa =
-                        createFeedPolicyAccessor(true, false, 
DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+                FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, 
DEFAULT_FRAME_SIZE * NUM_FRAMES,
+                        DISCARD_ALLOWANCE);
                 // Non-Active Writer
                 TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
                 writer.freeze();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
index bf103fc..e7b15a2 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
@@ -33,6 +33,7 @@ import 
org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.config.AsterixStorageProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
+import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -62,6 +63,7 @@ public class AsterixAppContextInfo implements 
IAsterixApplicationContextInfo, IA
     private AsterixBuildProperties buildProperties;
     private AsterixReplicationProperties replicationProperties;
     private AsterixExtensionProperties extensionProperties;
+    private MessagingProperties messagingProperties;
     private final IGlobalRecoveryMaanger globalRecoveryMaanger;
     private IHyracksClientConnection hcc;
     private final ILibraryManager libraryManager;
@@ -92,10 +94,11 @@ public class AsterixAppContextInfo implements 
IAsterixApplicationContextInfo, IA
         INSTANCE.txnProperties = new 
AsterixTransactionProperties(propertiesAccessor);
         INSTANCE.feedProperties = new 
AsterixFeedProperties(propertiesAccessor);
         INSTANCE.extensionProperties = new 
AsterixExtensionProperties(propertiesAccessor);
-        INSTANCE.replicationProperties =
-                new AsterixReplicationProperties(propertiesAccessor, 
AsterixClusterProperties.INSTANCE.getCluster());
+        INSTANCE.replicationProperties = new 
AsterixReplicationProperties(propertiesAccessor,
+                AsterixClusterProperties.INSTANCE.getCluster());
         INSTANCE.hcc = hcc;
         INSTANCE.buildProperties = new 
AsterixBuildProperties(propertiesAccessor);
+        INSTANCE.messagingProperties = new 
MessagingProperties(propertiesAccessor);
         
Logger.getLogger("org.apache").setLevel(INSTANCE.externalProperties.getLogLevel());
     }
 
@@ -191,4 +194,9 @@ public class AsterixAppContextInfo implements 
IAsterixApplicationContextInfo, IA
     public AsterixExtensionProperties getExtensionProperties() {
         return extensionProperties;
     }
+
+    @Override
+    public MessagingProperties getMessagingProperties() {
+        return messagingProperties;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
index 5b29530..ab1ebe1 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
@@ -57,7 +57,7 @@ public class GlobalResourceIdFactory implements 
IResourceIdFactory, IApplication
             //if no response available or it has an exception, request a new 
one
             if (reponse == null || reponse.getException() != null) {
                 ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
-                ((INCMessageBroker) 
appCtx.getMessageBroker()).sendMessage(msg, this);
+                ((INCMessageBroker) 
appCtx.getMessageBroker()).sendMessageToCC(msg, this);
                 reponse = (ResourceIdRequestResponseMessage) 
resourceIdResponseQ.take();
                 if (reponse.getException() != null) {
                     throw new 
HyracksDataException(reponse.getException().getMessage());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java
index 77934c6..b1aa45f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.api.application;
 
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
 import org.apache.hyracks.api.context.IHyracksRootContext;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.resources.memory.IMemoryManager;
@@ -77,4 +78,19 @@ public interface INCApplicationContext extends 
IApplicationContext {
      * @param handler
      */
     public void setStateDumpHandler(IStateDumpHandler handler);
+
+    /**
+     * Set the application MessagingChannelInterfaceFactory
+     *
+     * @param interfaceFactory
+     */
+    public void setMessagingChannelInterfaceFactory(IChannelInterfaceFactory 
interfaceFactory);
+
+    /**
+     * Get the application MessagingChannelInterfaceFactory previously set by
+     * the {@link 
#setMessagingChannelInterfaceFactory(IChannelInterfaceFactory)} call.
+     *
+     * @return
+     */
+    public IChannelInterfaceFactory getMessagingChannelInterfaceFactory();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
index c41dafe..a79b955 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
@@ -33,12 +33,15 @@ public class NodeControllerInfo implements Serializable {
 
     private final NetworkAddress datasetNetworkAddress;
 
+    private final NetworkAddress messagingNetworkAddress;
+
     public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress 
netAddress,
-            NetworkAddress datasetNetworkAddress) {
+            NetworkAddress datasetNetworkAddress, NetworkAddress 
messagingNetworkAddress) {
         this.nodeId = nodeId;
         this.status = status;
         this.netAddress = netAddress;
         this.datasetNetworkAddress = datasetNetworkAddress;
+        this.messagingNetworkAddress = messagingNetworkAddress;
     }
 
     public String getNodeId() {
@@ -56,4 +59,8 @@ public class NodeControllerInfo implements Serializable {
     public NetworkAddress getDatasetNetworkAddress() {
         return datasetNetworkAddress;
     }
+
+    public NetworkAddress getMessagingNetworkAddress() {
+        return messagingNetworkAddress;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
new file mode 100644
index 0000000..585b6bd
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Accepts buffers.
+ *
+ * @author vinayakb
+ */
+@FunctionalInterface
+public interface IBufferAcceptor {
+    /**
+     * Accept a buffer.
+     *
+     * @param buffer
+     */
+    public void accept(ByteBuffer buffer);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
new file mode 100644
index 0000000..5b3a233
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface IBufferFactory {
+
+    public ByteBuffer createBuffer() throws HyracksDataException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
new file mode 100644
index 0000000..02de858
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+public interface IChannelControlBlock {
+
+    /**
+     * Get the read interface of this channel.
+     *
+     * @return the read interface.
+     */
+    public IChannelReadInterface getReadInterface();
+
+    /**
+     * Get the write interface of this channel.
+     *
+     * @return the write interface.
+     */
+    public IChannelWriteInterface getWriteInterface();
+
+    /**
+     * Add write credit to this channel.
+     *
+     * @param delta
+     *            number of bytes
+     */
+    public void addWriteCredits(int delta);
+
+    /**
+     * @return The channel's unique id within its ChannelSet.
+     */
+    public int getChannelId();
+
+    /**
+     * Add pending credit.
+     *
+     * @param credit
+     */
+    public void addPendingCredits(int credit);
+
+    /**
+     * Increments the pending write operations of this channel.
+     */
+    public void markPendingWrite();
+
+    /**
+     * Clears the pending write operations of this channel.
+     */
+    public void unmarkPendingWrite();
+
+    /**
+     * Sets a flag indicating this channel was closed locally.
+     */
+    public void reportLocalEOS();
+
+    /**
+     * A flag indicating if the channel was closed on the remote side.
+     *
+     * @return
+     */
+    public boolean isRemotelyClosed();
+
+    /**
+     * Complete the current write operation on this channel.
+     */
+    public void writeComplete();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelInterfaceFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelInterfaceFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelInterfaceFactory.java
new file mode 100644
index 0000000..d147fc7
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelInterfaceFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+public interface IChannelInterfaceFactory {
+
+    /**
+     * Creates {@link IChannelReadInterface} and assigns the passed
+     * {@link IChannelControlBlock} to it.
+     *
+     * @param ccb
+     * @return
+     */
+    public IChannelReadInterface createReadInterface(IChannelControlBlock ccb);
+
+    /**
+     * Creates {@link IChannelWriteInterface} and assigns the passed
+     * {@link IChannelControlBlock} to it.
+     *
+     * @param ccb
+     * @return
+     */
+    public IChannelWriteInterface createWriteInterface(IChannelControlBlock 
ccb);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
new file mode 100644
index 0000000..357d761
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hyracks.api.exceptions.NetException;
+
+/**
+ * Represents the read interface of a {@link IChannelControlBlock}.
+ *
+ * @author vinayakb
+ */
+public interface IChannelReadInterface {
+    /**
+     * Set the callback that will be invoked by the network layer when a 
buffer has been
+     * filled with data received from the remote side.
+     *
+     * @param fullBufferAcceptor
+     *            - the full buffer acceptor.
+     */
+    public void setFullBufferAcceptor(ICloseableBufferAcceptor 
fullBufferAcceptor);
+
+    /**
+     * Get the acceptor that collects empty buffers when the client has 
finished consuming
+     * a previously full buffer.
+     *
+     * @return the empty buffer acceptor.
+     */
+    public IBufferAcceptor getEmptyBufferAcceptor();
+
+    /**
+     * Set the buffer factory which is in charge of creating buffers if the 
request does not
+     * make the number of allocated buffers goes beyond limit
+     *
+     * @param bufferFactory
+     *            - the buffer factory
+     * @param limit
+     *            - the limit of buffers
+     * @param frameSize
+     *            - the size of each buffer
+     */
+    public void setBufferFactory(IBufferFactory bufferFactory, int limit, int 
frameSize);
+
+    /**
+     * Try to read as much as {@code size} bytes from {@code sc}
+     *
+     * @param sc
+     * @param size
+     * @return The number of read bytes.
+     * @throws IOException
+     * @throws NetException
+     */
+    public int read(SocketChannel sc, int size) throws IOException, 
NetException;
+
+    /**
+     * Sets the read credits of this {@link IChannelReadInterface}
+     *
+     * @param credits
+     */
+    public void setReadCredits(int credits);
+
+    /**
+     * @return The current read credits of this {@link IChannelReadInterface}
+     */
+    public int getCredits();
+
+    /**
+     * Forces the current read buffer to be flushed
+     */
+    public void flush();
+
+    /**
+     * @return The current full buffer acceptor of this {@link 
IChannelReadInterface}
+     */
+    public ICloseableBufferAcceptor getFullBufferAcceptor();
+
+    /**
+     * @return The buffer factory used by this {@link IChannelReadInterface}
+     */
+    public IBufferFactory getBufferFactory();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
new file mode 100644
index 0000000..993dd2c
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+import org.apache.hyracks.api.exceptions.NetException;
+
+/**
+ * Represents the write interface of a {@link IChannelControlBlock}.
+ */
+public interface IChannelWriteInterface {
+    /**
+     * Set the callback interface that must be invoked when a full buffer has 
been emptied by
+     * writing the data to the remote end.
+     *
+     * @param emptyBufferAcceptor
+     *            - the empty buffer acceptor.
+     */
+    public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor);
+
+    /**
+     * Get the full buffer acceptor that accepts buffers filled with data that 
need to be written
+     * to the remote end.
+     *
+     * @return the full buffer acceptor.
+     */
+    public ICloseableBufferAcceptor getFullBufferAcceptor();
+
+    /**
+     * Set the buffer factory which is in charge of creating buffers if the 
request does not
+     * make the number of allocated buffers goes beyond limit
+     *
+     * @param bufferFactory
+     *            - the buffer factory
+     * @param limit
+     *            - the limit of buffers
+     * @param frameSize
+     *            - the size of each buffer
+     */
+    public void setBufferFactory(IBufferFactory bufferFactory, int limit, int 
frameSize);
+
+    /**
+     * Performs a pending write operation based on the current state of
+     * this {@link IChannelWriteInterface}
+     *
+     * @param writerState
+     * @throws NetException
+     */
+    public void write(IConnectionWriterState writerState) throws NetException;
+
+    /**
+     * Completes the current write operation on this {@link 
IChannelWriteInterface}
+     */
+    public void writeComplete();
+
+    /**
+     * Add credits to this this {@link IChannelWriteInterface}
+     *
+     * @param credit
+     */
+    public void addCredits(int credit);
+
+    /**
+     * @return The current credits of this {@link IChannelWriteInterface}
+     */
+    public int getCredits();
+
+    /**
+     * Adjusts the {@link IChannelControlBlock} writability based on the 
current
+     * state of this {@link IChannelWriteInterface}
+     */
+    public void adjustChannelWritability();
+}

Reply via email to