Repository: apex-core
Updated Branches:
  refs/heads/master 938361374 -> aa81bea30


http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java 
b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index 3e208c8..952a36b 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -152,8 +152,8 @@ public class ProcessingModeTests
 
   public static class CollectorOperator extends BaseOperator implements 
com.datatorrent.api.Operator.CheckpointListener
   {
-    public static HashSet<Long> collection = new HashSet<Long>(20);
-    public static ArrayList<Long> duplicates = new ArrayList<Long>();
+    public static HashSet<Long> collection = new HashSet<>(20);
+    public static ArrayList<Long> duplicates = new ArrayList<>();
     private boolean simulateFailure;
     private long checkPointWindowId;
     public final transient DefaultInputPort<Long> input = new 
DefaultInputPort<Long>()
@@ -211,7 +211,7 @@ public class ProcessingModeTests
   {
     public final transient MyInputPort input1 = new MyInputPort(100);
     public final transient MyInputPort input2 = new MyInputPort(200);
-    public final transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<Integer>();
+    public final transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<>();
 
     public class MyInputPort extends DefaultInputPort<Integer>
     {
@@ -255,7 +255,7 @@ public class ProcessingModeTests
   @SuppressWarnings("SleepWhileInLoop")
   public void testNonLinearOperatorRecovery() throws InterruptedException
   {
-    final HashSet<Object> collection = new HashSet<Object>();
+    final HashSet<Object> collection = new HashSet<>();
     com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new 
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
     map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
     map.put(OperatorContext.PROCESSING_MODE, processingMode);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
index 8ba57be..d8d97e0 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
@@ -47,7 +47,7 @@ public class SliderTest
       emit = true;
     }
 
-    public final transient DefaultOutputPort<Integer> defaultOutputPort = new 
DefaultOutputPort<Integer>();
+    public final transient DefaultOutputPort<Integer> defaultOutputPort = new 
DefaultOutputPort<>();
 
     @Override
     public void emitTuples()

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
index 4625368..47f1ea0 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
@@ -83,7 +83,7 @@ public class StatsTest
     public static class TestInputStatsListener implements StatsListener, 
Serializable
     {
       private static final long serialVersionUID = 1L;
-      private List<OperatorStats> inputOperatorStats = new 
ArrayList<OperatorStats>();
+      private List<OperatorStats> inputOperatorStats = new ArrayList<>();
 
       @Override
       public Response processStats(BatchedOperatorStats stats)
@@ -101,7 +101,7 @@ public class StatsTest
   public static class TestCollector extends GenericTestOperator implements 
StatsListener
   {
     transient long windowId;
-    List<OperatorStats> collectorOperatorStats = new 
ArrayList<OperatorStats>();
+    List<OperatorStats> collectorOperatorStats = new ArrayList<>();
 
     @Override
     public Response processStats(BatchedOperatorStats stats)
@@ -129,7 +129,7 @@ public class StatsTest
     public static class TestCollectorStatsListener implements StatsListener, 
Serializable
     {
       private static final long serialVersionUID = 1L;
-      List<OperatorStats> collectorOperatorStats = new 
ArrayList<OperatorStats>();
+      List<OperatorStats> collectorOperatorStats = new ArrayList<>();
 
       @Override
       public Response processStats(BatchedOperatorStats stats)

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
index 451972e..36f9d63 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
@@ -93,7 +93,7 @@ public class StreamingContainerTest
   private static class CommitAwareOperator extends BaseOperator implements 
CheckpointListener, InputOperator
   {
     private transient String name;
-    public final transient DefaultOutputPort<String> output = new 
DefaultOutputPort<String>();
+    public final transient DefaultOutputPort<String> output = new 
DefaultOutputPort<>();
 
     @InputPortFieldAnnotation(optional = true)
     public final transient DefaultInputPort<String> input = new 
DefaultInputPort<String>()

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
 
b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
index e6b5cd5..9e6c788 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
@@ -41,9 +41,9 @@ public class TestGeneratorInputOperator implements 
InputOperator
   private int emitInterval = 1000;
   private final int spinMillis = 50;
   private String myStringProperty;
-  private final ConcurrentLinkedQueue<String> externallyAddedTuples = new 
ConcurrentLinkedQueue<String>();
+  private final ConcurrentLinkedQueue<String> externallyAddedTuples = new 
ConcurrentLinkedQueue<>();
   @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<Object> outport = new 
DefaultOutputPort<Object>();
+  public final transient DefaultOutputPort<Object> outport = new 
DefaultOutputPort<>();
 
   public int getMaxTuples()
   {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index 0c95b75..85125c7 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -256,7 +256,7 @@ public class WindowGeneratorTest
 
   static class RandomNumberGenerator implements InputOperator
   {
-    public final transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<Integer>();
+    public final transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<>();
 
     @Override
     public void emitTuples()

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
 
b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
index 0730289..7dc0686 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
@@ -220,7 +220,7 @@ public class InjectConfigTest
     public transient String transientProperty = "transientProperty";
 
     public java.util.concurrent.ConcurrentHashMap<String, String> mapProperty 
= new java.util.concurrent
-        .ConcurrentHashMap<String, String>();
+        .ConcurrentHashMap<>();
 
     public java.util.concurrent.ConcurrentHashMap<String, String> 
getMapProperty()
     {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java 
b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
index d40fd7b..9d66ca3 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -570,12 +570,12 @@ public class StreamPersistanceTests
       }
     };
 
-    public final transient DefaultOutputPort<Object> output = new 
DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new 
DefaultOutputPort<>();
 
     @Override
     public Collection definePartitions(Collection partitions, 
PartitioningContext context)
     {
-      Collection<Partition> newPartitions = new ArrayList<Partition>();
+      Collection<Partition> newPartitions = new ArrayList<>();
 
       // Mostly for 1 partition we dont need to do this
       int partitionBits = (Integer.numberOfLeadingZeros(0) - 
Integer.numberOfLeadingZeros(1));
@@ -590,7 +590,7 @@ public class StreamPersistanceTests
         // No partitioning done so far..
         // Single partition again, but with only even numbers ok?
         PassThruOperatorWithCodec newInstance = new 
PassThruOperatorWithCodec();
-        Partition partition = new 
DefaultPartition<PassThruOperatorWithCodec>(newInstance);
+        Partition partition = new DefaultPartition<>(newInstance);
 
         // Consider partitions are 1 & 2 and we are sending only 1 partition
         // Partition 1 = even numbers
@@ -796,7 +796,7 @@ public class StreamPersistanceTests
       }
     };
 
-    public final transient DefaultOutputPort<Object> output = new 
DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new 
DefaultOutputPort<>();
 
     @Override
     public Collection definePartitions(Collection partitions, 
PartitioningContext context)

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java 
b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
index 14f2b1b..705904e 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
@@ -45,7 +45,7 @@ import 
com.datatorrent.stram.plan.physical.PhysicalPlan.PlanContext;
 
 public class TestPlanContext implements PlanContext, StorageAgent
 {
-  public List<Runnable> events = new ArrayList<Runnable>();
+  public List<Runnable> events = new ArrayList<>();
   public Collection<PTOperator> undeploy;
   public Collection<PTOperator> deploy;
   public Set<PTContainer> releaseContainers;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index 18dfd99..3999ace 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -625,7 +625,7 @@ public class LogicalPlanConfigurationTest
           output.emit(tuple);
         }
       };
-      public transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<Integer>();
+      public transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<>();
     }
 
     class DummyOutputOperator extends BaseOperator
@@ -644,8 +644,8 @@ public class LogicalPlanConfigurationTest
 
     class TestUnifierAttributeModule implements Module
     {
-      public transient ProxyInputPort<Integer> moduleInput = new 
ProxyInputPort<Integer>();
-      public transient ProxyOutputPort<Integer> moduleOutput = new 
Module.ProxyOutputPort<Integer>();
+      public transient ProxyInputPort<Integer> moduleInput = new 
ProxyInputPort<>();
+      public transient ProxyOutputPort<Integer> moduleOutput = new 
Module.ProxyOutputPort<>();
 
       @Override
       public void populateDAG(DAG dag, Configuration conf)

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index dd32cc7..1507e2d 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -173,9 +173,9 @@ public class LogicalPlanTest
 
   public static class ValidationOperator extends BaseOperator
   {
-    public final transient DefaultOutputPort<Object> goodOutputPort = new 
DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> goodOutputPort = new 
DefaultOutputPort<>();
 
-    public final transient DefaultOutputPort<Object> badOutputPort = new 
DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> badOutputPort = new 
DefaultOutputPort<>();
   }
 
   public static class CounterOperator extends BaseOperator
@@ -641,7 +641,7 @@ public class LogicalPlanTest
 
   private class TestAnnotationsOperator2 extends BaseOperator implements 
InputOperator
   {
-    public final transient DefaultOutputPort<Object> outport1 = new 
DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport1 = new 
DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -653,9 +653,9 @@ public class LogicalPlanTest
   private class TestAnnotationsOperator3 extends BaseOperator implements 
InputOperator
   {
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport1 = new 
DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport1 = new 
DefaultOutputPort<>();
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport2 = new 
DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport2 = new 
DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -773,7 +773,7 @@ public class LogicalPlanTest
   public void testAttributeValuesSerializableCheck() throws 
NoSuchFieldException, SecurityException, IllegalArgumentException, 
IllegalAccessException
   {
     LogicalPlan dag = new LogicalPlan();
-    Attribute<Object> attr = new Attribute<Object>(new TestAttributeValue(), 
new Object2String());
+    Attribute<Object> attr = new Attribute<>(new TestAttributeValue(), new 
Object2String());
     Field nameField = Attribute.class.getDeclaredField("name");
     nameField.setAccessible(true);
     nameField.set(attr, "Test_Attribute");

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
index 1966678..64aaa44 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
@@ -50,7 +50,7 @@ public class ModuleAppTest
   {
 
     Random r = new Random();
-    public transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -73,7 +73,7 @@ public class ModuleAppTest
         output.emit(tuple);
       }
     };
-    public transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<>();
   }
 
   /*
@@ -92,7 +92,7 @@ public class ModuleAppTest
         output.emit(tuple);
       }
     };
-    public transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<>();
   }
 
   /*
@@ -118,8 +118,8 @@ public class ModuleAppTest
   static class TestModule implements Module
   {
 
-    public transient ProxyInputPort<Integer> moduleInput = new 
ProxyInputPort<Integer>();
-    public transient ProxyOutputPort<Integer> moduleOutput = new 
Module.ProxyOutputPort<Integer>();
+    public transient ProxyInputPort<Integer> moduleInput = new 
ProxyInputPort<>();
+    public transient ProxyOutputPort<Integer> moduleOutput = new 
Module.ProxyOutputPort<>();
 
     @Override
     public void populateDAG(DAG dag, Configuration conf)

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
index 5b5583a..7759363 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -59,7 +59,7 @@ public class TestModuleExpansion
     private int inputOperatorProp = 0;
 
     Random r = new Random();
-    public transient DefaultOutputPort<Integer> out = new 
DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> out = new 
DefaultOutputPort<>();
 
     @Override
     public void emitTuples()

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
index 8fad613..956db88 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
@@ -49,10 +49,10 @@ public class TestModules
     public volatile Object inport1Tuple = null;
 
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport1 = new 
DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport1 = new 
DefaultOutputPort<>();
 
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport2 = new 
DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport2 = new 
DefaultOutputPort<>();
 
     private String emitFormat;
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
 
b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index 61a85a5..941f5a4 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -476,7 +476,7 @@ public class PhysicalPlanTest
 
     OperatorMeta o1Meta = dag.getMeta(o1);
     dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
-    TestPartitioner<TestInputOperator<Object>> partitioner = new 
TestPartitioner<TestInputOperator<Object>>();
+    TestPartitioner<TestInputOperator<Object>> partitioner = new 
TestPartitioner<>();
     dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, partitioner);
 
     TestPlanContext ctx = new TestPlanContext();
@@ -497,7 +497,7 @@ public class PhysicalPlanTest
     plan.onStatusUpdate(o1p1);
     Assert.assertEquals("scale up triggered", 1, ctx.events.size());
     // add another partition, keep existing as is
-    partitioner.extraPartitions.add(new 
DefaultPartition<TestInputOperator<Object>>(o1));
+    partitioner.extraPartitions.add(new DefaultPartition<>(o1));
     Runnable r = ctx.events.remove(0);
     r.run();
     partitioner.extraPartitions.clear();
@@ -745,7 +745,7 @@ public class PhysicalPlanTest
       partitions.add(new DefaultPartition<Operator>(operator, p1Keys, 1, 
null));
     }
 
-    ArrayList<Partition<Operator>> lowLoadPartitions = new 
ArrayList<Partition<Operator>>();
+    ArrayList<Partition<Operator>> lowLoadPartitions = new ArrayList<>();
     for (Partition<Operator> p : partitions) {
       lowLoadPartitions.add(new DefaultPartition<>(p.getPartitionedInstance(), 
p.getPartitionKeys(), -1, null));
     }
@@ -793,7 +793,7 @@ public class PhysicalPlanTest
     for (Set<PartitionKeys> expectedKeys: expectedKeysSets) {
       List<Partition<Operator>> clonePartitions = Lists.newArrayList();
       for (PartitionKeys pks: twoBitPartitionKeys) {
-        Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<InputPort<?>, 
PartitionKeys>();
+        Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<>();
         p1Keys.put(operator.inport1, pks);
         int load = expectedKeys.contains(pks) ? 0 : -1;
         clonePartitions.add(new DefaultPartition<Operator>(operator, p1Keys, 
load, null));
@@ -876,7 +876,7 @@ public class PhysicalPlanTest
 
     Assert.assertEquals("operators container 0", 1, 
plan.getContainers().get(0).getOperators().size());
     Set<OperatorMeta> c2ExpNodes = Sets.newHashSet(dag.getMeta(o2), 
dag.getMeta(o3));
-    Set<OperatorMeta> c2ActNodes = new HashSet<OperatorMeta>();
+    Set<OperatorMeta> c2ActNodes = new HashSet<>();
     PTContainer c2 = plan.getContainers().get(1);
     for (PTOperator pNode : c2.getOperators()) {
       c2ActNodes.add(pNode.getOperatorMeta());
@@ -1139,7 +1139,7 @@ public class PhysicalPlanTest
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator o1 = dag.addOperator("o1", 
TestGeneratorInputOperator.class);
-    TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new 
TestPartitioner<TestGeneratorInputOperator>();
+    TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new 
TestPartitioner<>();
     o1Partitioner.setPartitionCount(2);
     dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, o1Partitioner);
     dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
@@ -1312,7 +1312,7 @@ public class PhysicalPlanTest
       }
 
       Assert.assertEquals("repartition event", 1, ctx.events.size());
-      o1Partitioner.extraPartitions.add(new 
DefaultPartition<TestGeneratorInputOperator>(o1));
+      o1Partitioner.extraPartitions.add(new DefaultPartition<>(o1));
       ctx.events.remove(0).run();
       o1Partitioner.extraPartitions.clear();
 
@@ -1607,7 +1607,7 @@ public class PhysicalPlanTest
       }
       T paritionable = first.getPartitionedInstance();
       for (int i = partitions.size(); i < numTotal; ++i) {
-        newPartitions.add(new DefaultPartition<T>(paritionable));
+        newPartitions.add(new DefaultPartition<>(paritionable));
       }
       return newPartitions;
     }
@@ -1767,7 +1767,7 @@ public class PhysicalPlanTest
 
     List<PTOperator> o1Unifiers = plan.getMergeOperators(o1Meta);
     Assert.assertEquals("o1Unifiers " + o1Meta, 3, o1Unifiers.size()); // 2 
cascadingUnifiers and one-downstream partition unifier
-    List<PTOperator> finalUnifiers = new ArrayList<PTOperator>();
+    List<PTOperator> finalUnifiers = new ArrayList<>();
     for (PTOperator o : o1Unifiers) {
       Assert.assertEquals("inputs " + o, 2, o.getInputs().size());
       Assert.assertEquals("outputs " + o, 1, o.getOutputs().size());
@@ -2054,7 +2054,7 @@ public class PhysicalPlanTest
       if (context.getParallelPartitionCount() > 0 && newPartitions.size() < 
context.getParallelPartitionCount()) {
         // parallel partitioned, fill to requested count
         for (int i = newPartitions.size(); i < 
context.getParallelPartitionCount(); i++) {
-          newPartitions.add(new 
DefaultPartition<T>(partitions.iterator().next().getPartitionedInstance()));
+          newPartitions.add(new 
DefaultPartition<>(partitions.iterator().next().getPartitionedInstance()));
         }
       }
       return newPartitions;
@@ -2215,9 +2215,9 @@ public class PhysicalPlanTest
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
     dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
     dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
-    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<>(2));
     
dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB,
 1024);
-    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<>(2));
     dag.setOperatorAttribute(o2, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
     dag.setOperatorAttribute(o2, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
 
@@ -2237,7 +2237,7 @@ public class PhysicalPlanTest
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
     dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
-    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<>(2));
     dag.setInputPortAttribute(o2.inport1, PortContext.PARTITION_PARALLEL, 
true);
     dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
 
b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
index 77334db..2cb3e58 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
@@ -41,7 +41,7 @@ public class BufferServerSubscriberTest
   @Test
   public void testEmergencySinks() throws InterruptedException
   {
-    final List<Object> list = new ArrayList<Object>();
+    final List<Object> list = new ArrayList<>();
     final StreamCodec<Object> myserde = new StreamCodec<Object>()
     {
       @Override

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java 
b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
index e094d44..a487176 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
@@ -50,7 +50,7 @@ public class FastPublisherTest
     byte[] buffer = publisher.consume();
 
     FastSubscriber subscriber = new FastSubscriber("subscriber", 1024);
-    subscriber.serde = subscriber.statefulSerde = new 
DefaultStatefulStreamCodec<Object>();
+    subscriber.serde = subscriber.statefulSerde = new 
DefaultStatefulStreamCodec<>();
     SweepableReservoir sr = subscriber.acquireReservoir("res", 1024);
     sr.setSink(new Sink<Object>()
     {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java 
b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
index fd67121..32e5095 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
@@ -94,7 +94,7 @@ public class FastStreamTest
   @SuppressWarnings({"SleepWhileInLoop"})
   public void testBufferServerStream() throws Exception
   {
-    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
+    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
     final AtomicInteger messageCount = new AtomicInteger();
     Sink<Object> sink = new Sink<Object>()
     {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java 
b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
index ed145c7..9db6fe9 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
@@ -55,13 +55,13 @@ public class InlineStreamTest
   {
     final int totalTupleCount = 5000;
 
-    final PassThroughNode<Object> operator1 = new PassThroughNode<Object>();
+    final PassThroughNode<Object> operator1 = new PassThroughNode<>();
     final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, "operator1", new DefaultAttributeMap(),
         null));
     node1.setId(1);
     operator1.setup(node1.context);
 
-    final PassThroughNode<Object> operator2 = new PassThroughNode<Object>();
+    final PassThroughNode<Object> operator2 = new PassThroughNode<>();
     final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, "operator2", new DefaultAttributeMap(),
         null));
     node2.setId(2);
@@ -115,7 +115,7 @@ public class InlineStreamTest
     AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input", 1024 * 5);
     node1.connectInputPort("input", reservoir1);
 
-    Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<Integer, Node<?>>();
+    Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<>();
     launchNodeThread(node1, activeNodes);
     launchNodeThread(node2, activeNodes);
     stream.activate(streamContext);
@@ -206,7 +206,7 @@ public class InlineStreamTest
       }
 
     };
-    public final DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+    public final DefaultOutputPort<T> output = new DefaultOutputPort<>();
     private boolean logMessages = false;
 
     public boolean isLogMessages()

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
index f0748eb..287a796 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
@@ -47,7 +47,7 @@ public class OiOEndWindowTest
 
   public static class TestInputOperator extends BaseOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -60,7 +60,7 @@ public class OiOEndWindowTest
   public static class FirstGenericOperator extends BaseOperator
   {
     public static long endwindowCount;
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
     public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
     {
       @Override

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
index e614750..26e913b 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
@@ -298,7 +298,7 @@ public class OiOStreamTest
 
   public static class ThreadIdValidatingInputOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
     public static long threadId;
 
     @Override
@@ -416,12 +416,12 @@ public class OiOStreamTest
       assert (threadList.contains(Thread.currentThread().getId()));
     }
 
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
   }
 
   public static class ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts extends ThreadIdValidatingGenericIntermediateOperator
   {
-    public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<>();
   }
 
   public static class ThreadIdValidatingGenericOperatorWithTwoInputPorts implements Operator
@@ -621,9 +621,9 @@ public class OiOStreamTest
     slc.run();
 
     Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
-    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
+    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
     Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size());
-    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList)).size());
+    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size());
     Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
     Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
 
@@ -640,9 +640,9 @@ public class OiOStreamTest
     slc.run();
 
     Assert.assertEquals("OIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
-    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
+    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
     Assert.assertEquals("OIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size());
-    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList)).size());
+    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size());
     Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
     Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
index 38460ea..15de177 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
@@ -149,7 +149,7 @@ public class SocketStreamTest
   @Before
   public void init()
   {
-    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
+    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
     messageCount = new AtomicInteger(0);
 
     Sink<Object> sink = new Sink<Object>()

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
index 902b8b6..af821ea 100644
--- a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
+++ b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
@@ -37,7 +37,7 @@ public class ManualScheduledExecutorService extends ScheduledThreadPoolExecutor
     public long time;
   }
 
-  PriorityQueue<TimedRunnable> queue = new PriorityQueue<TimedRunnable>(16, new Comparator<TimedRunnable>()
+  PriorityQueue<TimedRunnable> queue = new PriorityQueue<>(16, new Comparator<TimedRunnable>()
   {
     @Override
     public int compare(TimedRunnable o1, TimedRunnable o2)

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
index 0326b6a..7399aff 100644
--- a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
+++ b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
@@ -322,7 +322,7 @@ public abstract class StramTestSupport
   public static class TestHomeDirectory extends TestWatcher
   {
 
-    Map<String, String> env = new HashMap<String, String>();
+    Map<String, String> env = new HashMap<>();
     String userHome;
 
     @Override
@@ -444,7 +444,7 @@ public abstract class StramTestSupport
       private static final long serialVersionUID = 201404091805L;
     }
 
-    transient HashMap<OperatorWindowIdPair, Object> store = new HashMap<OperatorWindowIdPair, Object>();
+    transient HashMap<OperatorWindowIdPair, Object> store = new HashMap<>();
 
     @Override
     public synchronized void save(Object object, int operatorId, long windowId) throws IOException
@@ -467,7 +467,7 @@ public abstract class StramTestSupport
     @Override
     public synchronized long[] getWindowIds(int operatorId) throws IOException
     {
-      ArrayList<Long> windowIds = new ArrayList<Long>();
+      ArrayList<Long> windowIds = new ArrayList<>();
       for (OperatorWindowIdPair key : store.keySet()) {
         if (key.operatorId == operatorId) {
           windowIds.add(key.windowId);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
index 5bc70dc..9d12fdc 100644
--- a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
@@ -65,7 +65,7 @@ public class StablePriorityQueueTest
   @Test
   public void testElement()
   {
-    StablePriorityQueue<Integer> instance = new StablePriorityQueue<Integer>(1);
+    StablePriorityQueue<Integer> instance = new StablePriorityQueue<>(1);
     Integer i = 10;
     instance.add(i);
     Object result = instance.element();
@@ -78,7 +78,7 @@ public class StablePriorityQueueTest
   @Test
   public void testOffer()
   {
-    StablePriorityQueue<Integer> instance = new StablePriorityQueue<Integer>(1);
+    StablePriorityQueue<Integer> instance = new StablePriorityQueue<>(1);
     Integer i = 10;
     assertTrue(instance.offer(i));
     Object result = instance.peek();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
index 9ac28c0..e1fb860 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
@@ -109,9 +109,9 @@ public class OperatorDiscoveryTest
     };
 
     @OutputPortFieldAnnotation(optional = false, error = true)
-    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
 
-    public final transient DefaultOutputPort<Double> output1 = new DefaultOutputPort<Double>();
+    public final transient DefaultOutputPort<Double> output1 = new DefaultOutputPort<>();
 
     @Override
     public String getName()
@@ -417,7 +417,7 @@ public class OperatorDiscoveryTest
   {
     String classpath = System.getProperty("java.class.path");
     String[] paths = classpath.split(":");
-    List<String> fnames = new LinkedList<String>();
+    List<String> fnames = new LinkedList<>();
     for (String cp : paths) {
       File f = new File(cp);
       if (!f.isDirectory()) {
@@ -480,7 +480,7 @@ public class OperatorDiscoveryTest
   @Test
   public void testValueSerialization() throws Exception
   {
-    TestOperator<String, Map<String, Number>> bean = new TestOperator<String, Map<String, Number>>();
+    TestOperator<String, Map<String, Number>> bean = new TestOperator<>();
     bean.map.put("key1", new Structured());
     bean.stringArray = new String[]{"one", "two", "three"};
     bean.stringList = Lists.newArrayList("four", "five");
@@ -491,7 +491,7 @@ public class OperatorDiscoveryTest
     bean.structuredArray[0].name = "s1";
     bean.color = Color.BLUE;
     bean.booleanProp = true;
-    bean.nestedList = new LinkedList<OperatorDiscoveryTest.Structured>();
+    bean.nestedList = new LinkedList<>();
     Structured st = new Structured();
     st.name = "nestedone";
     st.size = 10;
@@ -640,15 +640,15 @@ public class OperatorDiscoveryTest
     private List<Structured> nestedList;
     private Properties props;
     private Structured nested;
-    private Map<String, Structured> map = new HashMap<String, Structured>();
+    private Map<String, Structured> map = new HashMap<>();
     private String[] stringArray;
     private Color color;
     private Structured[] structuredArray;
     private T[] genericArray;
-    private Map<String, List<Map<String, Number>>> nestedParameterizedType = new HashMap<String, List<Map<String, Number>>>();
+    private Map<String, List<Map<String, Number>>> nestedParameterizedType = new HashMap<>();
     private Map<?, ? super Long> wildcardType = new HashMap<Object, Number>();
-    private List<int[]> listofIntArray = new LinkedList<int[]>();
-    private List<T> parameterizedTypeVariable = new LinkedList<T>();
+    private List<int[]> listofIntArray = new LinkedList<>();
+    private List<T> parameterizedTypeVariable = new LinkedList<>();
     private Z genericType;
     private int[][] multiDimensionPrimitiveArray;
     private Structured[][] multiDimensionComplexArray;
@@ -1056,7 +1056,7 @@ public class OperatorDiscoveryTest
   @Test
   public void testLogicalPlanConfiguration() throws Exception
   {
-    TestOperator<String, Map<String, Number>> bean = new InputTestOperator<String, Map<String, Number>>();
+    TestOperator<String, Map<String, Number>> bean = new InputTestOperator<>();
     bean.map.put("key1", new Structured());
     bean.stringArray = new String[]{"one", "two", "three"};
     bean.stringList = Lists.newArrayList("four", "five");
@@ -1113,12 +1113,12 @@ public class OperatorDiscoveryTest
   public static class SchemaRequiredOperator extends BaseOperator implements InputOperator
   {
     @OutputPortFieldAnnotation(schemaRequired = true)
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
 
     @OutputPortFieldAnnotation(schemaRequired = false)
-    public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<>();
 
-    public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
index 9f414dc..76fa963 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
@@ -113,7 +113,7 @@ public class StramWebServicesTest extends JerseyTest
         lastRequests = requests;
 
         // delegate processing to dispatch thread
-        FutureTask<Object> future = new FutureTask<Object>(new Callable<Object>()
+        FutureTask<Object> future = new FutureTask<>(new Callable<Object>()
         {
           @Override
           public Object call() throws Exception
@@ -351,7 +351,7 @@ public class StramWebServicesTest extends JerseyTest
   @Test
   public void testSubmitLogicalPlanChange() throws JSONException, Exception
   {
-    List<LogicalPlanRequest> requests = new ArrayList<LogicalPlanRequest>();
+    List<LogicalPlanRequest> requests = new ArrayList<>();
     WebResource r = resource();
 
     CreateOperatorRequest request1 = new CreateOperatorRequest();
@@ -366,7 +366,7 @@ public class StramWebServicesTest extends JerseyTest
     requests.add(request2);
 
     ObjectMapper mapper = new ObjectMapper();
-    final Map<String, Object> m = new HashMap<String, Object>();
+    final Map<String, Object> m = new HashMap<>();
     m.put("requests", requests);
     final JSONObject jsonRequest = new JSONObject(mapper.writeValueAsString(m));
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
index 09b8785..8674e9f 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
@@ -87,10 +87,10 @@ public class TypeDiscoveryTest
       {
       }
     };
-    final OutputPort<T2> outportT2 = new DefaultOutputPort<T2>();
-    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<Number>();
+    final OutputPort<T2> outportT2 = new DefaultOutputPort<>();
+    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<>();
     final StringOutputPort outportString = new StringOutputPort(this);
-    final OutputPort<List<T0>> outportList = new DefaultOutputPort<List<T0>>();
+    final OutputPort<List<T0>> outportList = new DefaultOutputPort<>();
     final GenericSubClassOutputPort outClassObject = new GenericSubClassOutputPort(this);
   }
 
@@ -107,8 +107,8 @@ public class TypeDiscoveryTest
       {
       }
     };
-    final OutputPort<Map<String, Number>> outportT2 = new DefaultOutputPort<Map<String, Number>>();
-    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<Number>();
+    final OutputPort<Map<String, Number>> outportT2 = new DefaultOutputPort<>();
+    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<>();
     final StringOutputPort outportString = new StringOutputPort(this);
   }
 
@@ -163,7 +163,7 @@ public class TypeDiscoveryTest
 
   static class ParameterizedTypeOperator<T> extends BaseOperator
   {
-    final OutputPort<T> output = new DefaultOutputPort<T>();
+    final OutputPort<T> output = new DefaultOutputPort<>();
   }
 
   static class StringParameterOperator extends ParameterizedTypeOperator<String>

Reply via email to