http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java new file mode 100644 index 0000000..10c6b24 --- /dev/null +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java @@ -0,0 +1,610 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.memory; + +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.LinkedBlockingDeque; + +import org.apache.asterix.common.config.AsterixFeedProperties; +import org.apache.asterix.common.memory.ConcurrentFramePool; +import org.apache.asterix.common.memory.FrameAction; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.junit.Assert; +import org.mockito.Mockito; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +public class ConcurrentFramePoolUnitTest extends TestCase { + + private static final int DEFAULT_FRAME_SIZE = 32768; + private static final int NUM_FRAMES = 2048; + private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES; + private static final int NUM_THREADS = 8; + private static final int MAX_SIZE = 52; + private static final double RELEASE_PROBABILITY = 0.20; + private volatile static HyracksDataException cause = null; + + public ConcurrentFramePoolUnitTest(String testName) { + super(testName); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() { + return new TestSuite(ConcurrentFramePoolUnitTest.class); + } + + @org.junit.Test + public void testMemoryManager() { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), + DEFAULT_FRAME_SIZE); + int i = 0; + while (fmm.get() != null) { + i++; + } + Assert.assertEquals(i, NUM_FRAMES); + Assert.assertNull(cause); + } + + @org.junit.Test + public void testConcurrentMemoryManager() { + try { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), + DEFAULT_FRAME_SIZE); + FixedSizeAllocator[] runners = new FixedSizeAllocator[NUM_THREADS]; + Thread[] threads = new Thread[NUM_THREADS]; + Arrays.parallelSetAll(runners, (int i) -> new FixedSizeAllocator(fmm)); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(runners[i]); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + int i = 0; + for (FixedSizeAllocator allocator : runners) { + i += allocator.getAllocated(); + } + Assert.assertEquals(NUM_FRAMES, i); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } + Assert.assertNull(cause); + } + + @org.junit.Test + public void testVarSizeMemoryManager() { + try { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), + DEFAULT_FRAME_SIZE); + Random random = new Random(); + int i = 0; + int req; + while (true) { + req = random.nextInt(MAX_SIZE) + 1; + if (req == 1) { + if (fmm.get() != null) { + i += 1; + } else { + break; + } + } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) { + i += req; + } else { + break; + } + } + + Assert.assertEquals(i <= NUM_FRAMES, true); + Assert.assertEquals(i + req > NUM_FRAMES, true); + Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } + Assert.assertNull(cause); + } + + @org.junit.Test + public void testConcurrentVarSizeMemoryManager() { + try { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), + DEFAULT_FRAME_SIZE); + + VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS]; + Thread[] threads = new Thread[NUM_THREADS]; + Arrays.parallelSetAll(runners, (int i) -> new VarSizeAllocator(fmm)); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(runners[i]); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + int allocated = 0; + for (int i = 0; i < threads.length; i++) { + if (runners[i].cause() != null) { + runners[i].cause().printStackTrace(); + Assert.fail(runners[i].cause().getMessage()); + } + allocated += runners[i].getAllocated(); + } + Assert.assertEquals(allocated <= NUM_FRAMES, true); + for (int i = 0; i < threads.length; i++) { + Assert.assertEquals(allocated + runners[i].getLastReq() > NUM_FRAMES, true); + } + Assert.assertEquals(allocated + fmm.remaining(), NUM_FRAMES); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } + Assert.assertNull(cause); + } + + @org.junit.Test + public void testAcquireReleaseMemoryManager() throws HyracksDataException { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), + DEFAULT_FRAME_SIZE); + Random random = new Random(); + ArrayDeque<ByteBuffer> stack = new ArrayDeque<>(); + while (true) { + if (random.nextDouble() < RELEASE_PROBABILITY) { + if (!stack.isEmpty()) { + fmm.release(stack.pop()); + } + } else { + ByteBuffer buffer = fmm.get(); + if (buffer == null) { + break; + } else { + stack.push(buffer); + } + } + } + Assert.assertEquals(stack.size(), NUM_FRAMES); + Assert.assertEquals(fmm.remaining(), 0); + for (ByteBuffer buffer : stack) { + fmm.release(buffer); + } + stack.clear(); + Assert.assertEquals(fmm.remaining(), NUM_FRAMES); + Assert.assertNull(cause); + } + + @org.junit.Test + public void testConcurrentAcquireReleaseMemoryManager() { + try { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), + DEFAULT_FRAME_SIZE); + FixedSizeGoodAllocator[] runners = new FixedSizeGoodAllocator[NUM_THREADS]; + Thread[] threads = new Thread[NUM_THREADS]; + Arrays.parallelSetAll(runners, (int i) -> new FixedSizeGoodAllocator(fmm)); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(runners[i]); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + int i = 0; + for (FixedSizeGoodAllocator allocator : runners) { + i += allocator.getAllocated(); + } + Assert.assertEquals(NUM_FRAMES, i); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } + Assert.assertNull(cause); + } + + @org.junit.Test + public void testAcquireReleaseVarSizeMemoryManager() { + try { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), + DEFAULT_FRAME_SIZE); + Random random = new Random(); + ArrayDeque<ByteBuffer> stack = new ArrayDeque<>(); + int i = 0; + int req; + while (true) { + // release + if (random.nextDouble() < RELEASE_PROBABILITY) { + if (!stack.isEmpty()) { + ByteBuffer buffer = stack.pop(); + i -= (buffer.capacity() / DEFAULT_FRAME_SIZE); + fmm.release(buffer); + } + } else { + // acquire + req = random.nextInt(MAX_SIZE) + 1; + if (req == 1) { + ByteBuffer buffer = fmm.get(); + if (buffer != null) { + stack.push(buffer); + i += 1; + } else { + break; + } + } else { + ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE); + if (buffer != null) { + stack.push(buffer); + i += req; + } else { + break; + } + } + } + } + + Assert.assertEquals(i <= NUM_FRAMES, true); + Assert.assertEquals(i + req > NUM_FRAMES, true); + Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } finally { + Assert.assertNull(cause); + } + } + + @org.junit.Test + public void testConcurrentAcquireReleaseVarSizeMemoryManager() { + try { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), + DEFAULT_FRAME_SIZE); + VarSizeGoodAllocator[] runners = new VarSizeGoodAllocator[NUM_THREADS]; + Thread[] threads = new Thread[NUM_THREADS]; + Arrays.parallelSetAll(runners, (int i) -> new VarSizeGoodAllocator(fmm)); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(runners[i]); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + int i = 0; + for (VarSizeGoodAllocator allocator : runners) { + if (allocator.cause() != null) { + allocator.cause().printStackTrace(); + Assert.fail(allocator.cause().getMessage()); + } + i += allocator.getAllocated(); + } + Assert.assertEquals(NUM_FRAMES, i + fmm.remaining()); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } finally { + Assert.assertNull(cause); + } + } + + @org.junit.Test + public void testFixedSizeSubscribtion() { + try { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), + DEFAULT_FRAME_SIZE); + int i = 0; + ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); + LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>(); + FrameAction frameAction = new FrameAction(); + frameAction.setFrame(buffer); + while (!fmm.subscribe(frameAction)) { + buffers.put(frameAction.retrieve()); + i++; + } + // One subscriber. + // Check that all frames have been consumed + Assert.assertEquals(i, NUM_FRAMES); + // Release a frame (That will be handed out to the subscriber) + fmm.release(buffers.take()); + // Check that all frames have been consumed (since the released frame have been handed to the consumer) + Assert.assertEquals(0, fmm.remaining()); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } finally { + Assert.assertNull(cause); + } + } + + @org.junit.Test + public void testLargerThanBudgetRequests() { + HyracksDataException hde = null; + try { + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE); + fmm.get(32 * DEFAULT_FRAME_SIZE); + } catch (HyracksDataException e) { + hde = e; + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } + Assert.assertNotNull(hde); + Assert.assertNull(cause); + } + + @org.junit.Test + public void testLargerThanBudgetSubscribe() { + HyracksDataException hde = null; + try { + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE); + ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 32); + FrameAction frameAction = new FrameAction(); + frameAction.setFrame(buffer); + fmm.subscribe(frameAction); + } catch (HyracksDataException e) { + hde = e; + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } + Assert.assertNotNull(hde); + Assert.assertNull(cause); + } + + @org.junit.Test + public void testgetWhileSubscribersExist() { + try { + AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); + Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); + ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), + DEFAULT_FRAME_SIZE); + int i = 0; + ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); + LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>(); + FrameAction frameAction = new FrameAction(); + frameAction.setFrame(buffer); + while (!fmm.subscribe(frameAction)) { + buffers.put(frameAction.retrieve()); + i++; + } + // One subscriber. + // Check that all frames have been consumed + Assert.assertEquals(i, NUM_FRAMES); + // Release a frame (That will be handed out to the subscriber) + fmm.release(buffers.take()); + // Check that all frames have been consumed (since the released frame have been handed to the consumer) + Assert.assertEquals(fmm.remaining(), 0); + buffers.put(frameAction.retrieve()); + // Create another subscriber that takes frames of double the size + ByteBuffer bufferTimes2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2); + LinkedBlockingDeque<ByteBuffer> buffersTimes2 = new LinkedBlockingDeque<>(); + FrameAction frameActionTimes2 = new FrameAction(); + frameActionTimes2.setFrame(bufferTimes2); + Assert.assertEquals(true, fmm.subscribe(frameActionTimes2)); + // release a small one + fmm.release(buffers.take()); + Assert.assertEquals(fmm.remaining(), 1); + // Check that a small get fails + Assert.assertEquals(null, fmm.get()); + // release another small one + fmm.release(buffers.take()); + // Check that no small frames exists in the pool since subscriber request was satisfied + Assert.assertEquals(fmm.remaining(), 0); + buffersTimes2.add(frameActionTimes2.retrieve()); + fmm.release(buffers); + fmm.release(bufferTimes2); + Assert.assertEquals(fmm.remaining(), NUM_FRAMES); + } catch (Throwable th) { + th.printStackTrace(); + Assert.fail(th.getMessage()); + } finally { + Assert.assertNull(cause); + } + } + + /* + * Runnables used for unit tests + */ + private class FixedSizeAllocator implements Runnable { + private final ConcurrentFramePool fmm; + private int allocated = 0; + + public FixedSizeAllocator(ConcurrentFramePool fmm) { + this.fmm = fmm; + } + + public int getAllocated() { + return allocated; + } + + @Override + public void run() { + while (fmm.get() != null) { + allocated++; + } + } + } + + private class FixedSizeGoodAllocator implements Runnable { + private final ConcurrentFramePool fmm; + private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>(); + private final Random random = new Random(); + + public FixedSizeGoodAllocator(ConcurrentFramePool fmm) { + this.fmm = fmm; + } + + public int getAllocated() { + return stack.size(); + } + + @Override + public void run() { + while (true) { + if (random.nextDouble() < RELEASE_PROBABILITY) { + if (!stack.isEmpty()) { + try { + fmm.release(stack.pop()); + } catch (HyracksDataException e) { + e.printStackTrace(); + cause = e; + } + } + } else { + ByteBuffer buffer = fmm.get(); + if (buffer == null) { + break; + } else { + stack.push(buffer); + } + } + } + } + } + + private class VarSizeGoodAllocator implements Runnable { + private final ConcurrentFramePool fmm; + private int allocated = 0; + private int req = 0; + private final Random random = new Random(); + private Throwable cause; + private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>(); + + public VarSizeGoodAllocator(ConcurrentFramePool fmm) { + this.fmm = fmm; + } + + public int getAllocated() { + return allocated; + } + + public Throwable cause() { + return cause; + } + + @Override + public void run() { + try { + while (true) { + if (random.nextDouble() < RELEASE_PROBABILITY) { + if (!stack.isEmpty()) { + ByteBuffer buffer = stack.pop(); + allocated -= (buffer.capacity() / DEFAULT_FRAME_SIZE); + fmm.release(buffer); + } + } else { + req = random.nextInt(MAX_SIZE) + 1; + if (req == 1) { + ByteBuffer buffer = fmm.get(); + if (buffer != null) { + stack.push(buffer); + allocated += 1; + } else { + break; + } + } else { + ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE); + if (buffer != null) { + stack.push(buffer); + allocated += req; + } else { + break; + } + } + } + } + } catch (Throwable th) { + this.cause = th; + } + } + } + + private class VarSizeAllocator implements Runnable { + private final ConcurrentFramePool fmm; + private int allocated = 0; + private int req = 0; + private final Random random = new Random(); + private Throwable cause; + + public VarSizeAllocator(ConcurrentFramePool fmm) { + this.fmm = fmm; + } + + public int getAllocated() { + return allocated; + } + + public int getLastReq() { + return req; + } + + public Throwable cause() { + return cause; + } + + @Override + public void run() { + try { + while (true) { + req = random.nextInt(MAX_SIZE) + 1; + if (req == 1) { + if (fmm.get() != null) { + allocated += 1; + } else { + break; + } + } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) { + allocated += req; + } else { + break; + } + } + } catch (Throwable th) { + this.cause = th; + } + } + } +}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java index d4e3641..cd04515 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java @@ -24,8 +24,8 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.active.ActiveRuntimeId; -import org.apache.asterix.active.ConcurrentFramePool; -import org.apache.asterix.active.FrameAction; +import org.apache.asterix.common.memory.ConcurrentFramePool; +import org.apache.asterix.common.memory.FrameAction; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.util.FeedUtils.Mode; @@ -73,11 +73,12 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat throws HyracksDataException { this.writer = writer; - this.spiller = - fpa.spillToDiskOnCongestion() ? new FrameSpiller(ctx, + this.spiller = fpa.spillToDiskOnCongestion() + ? new FrameSpiller(ctx, connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_" + runtimeId.getRuntimeName() + "_" + runtimeId.getPartition(), - fpa.getMaxSpillOnDisk()) : null; + fpa.getMaxSpillOnDisk()) + : null; this.exceptionHandler = new FeedExceptionHandler(ctx, fta); this.fpa = fpa; this.framePool = framePool; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java deleted file mode 100644 index 0f6a2ea..0000000 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java +++ /dev/null @@ -1,610 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.test; - -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Random; -import java.util.concurrent.LinkedBlockingDeque; - -import org.apache.asterix.active.ConcurrentFramePool; -import org.apache.asterix.active.FrameAction; -import org.apache.asterix.common.config.AsterixFeedProperties; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.junit.Assert; -import org.mockito.Mockito; - -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -public class ConcurrentFramePoolUnitTest extends TestCase { - - private static final int DEFAULT_FRAME_SIZE = 32768; - private static final int NUM_FRAMES = 2048; - private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES; - private static final int NUM_THREADS = 8; - private static final int MAX_SIZE = 52; - private static final double RELEASE_PROBABILITY = 0.20; - private volatile static HyracksDataException cause = null; - - public ConcurrentFramePoolUnitTest(String testName) { - super(testName); - } - - /** - * @return the suite of tests being tested - */ - public static Test suite() { - return new TestSuite(ConcurrentFramePoolUnitTest.class); - } - - @org.junit.Test - public void testMemoryManager() { - AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); - Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); - ConcurrentFramePool fmm = - new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); - int i = 0; - while (fmm.get() != null) { - i++; - } - Assert.assertEquals(i, NUM_FRAMES); - Assert.assertNull(cause); - } - - @org.junit.Test - public void testConcurrentMemoryManager() { - try { - AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); - Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); - ConcurrentFramePool fmm = - new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); - FixedSizeAllocator[] runners = new FixedSizeAllocator[NUM_THREADS]; - Thread[] threads = new Thread[NUM_THREADS]; - Arrays.parallelSetAll(runners, (int i) -> new FixedSizeAllocator(fmm)); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread(runners[i]); - } - for (int i = 0; i < threads.length; i++) { - threads[i].start(); - } - for (int i = 0; i < threads.length; i++) { - threads[i].join(); - } - int i = 0; - for (FixedSizeAllocator allocator : runners) { - i += allocator.getAllocated(); - } - Assert.assertEquals(NUM_FRAMES, i); - } catch (Throwable th) { - th.printStackTrace(); - Assert.fail(th.getMessage()); - } - Assert.assertNull(cause); - } - - @org.junit.Test - public void testVarSizeMemoryManager() { - try { - AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); - Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); - ConcurrentFramePool fmm = - new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); - Random random = new Random(); - int i = 0; - int req; - while (true) { - req = random.nextInt(MAX_SIZE) + 1; - if (req == 1) { - if (fmm.get() != null) { - i += 1; - } else { - break; - } - } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) { - i += req; - } else { - break; - } - } - - Assert.assertEquals(i <= NUM_FRAMES, true); - Assert.assertEquals(i + req > NUM_FRAMES, true); - Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES); - } catch (Throwable th) { - th.printStackTrace(); - Assert.fail(th.getMessage()); - } - Assert.assertNull(cause); - } - - @org.junit.Test - public void testConcurrentVarSizeMemoryManager() { - try { - AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); - Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); - ConcurrentFramePool fmm = - new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); - - VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS]; - Thread[] threads = new Thread[NUM_THREADS]; - Arrays.parallelSetAll(runners, (int i) -> new VarSizeAllocator(fmm)); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread(runners[i]); - } - for (int i = 0; i < threads.length; i++) { - threads[i].start(); - } - for (int i = 0; i < threads.length; i++) { - threads[i].join(); - } - int allocated = 0; - for (int i = 0; i < threads.length; i++) { - if (runners[i].cause() != null) { - runners[i].cause().printStackTrace(); - Assert.fail(runners[i].cause().getMessage()); - } - allocated += runners[i].getAllocated(); - } - Assert.assertEquals(allocated <= NUM_FRAMES, true); - for (int i = 0; i < threads.length; i++) { - Assert.assertEquals(allocated + runners[i].getLastReq() > NUM_FRAMES, true); - } - Assert.assertEquals(allocated + fmm.remaining(), NUM_FRAMES); - } catch (Throwable th) { - th.printStackTrace(); - Assert.fail(th.getMessage()); - } - Assert.assertNull(cause); - } - - @org.junit.Test - public void testAcquireReleaseMemoryManager() throws HyracksDataException { - AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); - Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); - ConcurrentFramePool fmm = - new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); - Random random = new Random(); - ArrayDeque<ByteBuffer> stack = new ArrayDeque<>(); - while (true) { - if (random.nextDouble() < RELEASE_PROBABILITY) { - if (!stack.isEmpty()) { - fmm.release(stack.pop()); - } - } else { - ByteBuffer buffer = fmm.get(); - if (buffer == null) { - break; - } else { - stack.push(buffer); - } - } - } - Assert.assertEquals(stack.size(), NUM_FRAMES); - Assert.assertEquals(fmm.remaining(), 0); - for (ByteBuffer buffer : stack) { - fmm.release(buffer); - } - stack.clear(); - Assert.assertEquals(fmm.remaining(), NUM_FRAMES); - Assert.assertNull(cause); - } - - @org.junit.Test - public void testConcurrentAcquireReleaseMemoryManager() { - try { - AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); - Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); - ConcurrentFramePool fmm = - new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); - FixedSizeGoodAllocator[] runners = new FixedSizeGoodAllocator[NUM_THREADS]; - Thread[] threads = new Thread[NUM_THREADS]; - Arrays.parallelSetAll(runners, (int i) -> new FixedSizeGoodAllocator(fmm)); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread(runners[i]); - } - for (int i = 0; i < threads.length; i++) { - threads[i].start(); - } - for (int i = 0; i < threads.length; i++) { - threads[i].join(); - } - int i = 0; - for (FixedSizeGoodAllocator allocator : runners) { - i += allocator.getAllocated(); - } - Assert.assertEquals(NUM_FRAMES, i); - } catch (Throwable th) { - th.printStackTrace(); - Assert.fail(th.getMessage()); - } - Assert.assertNull(cause); - } - - @org.junit.Test - public void testAcquireReleaseVarSizeMemoryManager() { - try { - AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); - Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); - ConcurrentFramePool fmm = - new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); - Random random = new Random(); - ArrayDeque<ByteBuffer> stack = new ArrayDeque<>(); - int i = 0; - int req; - while (true) { - // release - if (random.nextDouble() < RELEASE_PROBABILITY) { - if (!stack.isEmpty()) { - ByteBuffer buffer = stack.pop(); - i -= (buffer.capacity() / DEFAULT_FRAME_SIZE); - fmm.release(buffer); - } - } else { - // acquire - req = random.nextInt(MAX_SIZE) + 1; - if (req == 1) { - ByteBuffer buffer = fmm.get(); - if (buffer != null) { - stack.push(buffer); - i += 1; - } else { - break; - } - } else { - ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE); - if (buffer != null) { - stack.push(buffer); - i += req; - } else { - break; - } - } - } - } - - Assert.assertEquals(i <= NUM_FRAMES, true); - Assert.assertEquals(i + req > NUM_FRAMES, true); - Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES); - } catch (Throwable th) { - th.printStackTrace(); - Assert.fail(th.getMessage()); - } finally { - Assert.assertNull(cause); - } - } - - @org.junit.Test - public void testConcurrentAcquireReleaseVarSizeMemoryManager() { - try { - AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); - Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); - ConcurrentFramePool fmm = - new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); - VarSizeGoodAllocator[] runners = new VarSizeGoodAllocator[NUM_THREADS]; - Thread[] threads = new Thread[NUM_THREADS]; - Arrays.parallelSetAll(runners, (int i) -> new VarSizeGoodAllocator(fmm)); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread(runners[i]); - } - for (int i = 0; i < threads.length; i++) { - threads[i].start(); - } - for (int i = 0; i < threads.length; i++) { - threads[i].join(); - } - int i = 0; - for (VarSizeGoodAllocator allocator : runners) { - if (allocator.cause() != null) { - allocator.cause().printStackTrace(); - Assert.fail(allocator.cause().getMessage()); - } - i += allocator.getAllocated(); - } - Assert.assertEquals(NUM_FRAMES, i + fmm.remaining()); - } catch (Throwable th) { - th.printStackTrace(); - Assert.fail(th.getMessage()); - } finally { - Assert.assertNull(cause); - } - } - - @org.junit.Test - public void testFixedSizeSubscribtion() { - try { - AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); - Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); - ConcurrentFramePool fmm = - new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); - int i = 0; - ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); - LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>(); - FrameAction frameAction = new FrameAction(); - frameAction.setFrame(buffer); - while (!fmm.subscribe(frameAction)) { - buffers.put(frameAction.retrieve()); - i++; - } - // One subscriber. - // Check that all frames have been consumed - Assert.assertEquals(i, NUM_FRAMES); - // Release a frame (That will be handed out to the subscriber) - fmm.release(buffers.take()); - // Check that all frames have been consumed (since the released frame have been handed to the consumer) - Assert.assertEquals(0, fmm.remaining()); - } catch (Throwable th) { - th.printStackTrace(); - Assert.fail(th.getMessage()); - } finally { - Assert.assertNull(cause); - } - } - - @org.junit.Test - public void testLargerThanBudgetRequests() { - HyracksDataException hde = null; - try { - ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE); - fmm.get(32 * DEFAULT_FRAME_SIZE); - } catch (HyracksDataException e) { - hde = e; - } catch (Throwable th) { - th.printStackTrace(); - Assert.fail(th.getMessage()); - } - Assert.assertNotNull(hde); - Assert.assertNull(cause); - } - - @org.junit.Test - public void testLargerThanBudgetSubscribe() { - HyracksDataException hde = null; - try { - ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE); - ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 32); - FrameAction frameAction = new FrameAction(); - frameAction.setFrame(buffer); - fmm.subscribe(frameAction); - } catch (HyracksDataException e) { - hde = e; - } catch (Throwable th) { - th.printStackTrace(); - Assert.fail(th.getMessage()); - } - Assert.assertNotNull(hde); - Assert.assertNull(cause); - } - - @org.junit.Test - public void testgetWhileSubscribersExist() { - try { - AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class); - Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET); - ConcurrentFramePool fmm = - new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE); - int i = 0; - ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); - LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>(); - FrameAction frameAction = new FrameAction(); - frameAction.setFrame(buffer); - while (!fmm.subscribe(frameAction)) { - buffers.put(frameAction.retrieve()); - i++; - } - // One subscriber. - // Check that all frames have been consumed - Assert.assertEquals(i, NUM_FRAMES); - // Release a frame (That will be handed out to the subscriber) - fmm.release(buffers.take()); - // Check that all frames have been consumed (since the released frame have been handed to the consumer) - Assert.assertEquals(fmm.remaining(), 0); - buffers.put(frameAction.retrieve()); - // Create another subscriber that takes frames of double the size - ByteBuffer bufferTimes2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2); - LinkedBlockingDeque<ByteBuffer> buffersTimes2 = new LinkedBlockingDeque<>(); - FrameAction frameActionTimes2 = new FrameAction(); - frameActionTimes2.setFrame(bufferTimes2); - Assert.assertEquals(true, fmm.subscribe(frameActionTimes2)); - // release a small one - fmm.release(buffers.take()); - Assert.assertEquals(fmm.remaining(), 1); - // Check that a small get fails - Assert.assertEquals(null, fmm.get()); - // release another small one - fmm.release(buffers.take()); - // Check that no small frames exists in the pool since subscriber request was satisfied - Assert.assertEquals(fmm.remaining(), 0); - buffersTimes2.add(frameActionTimes2.retrieve()); - fmm.release(buffers); - fmm.release(bufferTimes2); - Assert.assertEquals(fmm.remaining(), NUM_FRAMES); - } catch (Throwable th) { - th.printStackTrace(); - Assert.fail(th.getMessage()); - } finally { - Assert.assertNull(cause); - } - } - - /* - * Runnables used for unit tests - */ - private class FixedSizeAllocator implements Runnable { - private final ConcurrentFramePool fmm; - private int allocated = 0; - - public FixedSizeAllocator(ConcurrentFramePool fmm) { - this.fmm = fmm; - } - - public int getAllocated() { - return allocated; - } - - @Override - public void run() { - while (fmm.get() != null) { - allocated++; - } - } - } - - private class FixedSizeGoodAllocator implements Runnable { - private final ConcurrentFramePool fmm; - private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>(); - private final Random random = new Random(); - - public FixedSizeGoodAllocator(ConcurrentFramePool fmm) { - this.fmm = fmm; - } - - public int getAllocated() { - return stack.size(); - } - - @Override - public void run() { - while (true) { - if (random.nextDouble() < RELEASE_PROBABILITY) { - if (!stack.isEmpty()) { - try { - fmm.release(stack.pop()); - } catch (HyracksDataException e) { - e.printStackTrace(); - cause = e; - } - } - } else { - ByteBuffer buffer = fmm.get(); - if (buffer == null) { - break; - } else { - stack.push(buffer); - } - } - } - } - } - - private class VarSizeGoodAllocator implements Runnable { - private final ConcurrentFramePool fmm; - private int allocated = 0; - private int req = 0; - private final Random random = new Random(); - private Throwable cause; - private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>(); - - public VarSizeGoodAllocator(ConcurrentFramePool fmm) { - this.fmm = fmm; - } - - public int getAllocated() { - return allocated; - } - - public Throwable cause() { - return cause; - } - - @Override - public void run() { - try { - while (true) { - if (random.nextDouble() < RELEASE_PROBABILITY) { - if (!stack.isEmpty()) { - ByteBuffer buffer = stack.pop(); - allocated -= (buffer.capacity() / DEFAULT_FRAME_SIZE); - fmm.release(buffer); - } - } else { - req = random.nextInt(MAX_SIZE) + 1; - if (req == 1) { - ByteBuffer buffer = fmm.get(); - if (buffer != null) { - stack.push(buffer); - allocated += 1; - } else { - break; - } - } else { - ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE); - if (buffer != null) { - stack.push(buffer); - allocated += req; - } else { - break; - } - } - } - } - } catch (Throwable th) { - this.cause = th; - } - } - } - - private class VarSizeAllocator implements Runnable { - private final ConcurrentFramePool fmm; - private int allocated = 0; - private int req = 0; - private final Random random = new Random(); - private Throwable cause; - - public VarSizeAllocator(ConcurrentFramePool fmm) { - this.fmm = fmm; - } - - public int getAllocated() { - return allocated; - } - - public int getLastReq() { - return req; - } - - public Throwable cause() { - return cause; - } - - @Override - public void run() { - try { - while (true) { - req = random.nextInt(MAX_SIZE) + 1; - if (req == 1) { - if (fmm.get() != null) { - allocated += 1; - } else { - break; - } - } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) { - allocated += req; - } else { - break; - } - } - } catch (Throwable th) { - this.cause = th; - } - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java index e643206..171d271 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java @@ -26,8 +26,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.asterix.active.ActiveRuntimeId; -import org.apache.asterix.active.ConcurrentFramePool; import org.apache.asterix.active.EntityId; +import org.apache.asterix.common.memory.ConcurrentFramePool; import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; @@ -114,11 +114,11 @@ public class InputHandlerTest extends TestCase { Random random = new Random(); IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // No spill, No discard - FeedPolicyAccessor fpa = - createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, + DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = - FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); + TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), + false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); @@ -156,11 +156,11 @@ public class InputHandlerTest extends TestCase { int numRounds = 10; IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // No spill, No discard - FeedPolicyAccessor fpa = - createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, + DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = - FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); + TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), + false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); @@ -205,14 +205,14 @@ public class InputHandlerTest extends TestCase { int totalMinFrames = 0; IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // Spill budget = Memory budget, No discard - FeedPolicyAccessor fpa = - createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, + DISCARD_ALLOWANCE); // Non-Active Writer TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool - ConcurrentFramePool framePool = - new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, + DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); handler.open(); ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); @@ -275,8 +275,8 @@ public class InputHandlerTest extends TestCase { Assert.assertEquals(0, handler.getNumDiscarded()); // We can only discard one frame double numDiscarded = 0; - boolean nextShouldDiscard = - ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa + .getMaxFractionDiscard(); while (nextShouldDiscard) { handler.nextFrame(buffer5); numDiscarded++; @@ -315,14 +315,14 @@ public class InputHandlerTest extends TestCase { int numberOfSpillFrames = 50; IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // Spill budget = Memory budget, No discard - FeedPolicyAccessor fpa = - createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, + DISCARD_ALLOWANCE); // Non-Active Writer TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool - ConcurrentFramePool framePool = - new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, + DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); handler.open(); VSizeFrame frame = new VSizeFrame(ctx); @@ -345,8 +345,8 @@ public class InputHandlerTest extends TestCase { Assert.assertEquals(0, handler.getNumDiscarded()); // We can only discard one frame double numDiscarded = 0; - boolean nextShouldDiscard = - ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa + .getMaxFractionDiscard(); while (nextShouldDiscard) { handler.nextFrame(frame.getBuffer()); numDiscarded++; @@ -394,8 +394,8 @@ public class InputHandlerTest extends TestCase { TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool - ConcurrentFramePool framePool = - new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, + DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); handler.open(); // add NUM_FRAMES times @@ -411,8 +411,8 @@ public class InputHandlerTest extends TestCase { } // Next call should NOT block but should discard. double numDiscarded = 0.0; - boolean nextShouldDiscard = - ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa + .getMaxFractionDiscard(); while (nextShouldDiscard) { handler.nextFrame(buffer); numDiscarded++; @@ -456,8 +456,8 @@ public class InputHandlerTest extends TestCase { TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool - ConcurrentFramePool framePool = - new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); + ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, + DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); handler.open(); VSizeFrame frame = new VSizeFrame(ctx); @@ -467,8 +467,8 @@ public class InputHandlerTest extends TestCase { } // Next 5 calls call should NOT block but should discard. double numDiscarded = 0.0; - boolean nextShouldDiscard = - ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); + boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa + .getMaxFractionDiscard(); while (nextShouldDiscard) { handler.nextFrame(frame.getBuffer()); numDiscarded++; @@ -507,8 +507,8 @@ public class InputHandlerTest extends TestCase { try { IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // Spill budget = Memory budget, No discard - FeedPolicyAccessor fpa = - createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, + DISCARD_ALLOWANCE); // Non-Active Writer TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); @@ -554,8 +554,8 @@ public class InputHandlerTest extends TestCase { // No spill, No discard FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = - FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); + TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), + false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); @@ -595,8 +595,8 @@ public class InputHandlerTest extends TestCase { // No spill, No discard FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = - FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); + TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), + false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); @@ -683,8 +683,8 @@ public class InputHandlerTest extends TestCase { Random random = new Random(); IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // Spill budget = Memory budget, No discard - FeedPolicyAccessor fpa = - createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, + DISCARD_ALLOWANCE); // Non-Active Writer TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java index bf103fc..e7b15a2 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java @@ -33,6 +33,7 @@ import org.apache.asterix.common.config.AsterixReplicationProperties; import org.apache.asterix.common.config.AsterixStorageProperties; import org.apache.asterix.common.config.AsterixTransactionProperties; import org.apache.asterix.common.config.IAsterixPropertiesProvider; +import org.apache.asterix.common.config.MessagingProperties; import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; @@ -62,6 +63,7 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA private AsterixBuildProperties buildProperties; private AsterixReplicationProperties replicationProperties; private AsterixExtensionProperties extensionProperties; + private MessagingProperties messagingProperties; private final IGlobalRecoveryMaanger globalRecoveryMaanger; private IHyracksClientConnection hcc; private final ILibraryManager libraryManager; @@ -92,10 +94,11 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA INSTANCE.txnProperties = new AsterixTransactionProperties(propertiesAccessor); INSTANCE.feedProperties = new AsterixFeedProperties(propertiesAccessor); INSTANCE.extensionProperties = new AsterixExtensionProperties(propertiesAccessor); - INSTANCE.replicationProperties = - new AsterixReplicationProperties(propertiesAccessor, AsterixClusterProperties.INSTANCE.getCluster()); + INSTANCE.replicationProperties = new AsterixReplicationProperties(propertiesAccessor, + AsterixClusterProperties.INSTANCE.getCluster()); INSTANCE.hcc = hcc; INSTANCE.buildProperties = new AsterixBuildProperties(propertiesAccessor); + INSTANCE.messagingProperties = new MessagingProperties(propertiesAccessor); Logger.getLogger("org.apache").setLevel(INSTANCE.externalProperties.getLogLevel()); } @@ -191,4 +194,9 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA public AsterixExtensionProperties getExtensionProperties() { return extensionProperties; } + + @Override + public MessagingProperties getMessagingProperties() { + return messagingProperties; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java index 5b29530..ab1ebe1 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java @@ -57,7 +57,7 @@ public class GlobalResourceIdFactory implements IResourceIdFactory, IApplication //if no response available or it has an exception, request a new one if (reponse == null || reponse.getException() != null) { ResourceIdRequestMessage msg = new ResourceIdRequestMessage(); - ((INCMessageBroker) appCtx.getMessageBroker()).sendMessage(msg, this); + ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg, this); reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take(); if (reponse.getException() != null) { throw new HyracksDataException(reponse.getException().getMessage()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java index 77934c6..b1aa45f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.api.application; +import org.apache.hyracks.api.comm.IChannelInterfaceFactory; import org.apache.hyracks.api.context.IHyracksRootContext; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.resources.memory.IMemoryManager; @@ -77,4 +78,19 @@ public interface INCApplicationContext extends IApplicationContext { * @param handler */ public void setStateDumpHandler(IStateDumpHandler handler); + + /** + * Set the application MessagingChannelInterfaceFactory + * + * @param interfaceFactory + */ + public void setMessagingChannelInterfaceFactory(IChannelInterfaceFactory interfaceFactory); + + /** + * Get the application MessagingChannelInterfaceFactory previously set by + * the {@link #setMessagingChannelInterfaceFactory(IChannelInterfaceFactory)} call. + * + * @return + */ + public IChannelInterfaceFactory getMessagingChannelInterfaceFactory(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java index c41dafe..a79b955 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java @@ -33,12 +33,15 @@ public class NodeControllerInfo implements Serializable { private final NetworkAddress datasetNetworkAddress; + private final NetworkAddress messagingNetworkAddress; + public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress, - NetworkAddress datasetNetworkAddress) { + NetworkAddress datasetNetworkAddress, NetworkAddress messagingNetworkAddress) { this.nodeId = nodeId; this.status = status; this.netAddress = netAddress; this.datasetNetworkAddress = datasetNetworkAddress; + this.messagingNetworkAddress = messagingNetworkAddress; } public String getNodeId() { @@ -56,4 +59,8 @@ public class NodeControllerInfo implements Serializable { public NetworkAddress getDatasetNetworkAddress() { return datasetNetworkAddress; } + + public NetworkAddress getMessagingNetworkAddress() { + return messagingNetworkAddress; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java new file mode 100644 index 0000000..585b6bd --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.comm; + +import java.nio.ByteBuffer; + +/** + * Accepts buffers. + * + * @author vinayakb + */ +@FunctionalInterface +public interface IBufferAcceptor { + /** + * Accept a buffer. + * + * @param buffer + */ + public void accept(ByteBuffer buffer); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java new file mode 100644 index 0000000..5b3a233 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.comm; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +@FunctionalInterface +public interface IBufferFactory { + + public ByteBuffer createBuffer() throws HyracksDataException; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java new file mode 100644 index 0000000..02de858 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.comm; + +public interface IChannelControlBlock { + + /** + * Get the read interface of this channel. + * + * @return the read interface. + */ + public IChannelReadInterface getReadInterface(); + + /** + * Get the write interface of this channel. + * + * @return the write interface. + */ + public IChannelWriteInterface getWriteInterface(); + + /** + * Add write credit to this channel. + * + * @param delta + * number of bytes + */ + public void addWriteCredits(int delta); + + /** + * @return The channel's unique id within its ChannelSet. + */ + public int getChannelId(); + + /** + * Add pending credit. + * + * @param credit + */ + public void addPendingCredits(int credit); + + /** + * Increments the pending write operations of this channel. + */ + public void markPendingWrite(); + + /** + * Clears the pending write operations of this channel. + */ + public void unmarkPendingWrite(); + + /** + * Sets a flag indicating this channel was closed locally. + */ + public void reportLocalEOS(); + + /** + * A flag indicating if the channel was closed on the remote side. + * + * @return + */ + public boolean isRemotelyClosed(); + + /** + * Complete the current write operation on this channel. + */ + public void writeComplete(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelInterfaceFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelInterfaceFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelInterfaceFactory.java new file mode 100644 index 0000000..d147fc7 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelInterfaceFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.comm; + +public interface IChannelInterfaceFactory { + + /** + * Creates {@link IChannelReadInterface} and assigns the passed + * {@link IChannelControlBlock} to it. + * + * @param ccb + * @return + */ + public IChannelReadInterface createReadInterface(IChannelControlBlock ccb); + + /** + * Creates {@link IChannelWriteInterface} and assigns the passed + * {@link IChannelControlBlock} to it. + * + * @param ccb + * @return + */ + public IChannelWriteInterface createWriteInterface(IChannelControlBlock ccb); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java new file mode 100644 index 0000000..357d761 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.comm; + +import java.io.IOException; +import java.nio.channels.SocketChannel; + +import org.apache.hyracks.api.exceptions.NetException; + +/** + * Represents the read interface of a {@link IChannelControlBlock}. + * + * @author vinayakb + */ +public interface IChannelReadInterface { + /** + * Set the callback that will be invoked by the network layer when a buffer has been + * filled with data received from the remote side. + * + * @param fullBufferAcceptor + * - the full buffer acceptor. + */ + public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor); + + /** + * Get the acceptor that collects empty buffers when the client has finished consuming + * a previously full buffer. + * + * @return the empty buffer acceptor. + */ + public IBufferAcceptor getEmptyBufferAcceptor(); + + /** + * Set the buffer factory which is in charge of creating buffers if the request does not + * make the number of allocated buffers goes beyond limit + * + * @param bufferFactory + * - the buffer factory + * @param limit + * - the limit of buffers + * @param frameSize + * - the size of each buffer + */ + public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize); + + /** + * Try to read as much as {@code size} bytes from {@code sc} + * + * @param sc + * @param size + * @return The number of read bytes. + * @throws IOException + * @throws NetException + */ + public int read(SocketChannel sc, int size) throws IOException, NetException; + + /** + * Sets the read credits of this {@link IChannelReadInterface} + * + * @param credits + */ + public void setReadCredits(int credits); + + /** + * @return The current read credits of this {@link IChannelReadInterface} + */ + public int getCredits(); + + /** + * Forces the current read buffer to be flushed + */ + public void flush(); + + /** + * @return The current full buffer acceptor of this {@link IChannelReadInterface} + */ + public ICloseableBufferAcceptor getFullBufferAcceptor(); + + /** + * @return The buffer factory used by this {@link IChannelReadInterface} + */ + public IBufferFactory getBufferFactory(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java new file mode 100644 index 0000000..993dd2c --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.comm; + +import org.apache.hyracks.api.exceptions.NetException; + +/** + * Represents the write interface of a {@link IChannelControlBlock}. + */ +public interface IChannelWriteInterface { + /** + * Set the callback interface that must be invoked when a full buffer has been emptied by + * writing the data to the remote end. + * + * @param emptyBufferAcceptor + * - the empty buffer acceptor. + */ + public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor); + + /** + * Get the full buffer acceptor that accepts buffers filled with data that need to be written + * to the remote end. + * + * @return the full buffer acceptor. + */ + public ICloseableBufferAcceptor getFullBufferAcceptor(); + + /** + * Set the buffer factory which is in charge of creating buffers if the request does not + * make the number of allocated buffers goes beyond limit + * + * @param bufferFactory + * - the buffer factory + * @param limit + * - the limit of buffers + * @param frameSize + * - the size of each buffer + */ + public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize); + + /** + * Performs a pending write operation based on the current state of + * this {@link IChannelWriteInterface} + * + * @param writerState + * @throws NetException + */ + public void write(IConnectionWriterState writerState) throws NetException; + + /** + * Completes the current write operation on this {@link IChannelWriteInterface} + */ + public void writeComplete(); + + /** + * Add credits to this this {@link IChannelWriteInterface} + * + * @param credit + */ + public void addCredits(int credit); + + /** + * @return The current credits of this {@link IChannelWriteInterface} + */ + public int getCredits(); + + /** + * Adjusts the {@link IChannelControlBlock} writability based on the current + * state of this {@link IChannelWriteInterface} + */ + public void adjustChannelWritability(); +}