Repository: apex-malhar Updated Branches: refs/heads/master 389a2d564 -> 13883da68
APEXMALHAR-2045: Adding bandwidth control feature to limit Input Operator bandwidth limit Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/13883da6 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/13883da6 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/13883da6 Branch: refs/heads/master Commit: 13883da68abc0419120655971b0437f3e5ef9435 Parents: 389a2d5 Author: Priyanka Gugale <[email protected]> Authored: Wed Apr 6 18:15:50 2016 +0530 Committer: Priyanka Gugale <[email protected]> Committed: Thu Jun 30 18:17:50 2016 +0530 ---------------------------------------------------------------------- library/pom.xml | 7 +- .../bandwidth/BandwidthLimitingOperator.java | 29 ++ .../lib/bandwidth/BandwidthManager.java | 131 +++++++++ .../lib/bandwidth/BandwidthPartitioner.java | 78 ++++++ .../lib/bandwidth/BandwidthManagerTest.java | 268 +++++++++++++++++++ .../lib/bandwidth/BandwidthPartitionerTest.java | 106 ++++++++ 6 files changed, 618 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/pom.xml ---------------------------------------------------------------------- diff --git a/library/pom.xml b/library/pom.xml index edb9078..7242027 100644 --- a/library/pom.xml +++ b/library/pom.xml @@ -329,7 +329,12 @@ <artifactId>apex-shaded-ning19</artifactId> <version>1.0.0</version> </dependency> - + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.8.5</version> + <scope>test</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthLimitingOperator.java ---------------------------------------------------------------------- diff --git 
a/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthLimitingOperator.java b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthLimitingOperator.java new file mode 100644 index 0000000..2ff964d --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthLimitingOperator.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.bandwidth; + +import com.datatorrent.api.Operator; + +/** + * Operator which limits bandwidth consumption. It should have instance of BandwidthManager. 
+ */
+public interface BandwidthLimitingOperator extends Operator
+{
+  /**
+   * Returns the {@link BandwidthManager} through which this operator's bandwidth limit is read and updated.
+   *
+   * @return the bandwidth manager held by this operator
+   */
+  BandwidthManager getBandwidthManager();
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java
new file mode 100644
index 0000000..303a3a6
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * BandwidthManager keeps track of bandwidth consumption and provides limit on maximum bandwidth that can be consumed at
+ * any moment.
This accumulates bandwidth up to certain limits so that accumulated bandwidth can be used over a period of
+ * time.
+ * <p>
+ * The available budget is topped up once per second by a task handed to the scheduler in
+ * {@link #setup(OperatorContext)} and is spent through {@link #consumeBandwidth(long)}; both paths synchronize on the
+ * same private lock.
+ */
+public class BandwidthManager implements Component<Context.OperatorContext>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthManager.class);
+  /**
+   * Maximum bandwidth that can be consumed in bytes/sec; {@code Long.MAX_VALUE} means no limit is enforced.
+   * Volatile because the setter and the periodic refill task may run on different threads.
+   */
+  private volatile long bandwidthLimit = Long.MAX_VALUE;
+  /**
+   * Bytes that may still be sent. Goes negative when more is consumed than is available; the deficit is then paid
+   * back by subsequent refills. Guarded by {@code lock}.
+   */
+  private transient long availableBandwidth;
+  private final transient ScheduledExecutorService scheduler;
+  private final transient Object lock = new Object();
+
+  /**
+   * Creates a manager backed by its own single-threaded scheduler.
+   */
+  public BandwidthManager()
+  {
+    scheduler = Executors.newScheduledThreadPool(1);
+  }
+
+  /**
+   * Creates a manager that registers its refill task with the supplied scheduler (used by tests).
+   *
+   * @param scheduler scheduler that will run the once-per-second refill task
+   */
+  BandwidthManager(ScheduledExecutorService scheduler)
+  {
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Registers the refill task, which runs every second after an initial one second delay.
+   *
+   * @param context operator context; not used by this implementation
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    scheduler.scheduleAtFixedRate(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        // Read the limit once so the check and the refill agree even if setBandwidth() runs concurrently.
+        final long limit = bandwidthLimit;
+        if (limit == Long.MAX_VALUE) {
+          return;
+        }
+        synchronized (lock) {
+          if (availableBandwidth < 0) {
+            // Pay back the deficit left by earlier over-consumption before granting new budget.
+            availableBandwidth += limit;
+          } else {
+            // Unused budget is capped at one second's worth.
+            availableBandwidth = limit;
+          }
+          LOG.debug("Available bandwidth: {}", availableBandwidth);
+        }
+      }
+    }, 1, 1, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Tells whether the caller may send more data right now.
+   *
+   * @return {@code true} when no limit is set or the available budget is not negative
+   */
+  public boolean canConsumeBandwidth()
+  {
+    if (!isBandwidthRestricted()) {
+      return true;
+    }
+    synchronized (lock) {
+      return availableBandwidth >= 0;
+    }
+  }
+
+  /**
+   * Deducts the given number of bytes from the available budget. Does nothing when no limit is set.
+   *
+   * @param sentTupleSize size in bytes of the data that was sent
+   */
+  public void consumeBandwidth(long sentTupleSize)
+  {
+    if (isBandwidthRestricted()) {
+      synchronized (lock) {
+        availableBandwidth -= sentTupleSize;
+      }
+    }
+  }
+
+  /**
+   * Tells whether a bandwidth limit is in force.
+   *
+   * @return {@code false} only when the limit equals {@code Long.MAX_VALUE}
+   */
+  public boolean isBandwidthRestricted()
+  {
+    return bandwidthLimit != Long.MAX_VALUE;
+  }
+
+  /**
+   * get maximum bandwidth that can be consumed in bytes/sec
+   *
+   * @return the configured limit in bytes/sec, or {@code Long.MAX_VALUE} when unrestricted
+   */
+  public long getBandwidth()
+  {
+    return bandwidthLimit;
+  }
+
+  /**
+   * Set maximum bandwidth that can be consumed in bytes/sec
+   *
+   * @param bandwidth new limit in bytes/sec; {@code Long.MAX_VALUE} removes the limit. The value is not validated.
+   */
+  public void setBandwidth(long bandwidth)
+  {
+    this.bandwidthLimit = bandwidth;
+    LOG.debug("Bandwidth limit is set to: {} bytes/sec", bandwidth);
+  }
+
+  /**
+   * Stops the scheduler so that the refill task no longer runs.
+   */
+  @Override
+  public void teardown()
+  {
+    scheduler.shutdownNow();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java
new file mode 100644
index 0000000..6d876a7
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
+/**
+ * Stateless partitioner that keeps the combined bandwidth of all partitions constant: the bandwidth of the existing
+ * partitions is summed up and divided evenly among the partitions produced by {@link StatelessPartitioner}.
+ * An unrestricted operator (limit of {@code Long.MAX_VALUE}) stays unrestricted after repartitioning.
+ */
+public class BandwidthPartitioner<T extends BandwidthLimitingOperator> extends StatelessPartitioner<T>
+{
+  private static final long serialVersionUID = -7502505996637650237L;
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthPartitioner.class);
+  /**
+   * Value that BandwidthManager interprets as "no limit".
+   */
+  private static final long UNRESTRICTED = Long.MAX_VALUE;
+
+  /**
+   * This creates a partitioner which creates only one partition.
+   */
+  public BandwidthPartitioner()
+  {
+  }
+
+  /**
+   * This constructor is used to create the partitioner from a property.
+   *
+   * @param value A string which is an integer of the number of partitions to create
+   */
+  public BandwidthPartitioner(String value)
+  {
+    super(value);
+  }
+
+  /**
+   * This creates a partitioner which creates partitionCount partitions.
+   *
+   * @param partitionCount The number of partitions to create.
+   */
+  public BandwidthPartitioner(int partitionCount)
+  {
+    super(partitionCount);
+  }
+
+  /**
+   * Delegates partitioning to the parent class and then spreads the previously configured total bandwidth over the
+   * resulting partitions. The per-partition bandwidth is read from the first existing partition.
+   *
+   * @param partitions existing partitions; must contain at least one element
+   * @param context partitioning context passed on to the parent class
+   * @return the partitions defined by the parent class, with their bandwidth updated
+   */
+  @Override
+  public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context)
+  {
+    long perPartition = partitions.iterator().next().getPartitionedInstance().getBandwidthManager().getBandwidth();
+    long totalBandwidth = totalBandwidth(perPartition, partitions.size());
+    Collection<Partition<T>> newpartitions = super.definePartitions(partitions, context);
+    return updateBandwidth(newpartitions, totalBandwidth);
+  }
+
+  /**
+   * Assigns every partition an equal share of {@code currentBandwidth}.
+   * NOTE(review): the share is computed with integer division, so a total smaller than the partition count yields a
+   * per-partition limit of 0.
+   *
+   * @param newpartitions partitions whose bandwidth is to be set
+   * @param currentBandwidth total bandwidth in bytes/sec to distribute; {@code Long.MAX_VALUE} leaves every
+   *        partition unrestricted
+   * @return {@code newpartitions}
+   */
+  public Collection<Partition<T>> updateBandwidth(Collection<Partition<T>> newpartitions, long currentBandwidth)
+  {
+    if (newpartitions.isEmpty()) {
+      // Nothing to update, and dividing by the size would throw ArithmeticException.
+      return newpartitions;
+    }
+    // Dividing the "no limit" marker would silently turn an unrestricted operator into a restricted one.
+    long newBandwidth = (currentBandwidth == UNRESTRICTED) ? UNRESTRICTED : currentBandwidth / newpartitions.size();
+    for (Partition<T> partition : newpartitions) {
+      partition.getPartitionedInstance().getBandwidthManager().setBandwidth(newBandwidth);
+    }
+    LOG.info("Updating bandwidth of partitions to value: {}", newBandwidth);
+    return newpartitions;
+  }
+
+  /**
+   * Multiplies the per-partition bandwidth by the partition count, saturating at {@code Long.MAX_VALUE} instead of
+   * overflowing into a negative value.
+   */
+  private static long totalBandwidth(long perPartition, int partitionCount)
+  {
+    if (perPartition == UNRESTRICTED || (partitionCount > 0 && perPartition > UNRESTRICTED / partitionCount)) {
+      return UNRESTRICTED;
+    }
+    return perPartition * partitionCount;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthManagerTest.java b/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthManagerTest.java new file mode 100644 index 0000000..743a8e2 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthManagerTest.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package com.datatorrent.lib.bandwidth;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.apache.commons.io.FileUtils;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+/**
+ * Tests for {@link BandwidthManager}. Time is simulated: the manager is given a stub scheduler that only remembers
+ * the refill task, and each call to {@code execute} on the stub plays one scheduler tick.
+ */
+public class BandwidthManagerTest
+{
+  private static class TestMeta extends TestWatcher
+  {
+    private static final long ONE_SECOND = 1000L;
+    private String applicationPath;
+    private BandwidthManager underTest;
+    private Context.OperatorContext context;
+    private long bandwidthLimit = 10L;
+    private ScheduledExecutorService mockScheduler;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      mockScheduler = new ScheduledExecutorService()
+      {
+        // Task captured by the schedule* methods; run on demand through execute().
+        private Runnable command;
+
+        @Override
+        public void shutdown()
+        {
+        }
+
+        @Override
+        public List<Runnable> shutdownNow()
+        {
+          // An empty list rather than null, so callers can safely iterate over the result.
+          return Collections.emptyList();
+        }
+
+        @Override
+        public boolean isShutdown()
+        {
+          return false;
+        }
+
+        @Override
+        public boolean isTerminated()
+        {
+          return false;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+        {
+          return false;
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task)
+        {
+          return null;
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result)
+        {
+          return null;
+        }
+
+        @Override
+        public Future<?> submit(Runnable task)
+        {
+          return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
+        {
+          return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
+        {
+          return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+        {
+          return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+        {
+          return null;
+        }
+
+        /**
+         * Test hook: ignores its argument and runs the task captured by the schedule methods, which simulates one
+         * scheduler tick.
+         */
+        @Override
+        public void execute(Runnable command)
+        {
+          this.command.run();
+        }
+
+        @Override
+        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+        {
+          this.command = command;
+          return null;
+        }
+
+        @Override
+        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
+        {
+          return null;
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+        {
+          this.command = command;
+          return null;
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+        {
+          this.command = command;
+          return null;
+        }
+      };
+      underTest = new BandwidthManager(mockScheduler);
+      underTest.setBandwidth(bandwidthLimit);
+
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+      underTest.setup(context);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      underTest.teardown();
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /**
+   * Simulates one second passing by running the refill task once.
+   */
+  private void tick()
+  {
+    testMeta.mockScheduler.execute(null);
+  }
+
+  /**
+   * A tuple larger than the per-second limit must block further sending for more than size/limit seconds.
+   */
+  @Test
+  public void testBandwidthForLargeBlocks()
+  {
+    String data = "Tuple: test data to be emitted.";
+    long timeCounter = 0;
+    testMeta.underTest.consumeBandwidth(data.length());
+    while (!testMeta.underTest.canConsumeBandwidth()) {
+      timeCounter += TestMeta.ONE_SECOND;
+      tick(); // accumulate bandwidth
+    }
+    Assert.assertTrue(timeCounter > ((data.length() / testMeta.bandwidthLimit) * TestMeta.ONE_SECOND));
+  }
+
+  /**
+   * Tuples smaller than the limit can be sent until the budget of the current second is overdrawn.
+   */
+  @Test
+  public void testBandwidthForSmallBlocks()
+  {
+    String data = "Tuple";
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(data.length());
+    tick();
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(data.length());
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(data.length());
+    Assert.assertFalse(testMeta.underTest.canConsumeBandwidth());
+  }
+
+  /**
+   * Mixed tuple sizes: a deficit is paid back over as many ticks as it takes before sending is allowed again.
+   */
+  @Test
+  public void testBandwidthForMultipleBlocks()
+  {
+    int[] tupleSizes = {5, 2, 5, 4, 10, 25, 2};
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(tupleSizes[0]);
+    tick();
+
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(tupleSizes[1]);
+
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(tupleSizes[2]);
+    Assert.assertFalse(testMeta.underTest.canConsumeBandwidth());
+    tick();
+
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(tupleSizes[3]);
+
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(tupleSizes[4]);
+    Assert.assertFalse(testMeta.underTest.canConsumeBandwidth());
+    tick();
+
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(tupleSizes[5]);
+
+    Assert.assertFalse(testMeta.underTest.canConsumeBandwidth());
+    tick();
+    Assert.assertFalse(testMeta.underTest.canConsumeBandwidth());
+    tick();
+    Assert.assertFalse(testMeta.underTest.canConsumeBandwidth());
+    tick();
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(tupleSizes[6]);
+
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+  }
+
+  /**
+   * With the limit removed, consumption is never throttled. The "no limit" marker is {@code Long.MAX_VALUE};
+   * {@code Integer.MAX_VALUE} would be treated as an ordinary limit.
+   */
+  @Test
+  public void testUnsetBandwidth()
+  {
+    testMeta.underTest.setBandwidth(Long.MAX_VALUE);
+    Assert.assertFalse(testMeta.underTest.isBandwidthRestricted());
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(100L);
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+  }
+}
+
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthPartitionerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthPartitionerTest.java b/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthPartitionerTest.java
new file mode 100644
index 0000000..e852d98
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthPartitionerTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.bandwidth;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.Partitioner.Partition;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link BandwidthPartitioner} redistributes the total bandwidth over the partitions it defines. All
+ * partitions share one mocked operator and one mocked {@link BandwidthManager}, so the number of
+ * {@code setBandwidth} calls equals the number of resulting partitions.
+ */
+public class BandwidthPartitionerTest
+{
+  @Mock
+  private BandwidthManager bandwidthManagerMock;
+  @Mock
+  private BandwidthLimitingOperator operatorMock;
+  @Mock
+  private Partition<BandwidthLimitingOperator> partitionMock;
+  @Mock
+  private Partitioner.PartitioningContext partitionContextMock;
+  @Mock
+  private Iterator<Partition<BandwidthLimitingOperator>> iteratorMock;
+  private BandwidthPartitioner<BandwidthLimitingOperator> underTest = new BandwidthPartitioner<BandwidthLimitingOperator>();
+
+  /**
+   * Wires the mocks together; by default every partition reports a bandwidth of 10 bytes/sec.
+   */
+  @Before
+  public void setup()
+  {
+    MockitoAnnotations.initMocks(this);
+    when(iteratorMock.hasNext()).thenReturn(true, false);
+    when(iteratorMock.next()).thenReturn(partitionMock);
+    when(partitionMock.getPartitionedInstance()).thenReturn(operatorMock);
+    when(operatorMock.getBandwidthManager()).thenReturn(bandwidthManagerMock);
+    when(bandwidthManagerMock.getBandwidth()).thenReturn(10L);
+    when(partitionContextMock.getInputPorts()).thenReturn(null);
+  }
+
+  /**
+   * Runs the partitioner against the given number of existing partitions with the given parallel partition count.
+   */
+  private void repartition(int existingCount, int parallelPartitionCount)
+  {
+    when(partitionContextMock.getParallelPartitionCount()).thenReturn(parallelPartitionCount);
+    Collection<Partition<BandwidthLimitingOperator>> existing = Lists.newArrayList();
+    for (int added = 0; added < existingCount; added++) {
+      existing.add(new DefaultPartition<BandwidthLimitingOperator>(operatorMock));
+    }
+    underTest.definePartitions(existing, partitionContextMock);
+  }
+
+  /**
+   * One existing partition and a parallel partition count of 0: the full bandwidth of 10 is set once.
+   */
+  @Test
+  public void testBandwidthOnPartitions()
+  {
+    repartition(1, 0); // no partitions
+    verify(bandwidthManagerMock).setBandwidth(10L);
+  }
+
+  /**
+   * Going from one partition to five: each of the five gets 10 / 5 = 2.
+   */
+  @Test
+  public void testBandwidthOnIncresedPartitions()
+  {
+    repartition(1, 5);
+    verify(bandwidthManagerMock, times(5)).setBandwidth(2L);
+  }
+
+  /**
+   * Going from five partitions of 2 each to two partitions: each of the two gets 10 / 2 = 5.
+   */
+  @Test
+  public void testBandwidthOnReducedPartitions()
+  {
+    when(bandwidthManagerMock.getBandwidth()).thenReturn(2L);
+    repartition(5, 2);
+    verify(bandwidthManagerMock, times(2)).setBandwidth(5L);
+  }
+
+}
+
