Repository: apex-malhar
Updated Branches:
  refs/heads/master 389a2d564 -> 13883da68


APEXMALHAR-2045: Adding bandwidth control feature to limit Input Operator 
bandwidth


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/13883da6
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/13883da6
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/13883da6

Branch: refs/heads/master
Commit: 13883da68abc0419120655971b0437f3e5ef9435
Parents: 389a2d5
Author: Priyanka Gugale <[email protected]>
Authored: Wed Apr 6 18:15:50 2016 +0530
Committer: Priyanka Gugale <[email protected]>
Committed: Thu Jun 30 18:17:50 2016 +0530

----------------------------------------------------------------------
 library/pom.xml                                 |   7 +-
 .../bandwidth/BandwidthLimitingOperator.java    |  29 ++
 .../lib/bandwidth/BandwidthManager.java         | 131 +++++++++
 .../lib/bandwidth/BandwidthPartitioner.java     |  78 ++++++
 .../lib/bandwidth/BandwidthManagerTest.java     | 268 +++++++++++++++++++
 .../lib/bandwidth/BandwidthPartitionerTest.java | 106 ++++++++
 6 files changed, 618 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index edb9078..7242027 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -329,7 +329,12 @@
       <artifactId>apex-shaded-ning19</artifactId>
       <version>1.0.0</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.8.5</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthLimitingOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthLimitingOperator.java
 
b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthLimitingOperator.java
new file mode 100644
index 0000000..2ff964d
--- /dev/null
+++ 
b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthLimitingOperator.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import com.datatorrent.api.Operator;
+
+/**
+ * Operator which limits bandwidth consumption. It should have an instance of
+ * {@link BandwidthManager}, which it exposes through {@link #getBandwidthManager()}.
+ */
+public interface BandwidthLimitingOperator extends Operator
+{
+  /**
+   * Returns the bandwidth manager of this operator.
+   *
+   * @return the {@link BandwidthManager} held by this operator
+   */
+  BandwidthManager getBandwidthManager();
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java 
b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java
new file mode 100644
index 0000000..303a3a6
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * BandwidthManager keeps track of bandwidth consumption and limits the bandwidth that can be consumed.
+ * <p>
+ * The manager keeps a byte budget. {@link #consumeBandwidth(long)} subtracts from the budget and
+ * {@link #canConsumeBandwidth()} reports whether the budget is still non-negative. A task scheduled once per
+ * second in {@link #setup(OperatorContext)} refills the budget: a negative budget (debt left by an oversized
+ * tuple) is increased by the limit, a non-negative budget is reset to the limit. Unused budget therefore does
+ * not accumulate beyond one second worth of bandwidth, while debt is paid off over as many seconds as needed.
+ * <p>
+ * A limit of {@link Long#MAX_VALUE} (the default) means that bandwidth is not restricted at all.
+ */
+public class BandwidthManager implements Component<Context.OperatorContext>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthManager.class);
+  /**
+   * Maximum bandwidth that can be consumed in bytes/sec; {@link Long#MAX_VALUE} means unrestricted.
+   * Declared volatile because it is written through {@link #setBandwidth(long)} and read by the refill task
+   * running on the scheduler thread.
+   */
+  private volatile long bandwidthLimit = Long.MAX_VALUE;
+  /**
+   * Remaining budget in bytes; negative when more has been consumed than was available. Guarded by
+   * {@link #lock}.
+   */
+  private transient long currentBandwidthConsumption;
+  private final transient ScheduledExecutorService scheduler;
+  private final transient Object lock = new Object();
+
+  /**
+   * Creates a manager which refills its budget using its own single threaded scheduler.
+   */
+  public BandwidthManager()
+  {
+    scheduler = Executors.newScheduledThreadPool(1);
+  }
+
+  /**
+   * Creates a manager which refills its budget using the given scheduler.
+   *
+   * @param scheduler scheduler on which the refill task is registered in {@link #setup(OperatorContext)}
+   */
+  BandwidthManager(ScheduledExecutorService scheduler)
+  {
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Registers the refill task at a fixed rate of one second, first run after one second.
+   *
+   * @param context operator context, not used by this implementation
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    scheduler.scheduleAtFixedRate(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        replenishBandwidth();
+      }
+    }, 1, 1, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Refills the budget by one second worth of bandwidth; does nothing when bandwidth is not restricted.
+   */
+  private void replenishBandwidth()
+  {
+    if (!isBandwidthRestricted()) {
+      return;
+    }
+    // read the limit once so that a concurrent setBandwidth() cannot change it halfway through the refill
+    final long limit = bandwidthLimit;
+    final long available;
+    synchronized (lock) {
+      if (currentBandwidthConsumption < 0) {
+        // pay off the debt left by earlier tuples
+        currentBandwidthConsumption += limit;
+      } else {
+        // unused budget is not carried over
+        currentBandwidthConsumption = limit;
+      }
+      available = currentBandwidthConsumption;
+    }
+    // log outside the lock so that logging never delays the consuming thread
+    LOG.debug("Available bandwidth: {}", available);
+  }
+
+  /**
+   * Tells whether more data may be sent right now.
+   *
+   * @return true when bandwidth is not restricted or the remaining budget is not negative
+   */
+  public boolean canConsumeBandwidth()
+  {
+    if (!isBandwidthRestricted()) {
+      return true;
+    }
+    synchronized (lock) {
+      return currentBandwidthConsumption >= 0;
+    }
+  }
+
+  /**
+   * Subtracts the given number of bytes from the budget; does nothing when bandwidth is not restricted.
+   *
+   * @param sentTupleSize size of the data sent, in bytes
+   */
+  public void consumeBandwidth(long sentTupleSize)
+  {
+    if (isBandwidthRestricted()) {
+      synchronized (lock) {
+        currentBandwidthConsumption -= sentTupleSize;
+      }
+    }
+  }
+
+  /**
+   * Tells whether a bandwidth limit is configured.
+   *
+   * @return false when the limit is {@link Long#MAX_VALUE}, true otherwise
+   */
+  public boolean isBandwidthRestricted()
+  {
+    return bandwidthLimit != Long.MAX_VALUE;
+  }
+
+  /**
+   * get maximum bandwidth that can be consumed in bytes/sec
+   *
+   * @return the configured limit, {@link Long#MAX_VALUE} when bandwidth is not restricted
+   */
+  public long getBandwidth()
+  {
+    return bandwidthLimit;
+  }
+
+  /**
+   * Set maximum bandwidth that can be consumed in bytes/sec
+   *
+   * @param bandwidth the new limit, {@link Long#MAX_VALUE} to remove the restriction
+   */
+  public void setBandwidth(long bandwidth)
+  {
+    this.bandwidthLimit = bandwidth;
+    LOG.debug("Bandwidth limit is set to: {} bytes/sec", bandwidth);
+  }
+
+  /**
+   * Stops the scheduler, and with it the refill task.
+   */
+  @Override
+  public void teardown()
+  {
+    scheduler.shutdownNow();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java 
b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java
new file mode 100644
index 0000000..6d876a7
--- /dev/null
+++ 
b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
+/**
+ * A {@link StatelessPartitioner} for {@link BandwidthLimitingOperator}s which redistributes the bandwidth
+ * limit whenever the partitions are redefined: the limit of the first existing partition multiplied by the
+ * number of existing partitions is taken as the total, and that total is divided equally among the new
+ * partitions. An unrestricted limit ({@link Long#MAX_VALUE}) stays unrestricted.
+ *
+ * @param <T> type of the partitioned operator
+ */
+public class BandwidthPartitioner<T extends BandwidthLimitingOperator> extends StatelessPartitioner<T>
+{
+  private static final long serialVersionUID = -7502505996637650237L;
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthPartitioner.class);
+
+  /**
+   * This creates a partitioner which creates only one partition.
+   */
+  public BandwidthPartitioner()
+  {
+  }
+
+  /**
+   * This constructor is used to create the partitioner from a property.
+   *
+   * @param value A string which is an integer of the number of partitions to create
+   */
+  public BandwidthPartitioner(String value)
+  {
+    super(value);
+  }
+
+  /**
+   * This creates a partitioner which creates partitionCount partitions.
+   *
+   * @param partitionCount The number of partitions to create.
+   */
+  public BandwidthPartitioner(int partitionCount)
+  {
+    super(partitionCount);
+  }
+
+  /**
+   * Defines the new partitions through the parent class and then spreads the total bandwidth of the existing
+   * partitions over them.
+   *
+   * @param partitions the existing partitions; the bandwidth limit is read from the first one
+   * @param context the partitioning context
+   * @return the new partitions with updated bandwidth limits
+   */
+  @Override
+  public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context)
+  {
+    // the total must be computed before delegating, it describes the partitions as they are now
+    long perPartitionBandwidth = partitions.iterator().next().getPartitionedInstance().getBandwidthManager()
+        .getBandwidth();
+    long currentBandwidth = totalBandwidth(perPartitionBandwidth, partitions.size());
+    Collection<Partition<T>> newpartitions = super.definePartitions(partitions, context);
+    return updateBandwidth(newpartitions, currentBandwidth);
+  }
+
+  /**
+   * Divides the given total bandwidth equally among the given partitions.
+   *
+   * @param newpartitions partitions whose bandwidth managers are updated
+   * @param currentBandwidth total bandwidth in bytes/sec; {@link Long#MAX_VALUE} leaves every partition
+   *          unrestricted
+   * @return the partitions passed in
+   */
+  public Collection<Partition<T>> updateBandwidth(Collection<Partition<T>> newpartitions, long currentBandwidth)
+  {
+    if (newpartitions.isEmpty()) {
+      // nothing to update, and dividing by the size would fail
+      return newpartitions;
+    }
+    long newBandwidth;
+    if (currentBandwidth == Long.MAX_VALUE) {
+      // dividing the sentinel would silently turn "unrestricted" into a restriction
+      newBandwidth = Long.MAX_VALUE;
+    } else {
+      newBandwidth = currentBandwidth / newpartitions.size();
+    }
+    for (Partition<T> partition : newpartitions) {
+      partition.getPartitionedInstance().getBandwidthManager().setBandwidth(newBandwidth);
+    }
+    LOG.info("Updating bandwidth of partitions to value: {}", newBandwidth);
+    return newpartitions;
+  }
+
+  /**
+   * Multiplies the per partition bandwidth by the partition count, saturating at {@link Long#MAX_VALUE}
+   * instead of overflowing into a negative value.
+   */
+  private static long totalBandwidth(long perPartitionBandwidth, int partitionCount)
+  {
+    if (partitionCount > 0 && perPartitionBandwidth > Long.MAX_VALUE / partitionCount) {
+      return Long.MAX_VALUE;
+    }
+    return perPartitionBandwidth * partitionCount;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthManagerTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthManagerTest.java 
b/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthManagerTest.java
new file mode 100644
index 0000000..743a8e2
--- /dev/null
+++ 
b/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthManagerTest.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.bandwidth;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.apache.commons.io.FileUtils;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+public class BandwidthManagerTest
+{
+  /**
+   * Test rule which creates the {@link BandwidthManager} under test before every test method. The manager is
+   * given a hand-written scheduler stub instead of a real thread pool, so that the tests decide when a
+   * scheduler tick happens instead of waiting for wall clock time.
+   */
+  private static class TestMeta extends TestWatcher
+  {
+    // the tests count one second per simulated scheduler tick
+    private static final long ONE_SECOND = 1000L;
+    private String applicationPath;
+    private BandwidthManager underTest;
+    private Context.OperatorContext context;
+    // limit set on the manager under test
+    private long bandwidthLimit = 10L;
+    private ScheduledExecutorService mockschedular;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      // Stub scheduler: the schedule methods taking a Runnable only remember the command, execute() replays
+      // the remembered command, every other method is a no-op returning null or false.
+      mockschedular = new ScheduledExecutorService()
+      {
+        // command captured by the last schedule call
+        private Runnable command;
+
+        @Override
+        public void shutdown()
+        {
+        }
+
+        @Override
+        public List<Runnable> shutdownNow()
+        {
+          return null;
+        }
+
+        @Override
+        public boolean isShutdown()
+        {
+          return false;
+        }
+
+        @Override
+        public boolean isTerminated()
+        {
+          return false;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException
+        {
+          return false;
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task)
+        {
+          return null;
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result)
+        {
+          return null;
+        }
+
+        @Override
+        public Future<?> submit(Runnable task)
+        {
+          return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks) throws InterruptedException
+        {
+          return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks, long timeout, TimeUnit unit) throws InterruptedException
+        {
+          return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws 
InterruptedException, ExecutionException
+        {
+          return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit) throws InterruptedException, ExecutionException, 
TimeoutException
+        {
+          return null;
+        }
+
+        // Ignores its argument and runs the captured command once; the tests call execute(null) to simulate
+        // one scheduler tick.
+        @Override
+        public void execute(Runnable command)
+        {
+          this.command.run();
+        }
+
+        @Override
+        public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit)
+        {
+          this.command = command;
+          return null;
+        }
+
+        @Override
+        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long 
delay, TimeUnit unit)
+        {
+          return null;
+        }
+
+        // This is the method BandwidthManager.setup() calls; the command is captured, never run on a timer.
+        @Override
+        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long 
initialDelay, long period, TimeUnit unit)
+        {
+          this.command = command;
+          return null;
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 
long initialDelay, long delay, TimeUnit unit)
+        {
+          this.command = command;
+          return null;
+        }
+      };
+      underTest = new BandwidthManager(mockschedular);
+      underTest.setBandwidth(bandwidthLimit);
+
+      applicationPath = "target/" + description.getClassName() + "/" + 
description.getMethodName();
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, 
attributes);
+      // setup() hands the refill task to the stub scheduler above
+      underTest.setup(context);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      underTest.teardown();
+      // remove the per test class directory under target/
+      try {
+        FileUtils.deleteDirectory(new File("target/" + 
description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /**
+   * A tuple larger than the limit puts the manager in debt; the debt must take more simulated seconds to pay
+   * off than the whole number of seconds given by tuple size divided by limit.
+   */
+  @Test
+  public void testBandwidthForLargeBlocks() throws InterruptedException
+  {
+    final BandwidthManager manager = testMeta.underTest;
+    final int tupleSize = "Tuple: test data to be emitted.".length();
+    manager.consumeBandwidth(tupleSize);
+
+    long elapsedMillis = 0;
+    while (!manager.canConsumeBandwidth()) {
+      elapsedMillis += TestMeta.ONE_SECOND;
+      // one simulated scheduler tick, refills one second worth of bandwidth
+      testMeta.mockschedular.execute(null);
+    }
+
+    final long wholeSecondsNeeded = tupleSize / testMeta.bandwidthLimit;
+    Assert.assertTrue(elapsedMillis > (wholeSecondsNeeded * 1000));
+  }
+
+  /**
+   * Tuples smaller than the limit can be sent until the budget turns negative.
+   */
+  @Test
+  public void testBandwidthForSmallBlocks()
+  {
+    final BandwidthManager manager = testMeta.underTest;
+    final int tupleSize = "Tuple".length();
+
+    // initial budget is zero, which still allows sending
+    Assert.assertTrue(manager.canConsumeBandwidth());
+    manager.consumeBandwidth(tupleSize);
+
+    // one simulated scheduler tick
+    testMeta.mockschedular.execute(null);
+
+    Assert.assertTrue(manager.canConsumeBandwidth());
+    manager.consumeBandwidth(tupleSize);
+
+    // budget is exactly used up, sending is still allowed
+    Assert.assertTrue(manager.canConsumeBandwidth());
+    manager.consumeBandwidth(tupleSize);
+
+    // budget is negative now
+    Assert.assertFalse(manager.canConsumeBandwidth());
+  }
+
+  /**
+   * Sends tuples of mixed sizes with scheduler ticks in between; comments show the budget after each step
+   * for the limit of 10 bytes/sec.
+   */
+  @Test
+  public void testBandwidthForMultipleBlocks()
+  {
+    final BandwidthManager manager = testMeta.underTest;
+    final ScheduledExecutorService ticker = testMeta.mockschedular;
+    final int[] sizes = {5, 2, 5, 4, 10, 25, 2};
+
+    Assert.assertTrue(manager.canConsumeBandwidth());
+    manager.consumeBandwidth(sizes[0]); // -5
+    ticker.execute(null); // 5
+
+    Assert.assertTrue(manager.canConsumeBandwidth());
+    manager.consumeBandwidth(sizes[1]); // 3
+
+    Assert.assertTrue(manager.canConsumeBandwidth());
+    manager.consumeBandwidth(sizes[2]); // -2
+    Assert.assertFalse(manager.canConsumeBandwidth());
+    ticker.execute(null); // 8
+
+    Assert.assertTrue(manager.canConsumeBandwidth());
+    manager.consumeBandwidth(sizes[3]); // 4
+
+    Assert.assertTrue(manager.canConsumeBandwidth());
+    manager.consumeBandwidth(sizes[4]); // -6
+    Assert.assertFalse(manager.canConsumeBandwidth());
+    ticker.execute(null); // 4
+
+    Assert.assertTrue(manager.canConsumeBandwidth());
+    manager.consumeBandwidth(sizes[5]); // -21
+
+    // the large tuple needs three ticks before the budget is non-negative again
+    Assert.assertFalse(manager.canConsumeBandwidth());
+    ticker.execute(null); // -11
+    Assert.assertFalse(manager.canConsumeBandwidth());
+    ticker.execute(null); // -1
+    Assert.assertFalse(manager.canConsumeBandwidth());
+    ticker.execute(null); // 9
+    Assert.assertTrue(manager.canConsumeBandwidth());
+    manager.consumeBandwidth(sizes[6]); // 7
+
+    Assert.assertTrue(manager.canConsumeBandwidth());
+  }
+
+  /**
+   * With the limit set to {@link Long#MAX_VALUE}, the value BandwidthManager treats as "no limit", bandwidth
+   * is not restricted and consuming any amount never blocks further consumption.
+   */
+  @Test
+  public void testUnsetBandwidth()
+  {
+    // Long.MAX_VALUE, not Integer.MAX_VALUE, is the sentinel checked by isBandwidthRestricted()
+    testMeta.underTest.setBandwidth(Long.MAX_VALUE);
+    Assert.assertFalse(testMeta.underTest.isBandwidthRestricted());
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+    testMeta.underTest.consumeBandwidth(Integer.MAX_VALUE);
+    Assert.assertTrue(testMeta.underTest.canConsumeBandwidth());
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13883da6/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthPartitionerTest.java
 
b/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthPartitionerTest.java
new file mode 100644
index 0000000..e852d98
--- /dev/null
+++ 
b/library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthPartitionerTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.bandwidth;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.Partitioner.Partition;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link BandwidthPartitioner} spreads the total bandwidth of the existing partitions over the
+ * newly defined partitions. All partitions wrap the same operator mock, so every bandwidth update is
+ * recorded on the single bandwidth manager mock.
+ */
+public class BandwidthPartitionerTest
+{
+  @Mock
+  private BandwidthManager bandwidthManagerMock;
+  @Mock
+  private BandwidthLimitingOperator operatorMock;
+  @Mock
+  private Partition<BandwidthLimitingOperator> partitionMock;
+  @Mock
+  private Partitioner.PartitioningContext partitionContextMock;
+  @Mock
+  private Iterator<Partition<BandwidthLimitingOperator>> iteratorMock;
+  private BandwidthPartitioner<BandwidthLimitingOperator> underTest =
+      new BandwidthPartitioner<BandwidthLimitingOperator>();
+
+  @Before
+  public void setup()
+  {
+    MockitoAnnotations.initMocks(this);
+    when(iteratorMock.hasNext()).thenReturn(true, false);
+    when(iteratorMock.next()).thenReturn(partitionMock);
+    when(partitionMock.getPartitionedInstance()).thenReturn(operatorMock);
+    when(operatorMock.getBandwidthManager()).thenReturn(bandwidthManagerMock);
+    // every existing partition reports a limit of 10 bytes/sec unless a test overrides it
+    when(bandwidthManagerMock.getBandwidth()).thenReturn(10L);
+    when(partitionContextMock.getInputPorts()).thenReturn(null);
+  }
+
+  /**
+   * Builds the given number of existing partitions, all wrapping the operator mock.
+   */
+  private Collection<Partition<BandwidthLimitingOperator>> existingPartitions(int count)
+  {
+    Collection<Partition<BandwidthLimitingOperator>> existing = Lists.newArrayList();
+    for (int i = 0; i < count; i++) {
+      existing.add(new DefaultPartition<BandwidthLimitingOperator>(operatorMock));
+    }
+    return existing;
+  }
+
+  /**
+   * One partition stays one partition: the limit of 10 is kept.
+   */
+  @Test
+  public void testBandwidthOnPartitions()
+  {
+    when(partitionContextMock.getParallelPartitionCount()).thenReturn(0); // no partitions
+
+    underTest.definePartitions(existingPartitions(1), partitionContextMock);
+
+    verify(bandwidthManagerMock).setBandwidth(10L);
+  }
+
+  /**
+   * One partition with a limit of 10 becomes five partitions with a limit of 2 each.
+   */
+  @Test
+  public void testBandwidthOnIncresedPartitions()
+  {
+    when(partitionContextMock.getParallelPartitionCount()).thenReturn(5);
+
+    underTest.definePartitions(existingPartitions(1), partitionContextMock);
+
+    verify(bandwidthManagerMock, times(5)).setBandwidth(2L);
+  }
+
+  /**
+   * Five partitions with a limit of 2 each become two partitions with a limit of 5 each.
+   */
+  @Test
+  public void testBandwidthOnReducedPartitions()
+  {
+    when(partitionContextMock.getParallelPartitionCount()).thenReturn(2);
+    when(bandwidthManagerMock.getBandwidth()).thenReturn(2L);
+
+    underTest.definePartitions(existingPartitions(5), partitionContextMock);
+
+    verify(bandwidthManagerMock, times(2)).setBandwidth(5L);
+  }
+
+}
+

Reply via email to