Repository: apex-core Updated Branches: refs/heads/master 938361374 -> aa81bea30
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java index 3e208c8..952a36b 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java @@ -152,8 +152,8 @@ public class ProcessingModeTests public static class CollectorOperator extends BaseOperator implements com.datatorrent.api.Operator.CheckpointListener { - public static HashSet<Long> collection = new HashSet<Long>(20); - public static ArrayList<Long> duplicates = new ArrayList<Long>(); + public static HashSet<Long> collection = new HashSet<>(20); + public static ArrayList<Long> duplicates = new ArrayList<>(); private boolean simulateFailure; private long checkPointWindowId; public final transient DefaultInputPort<Long> input = new DefaultInputPort<Long>() @@ -211,7 +211,7 @@ public class ProcessingModeTests { public final transient MyInputPort input1 = new MyInputPort(100); public final transient MyInputPort input2 = new MyInputPort(200); - public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>(); + public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>(); public class MyInputPort extends DefaultInputPort<Integer> { @@ -255,7 +255,7 @@ public class ProcessingModeTests @SuppressWarnings("SleepWhileInLoop") public void testNonLinearOperatorRecovery() throws InterruptedException { - final HashSet<Object> collection = new HashSet<Object>(); + final HashSet<Object> collection = new HashSet<>(); com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0); map.put(OperatorContext.PROCESSING_MODE, processingMode); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java index 8ba57be..d8d97e0 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java @@ -47,7 +47,7 @@ public class SliderTest emit = true; } - public final transient DefaultOutputPort<Integer> defaultOutputPort = new DefaultOutputPort<Integer>(); + public final transient DefaultOutputPort<Integer> defaultOutputPort = new DefaultOutputPort<>(); @Override public void emitTuples() http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java index 4625368..47f1ea0 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java @@ -83,7 +83,7 @@ public class StatsTest public static class TestInputStatsListener implements StatsListener, Serializable { private static final long serialVersionUID = 1L; - private List<OperatorStats> inputOperatorStats = new ArrayList<OperatorStats>(); + private List<OperatorStats> inputOperatorStats = new ArrayList<>(); @Override public Response processStats(BatchedOperatorStats stats) @@ -101,7 +101,7 @@ public class StatsTest public static class TestCollector extends GenericTestOperator implements StatsListener { transient long windowId; - List<OperatorStats> collectorOperatorStats = new ArrayList<OperatorStats>(); + List<OperatorStats> collectorOperatorStats = new ArrayList<>(); @Override public Response processStats(BatchedOperatorStats stats) @@ -129,7 +129,7 @@ public class StatsTest public static class TestCollectorStatsListener implements StatsListener, Serializable { private static final long serialVersionUID = 1L; - List<OperatorStats> collectorOperatorStats = new ArrayList<OperatorStats>(); + List<OperatorStats> collectorOperatorStats = new ArrayList<>(); @Override public Response processStats(BatchedOperatorStats stats) http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java index 451972e..36f9d63 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java @@ -93,7 +93,7 @@ public class StreamingContainerTest private static class CommitAwareOperator extends BaseOperator implements CheckpointListener, InputOperator { private transient String name; - public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); @InputPortFieldAnnotation(optional = true) public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java index e6b5cd5..9e6c788 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java @@ -41,9 +41,9 @@ public class TestGeneratorInputOperator implements InputOperator private int emitInterval = 1000; private final int spinMillis = 50; private String myStringProperty; - private final ConcurrentLinkedQueue<String> externallyAddedTuples = new ConcurrentLinkedQueue<String>(); + private final ConcurrentLinkedQueue<String> externallyAddedTuples = new ConcurrentLinkedQueue<>(); @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>(); public int getMaxTuples() { http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java index 0c95b75..85125c7 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java @@ -256,7 +256,7 @@ public class WindowGeneratorTest static class RandomNumberGenerator implements InputOperator { - public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>(); + public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>(); @Override public void emitTuples() http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java index 0730289..7dc0686 100644 --- a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java +++ b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java @@ -220,7 +220,7 @@ public class InjectConfigTest public transient String transientProperty = "transientProperty"; public java.util.concurrent.ConcurrentHashMap<String, String> mapProperty = new java.util.concurrent - .ConcurrentHashMap<String, String>(); + .ConcurrentHashMap<>(); public java.util.concurrent.ConcurrentHashMap<String, String> getMapProperty() { http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java index d40fd7b..9d66ca3 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java @@ -570,12 +570,12 @@ public class StreamPersistanceTests } }; - public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>(); @Override public Collection definePartitions(Collection partitions, PartitioningContext context) { - Collection<Partition> newPartitions = new ArrayList<Partition>(); + Collection<Partition> newPartitions = new ArrayList<>(); // Mostly for 1 partition we dont need to do this int partitionBits = (Integer.numberOfLeadingZeros(0) - Integer.numberOfLeadingZeros(1)); @@ -590,7 +590,7 @@ public class StreamPersistanceTests // No partitioning done so far.. // Single partition again, but with only even numbers ok? PassThruOperatorWithCodec newInstance = new PassThruOperatorWithCodec(); - Partition partition = new DefaultPartition<PassThruOperatorWithCodec>(newInstance); + Partition partition = new DefaultPartition<>(newInstance); // Consider partitions are 1 & 2 and we are sending only 1 partition // Partition 1 = even numbers @@ -796,7 +796,7 @@ public class StreamPersistanceTests } }; - public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>(); @Override public Collection definePartitions(Collection partitions, PartitioningContext context) http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java index 14f2b1b..705904e 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java @@ -45,7 +45,7 @@ import com.datatorrent.stram.plan.physical.PhysicalPlan.PlanContext; public class TestPlanContext implements PlanContext, StorageAgent { - public List<Runnable> events = new ArrayList<Runnable>(); + public List<Runnable> events = new ArrayList<>(); public Collection<PTOperator> undeploy; public Collection<PTOperator> deploy; public Set<PTContainer> releaseContainers; http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java index 18dfd99..3999ace 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java @@ -625,7 +625,7 @@ public class LogicalPlanConfigurationTest output.emit(tuple); } }; - public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>(); + public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>(); } class DummyOutputOperator extends BaseOperator @@ -644,8 +644,8 @@ public class LogicalPlanConfigurationTest class TestUnifierAttributeModule implements Module { - public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<Integer>(); - public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>(); + public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<>(); + public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<>(); @Override public void populateDAG(DAG dag, Configuration conf) http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java index dd32cc7..1507e2d 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java @@ -173,9 +173,9 @@ public class LogicalPlanTest public static class ValidationOperator extends BaseOperator { - public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<>(); - public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<>(); } public static class CounterOperator extends BaseOperator @@ -641,7 +641,7 @@ public class LogicalPlanTest private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator { - public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>(); @Override public void emitTuples() @@ -653,9 +653,9 @@ public class LogicalPlanTest private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator { @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>(); @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<>(); @Override public void emitTuples() @@ -773,7 +773,7 @@ public class LogicalPlanTest public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException { LogicalPlan dag = new LogicalPlan(); - Attribute<Object> attr = new Attribute<Object>(new TestAttributeValue(), new Object2String()); + Attribute<Object> attr = new Attribute<>(new TestAttributeValue(), new Object2String()); Field nameField = Attribute.class.getDeclaredField("name"); nameField.setAccessible(true); nameField.set(attr, "Test_Attribute"); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java index 1966678..64aaa44 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java @@ -50,7 +50,7 @@ public class ModuleAppTest { Random r = new Random(); - public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>(); + public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>(); @Override public void emitTuples() @@ -73,7 +73,7 @@ public class ModuleAppTest output.emit(tuple); } }; - public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>(); + public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>(); } /* @@ -92,7 +92,7 @@ public class ModuleAppTest output.emit(tuple); } }; - public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>(); + public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>(); } /* @@ -118,8 +118,8 @@ public class ModuleAppTest static class TestModule implements Module { - public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<Integer>(); - public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>(); + public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<>(); + public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<>(); @Override public void populateDAG(DAG dag, Configuration conf) http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java index 5b5583a..7759363 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java @@ -59,7 +59,7 @@ public class TestModuleExpansion private int inputOperatorProp = 0; Random r = new Random(); - public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>(); + public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<>(); @Override public void emitTuples() http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java index 8fad613..956db88 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java @@ -49,10 +49,10 @@ public class TestModules public volatile Object inport1Tuple = null; @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>(); @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<>(); private String emitFormat; http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java index 61a85a5..941f5a4 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java @@ -476,7 +476,7 @@ public class PhysicalPlanTest OperatorMeta o1Meta = dag.getMeta(o1); dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); - TestPartitioner<TestInputOperator<Object>> partitioner = new TestPartitioner<TestInputOperator<Object>>(); + TestPartitioner<TestInputOperator<Object>> partitioner = new TestPartitioner<>(); dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, partitioner); TestPlanContext ctx = new TestPlanContext(); @@ -497,7 +497,7 @@ public class PhysicalPlanTest plan.onStatusUpdate(o1p1); Assert.assertEquals("scale up triggered", 1, ctx.events.size()); // add another partition, keep existing as is - partitioner.extraPartitions.add(new DefaultPartition<TestInputOperator<Object>>(o1)); + partitioner.extraPartitions.add(new DefaultPartition<>(o1)); Runnable r = ctx.events.remove(0); r.run(); partitioner.extraPartitions.clear(); @@ -745,7 +745,7 @@ public class PhysicalPlanTest partitions.add(new DefaultPartition<Operator>(operator, p1Keys, 1, null)); } - ArrayList<Partition<Operator>> lowLoadPartitions = new ArrayList<Partition<Operator>>(); + ArrayList<Partition<Operator>> lowLoadPartitions = new ArrayList<>(); for (Partition<Operator> p : partitions) { lowLoadPartitions.add(new DefaultPartition<>(p.getPartitionedInstance(), p.getPartitionKeys(), -1, null)); } @@ -793,7 +793,7 @@ public class PhysicalPlanTest for (Set<PartitionKeys> expectedKeys: expectedKeysSets) { List<Partition<Operator>> clonePartitions = Lists.newArrayList(); for (PartitionKeys pks: twoBitPartitionKeys) { - Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<InputPort<?>, PartitionKeys>(); + Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<>(); p1Keys.put(operator.inport1, pks); int load = expectedKeys.contains(pks) ? 0 : -1; clonePartitions.add(new DefaultPartition<Operator>(operator, p1Keys, load, null)); @@ -876,7 +876,7 @@ public class PhysicalPlanTest Assert.assertEquals("operators container 0", 1, plan.getContainers().get(0).getOperators().size()); Set<OperatorMeta> c2ExpNodes = Sets.newHashSet(dag.getMeta(o2), dag.getMeta(o3)); - Set<OperatorMeta> c2ActNodes = new HashSet<OperatorMeta>(); + Set<OperatorMeta> c2ActNodes = new HashSet<>(); PTContainer c2 = plan.getContainers().get(1); for (PTOperator pNode : c2.getOperators()) { c2ActNodes.add(pNode.getOperatorMeta()); @@ -1139,7 +1139,7 @@ public class PhysicalPlanTest LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); - TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new TestPartitioner<TestGeneratorInputOperator>(); + TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new TestPartitioner<>(); o1Partitioner.setPartitionCount(2); dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, o1Partitioner); dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); @@ -1312,7 +1312,7 @@ public class PhysicalPlanTest } Assert.assertEquals("repartition event", 1, ctx.events.size()); - o1Partitioner.extraPartitions.add(new DefaultPartition<TestGeneratorInputOperator>(o1)); + o1Partitioner.extraPartitions.add(new DefaultPartition<>(o1)); ctx.events.remove(0).run(); o1Partitioner.extraPartitions.clear(); @@ -1607,7 +1607,7 @@ public class PhysicalPlanTest } T paritionable = first.getPartitionedInstance(); for (int i = partitions.size(); i < numTotal; ++i) { - newPartitions.add(new DefaultPartition<T>(paritionable)); + newPartitions.add(new DefaultPartition<>(paritionable)); } return newPartitions; } @@ -1767,7 +1767,7 @@ public class PhysicalPlanTest List<PTOperator> o1Unifiers = plan.getMergeOperators(o1Meta); Assert.assertEquals("o1Unifiers " + o1Meta, 3, o1Unifiers.size()); // 2 cascadingUnifiers and one-downstream partition unifier - List<PTOperator> finalUnifiers = new ArrayList<PTOperator>(); + List<PTOperator> finalUnifiers = new ArrayList<>(); for (PTOperator o : o1Unifiers) { Assert.assertEquals("inputs " + o, 2, o.getInputs().size()); Assert.assertEquals("outputs " + o, 1, o.getOutputs().size()); @@ -2054,7 +2054,7 @@ public class PhysicalPlanTest if (context.getParallelPartitionCount() > 0 && newPartitions.size() < context.getParallelPartitionCount()) { // parallel partitioned, fill to requested count for (int i = newPartitions.size(); i < context.getParallelPartitionCount(); i++) { - newPartitions.add(new DefaultPartition<T>(partitions.iterator().next().getPartitionedInstance())); + newPartitions.add(new DefaultPartition<>(partitions.iterator().next().getPartitionedInstance())); } } return newPartitions; @@ -2215,9 +2215,9 @@ public class PhysicalPlanTest GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4); dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); - dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2)); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2)); dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 1024); - dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2)); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2)); dag.setOperatorAttribute(o2, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); dag.setOperatorAttribute(o2, OperatorContext.APPLICATION_WINDOW_COUNT, 4); @@ -2237,7 +2237,7 @@ public class PhysicalPlanTest GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); - dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2)); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2)); dag.setInputPortAttribute(o2.inport1, PortContext.PARTITION_PARALLEL, true); dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java index 77334db..2cb3e58 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java @@ -41,7 +41,7 @@ public class BufferServerSubscriberTest @Test public void testEmergencySinks() throws InterruptedException { - final List<Object> list = new ArrayList<Object>(); + final List<Object> list = new ArrayList<>(); final StreamCodec<Object> myserde = new StreamCodec<Object>() { @Override http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java index e094d44..a487176 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java @@ -50,7 +50,7 @@ public class FastPublisherTest byte[] buffer = publisher.consume(); FastSubscriber subscriber = new FastSubscriber("subscriber", 1024); - subscriber.serde = subscriber.statefulSerde = new DefaultStatefulStreamCodec<Object>(); + subscriber.serde = subscriber.statefulSerde = new DefaultStatefulStreamCodec<>(); SweepableReservoir sr = subscriber.acquireReservoir("res", 1024); sr.setSink(new Sink<Object>() { http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java index fd67121..32e5095 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java @@ -94,7 +94,7 @@ public class FastStreamTest @SuppressWarnings({"SleepWhileInLoop"}) public void testBufferServerStream() throws Exception { - final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>(); + final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>(); final AtomicInteger messageCount = new AtomicInteger(); Sink<Object> sink = new Sink<Object>() { http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java index ed145c7..9db6fe9 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java @@ -55,13 +55,13 @@ public class InlineStreamTest { final int totalTupleCount = 5000; - final PassThroughNode<Object> operator1 = new PassThroughNode<Object>(); + final PassThroughNode<Object> operator1 = new PassThroughNode<>(); final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, "operator1", new DefaultAttributeMap(), null)); node1.setId(1); operator1.setup(node1.context); - final PassThroughNode<Object> operator2 = new PassThroughNode<Object>(); + final PassThroughNode<Object> operator2 = new PassThroughNode<>(); final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, "operator2", new DefaultAttributeMap(), null)); node2.setId(2); @@ -115,7 +115,7 @@ public class InlineStreamTest AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input", 1024 * 5); node1.connectInputPort("input", reservoir1); - Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<Integer, Node<?>>(); + Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<>(); launchNodeThread(node1, activeNodes); launchNodeThread(node2, activeNodes); stream.activate(streamContext); @@ -206,7 +206,7 @@ public class InlineStreamTest } }; - public final DefaultOutputPort<T> output = new DefaultOutputPort<T>(); + public final DefaultOutputPort<T> output = new DefaultOutputPort<>(); private boolean logMessages = false; public boolean isLogMessages() http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java index f0748eb..287a796 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java @@ -47,7 +47,7 @@ public class OiOEndWindowTest public static class TestInputOperator extends BaseOperator implements InputOperator { - public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>(); + public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>(); @Override public void emitTuples() @@ -60,7 +60,7 @@ public class OiOEndWindowTest public static class FirstGenericOperator extends BaseOperator { public static long endwindowCount; - public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>(); + public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>(); public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>() { @Override http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java index e614750..26e913b 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java @@ -298,7 +298,7 @@ public class OiOStreamTest public static class ThreadIdValidatingInputOperator implements InputOperator { - public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>(); + public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>(); public static long threadId; @Override @@ -416,12 +416,12 @@ public class OiOStreamTest assert (threadList.contains(Thread.currentThread().getId())); } - public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>(); + public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>(); } public static class ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts extends ThreadIdValidatingGenericIntermediateOperator { - public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<Long>(); + public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<>(); } public static class ThreadIdValidatingGenericOperatorWithTwoInputPorts implements Operator @@ -621,9 +621,9 @@ public class OiOStreamTest slc.run(); Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size()); - Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size()); + Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size()); Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size()); - Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList)).size()); + Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size()); Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId)); Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId)); @@ -640,9 +640,9 @@ public class OiOStreamTest slc.run(); Assert.assertEquals("OIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size()); - Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size()); + Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size()); Assert.assertEquals("OIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size()); - Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList)).size()); + Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size()); Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId)); Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId)); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java index 38460ea..15de177 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java @@ -149,7 +149,7 @@ public class SocketStreamTest @Before public void init() { - final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>(); + final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>(); messageCount = new AtomicInteger(0); Sink<Object> sink = new Sink<Object>() http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java index 902b8b6..af821ea 100644 --- a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java +++ b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java @@ -37,7 +37,7 @@ public class ManualScheduledExecutorService extends ScheduledThreadPoolExecutor public long time; } - PriorityQueue<TimedRunnable> queue = new PriorityQueue<TimedRunnable>(16, new Comparator<TimedRunnable>() + PriorityQueue<TimedRunnable> queue = new PriorityQueue<>(16, new Comparator<TimedRunnable>() { @Override public int compare(TimedRunnable o1, TimedRunnable o2) http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java index 0326b6a..7399aff 100644 --- a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java +++ b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java @@ -322,7 +322,7 @@ public abstract class StramTestSupport public static class TestHomeDirectory extends TestWatcher { - Map<String, String> env = new HashMap<String, String>(); + Map<String, String> env = new HashMap<>(); String userHome; @Override @@ -444,7 +444,7 @@ public abstract class StramTestSupport private static final long serialVersionUID = 201404091805L; } - transient HashMap<OperatorWindowIdPair, Object> store = new HashMap<OperatorWindowIdPair, Object>(); + transient HashMap<OperatorWindowIdPair, Object> store = new HashMap<>(); @Override public synchronized void save(Object object, int operatorId, long windowId) throws IOException @@ -467,7 +467,7 @@ public abstract class StramTestSupport @Override public synchronized long[] getWindowIds(int operatorId) throws IOException { - ArrayList<Long> windowIds = new ArrayList<Long>(); + ArrayList<Long> windowIds = new ArrayList<>(); for (OperatorWindowIdPair key : store.keySet()) { if (key.operatorId == operatorId) { windowIds.add(key.windowId); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java index 5bc70dc..9d12fdc 100644 --- a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java +++ b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java @@ -65,7 +65,7 @@ public class StablePriorityQueueTest @Test public void testElement() { - StablePriorityQueue<Integer> instance = new StablePriorityQueue<Integer>(1); + StablePriorityQueue<Integer> instance = new StablePriorityQueue<>(1); Integer i = 10; instance.add(i); Object result = instance.element(); @@ -78,7 +78,7 @@ public class StablePriorityQueueTest @Test public void testOffer() { - StablePriorityQueue<Integer> instance = new StablePriorityQueue<Integer>(1); + StablePriorityQueue<Integer> instance = new StablePriorityQueue<>(1); Integer i = 10; assertTrue(instance.offer(i)); Object result = instance.peek(); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java index 9ac28c0..e1fb860 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java @@ -109,9 +109,9 @@ public class OperatorDiscoveryTest }; @OutputPortFieldAnnotation(optional = false, error = true) - public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); - public final transient DefaultOutputPort<Double> output1 = new DefaultOutputPort<Double>(); + public final transient DefaultOutputPort<Double> output1 = new DefaultOutputPort<>(); @Override public String getName() @@ -417,7 +417,7 @@ public class OperatorDiscoveryTest { String classpath = System.getProperty("java.class.path"); String[] paths = classpath.split(":"); - List<String> fnames = new LinkedList<String>(); + List<String> fnames = new LinkedList<>(); for (String cp : paths) { File f = new File(cp); if (!f.isDirectory()) { @@ -480,7 +480,7 @@ public class OperatorDiscoveryTest @Test public void testValueSerialization() throws Exception { - TestOperator<String, Map<String, Number>> bean = new TestOperator<String, Map<String, Number>>(); + TestOperator<String, Map<String, Number>> bean = new TestOperator<>(); bean.map.put("key1", new Structured()); bean.stringArray = new String[]{"one", "two", "three"}; bean.stringList = Lists.newArrayList("four", "five"); @@ -491,7 +491,7 @@ public class OperatorDiscoveryTest bean.structuredArray[0].name = "s1"; bean.color = Color.BLUE; bean.booleanProp = true; - bean.nestedList = new LinkedList<OperatorDiscoveryTest.Structured>(); + bean.nestedList = new LinkedList<>(); Structured st = new Structured(); st.name = "nestedone"; st.size = 10; @@ -640,15 +640,15 @@ public class OperatorDiscoveryTest private List<Structured> nestedList; private Properties props; private Structured nested; - private Map<String, Structured> map = new HashMap<String, Structured>(); + private Map<String, Structured> map = new HashMap<>(); private String[] stringArray; private Color color; private Structured[] structuredArray; private T[] genericArray; - private Map<String, List<Map<String, Number>>> nestedParameterizedType = new HashMap<String, List<Map<String, Number>>>(); + private Map<String, List<Map<String, Number>>> nestedParameterizedType = new HashMap<>(); private Map<?, ? super Long> wildcardType = new HashMap<Object, Number>(); - private List<int[]> listofIntArray = new LinkedList<int[]>(); - private List<T> parameterizedTypeVariable = new LinkedList<T>(); + private List<int[]> listofIntArray = new LinkedList<>(); + private List<T> parameterizedTypeVariable = new LinkedList<>(); private Z genericType; private int[][] multiDimensionPrimitiveArray; private Structured[][] multiDimensionComplexArray; @@ -1056,7 +1056,7 @@ public class OperatorDiscoveryTest @Test public void testLogicalPlanConfiguration() throws Exception { - TestOperator<String, Map<String, Number>> bean = new InputTestOperator<String, Map<String, Number>>(); + TestOperator<String, Map<String, Number>> bean = new InputTestOperator<>(); bean.map.put("key1", new Structured()); bean.stringArray = new String[]{"one", "two", "three"}; bean.stringList = Lists.newArrayList("four", "five"); @@ -1113,12 +1113,12 @@ public class OperatorDiscoveryTest public static class SchemaRequiredOperator extends BaseOperator implements InputOperator { @OutputPortFieldAnnotation(schemaRequired = true) - public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>(); @OutputPortFieldAnnotation(schemaRequired = false) - public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<>(); - public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<>(); @Override public void emitTuples() http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java index 9f414dc..76fa963 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java @@ -113,7 +113,7 @@ public class StramWebServicesTest extends JerseyTest lastRequests = requests; // delegate processing to dispatch thread - FutureTask<Object> future = new FutureTask<Object>(new Callable<Object>() + FutureTask<Object> future = new FutureTask<>(new Callable<Object>() { @Override public Object call() throws Exception @@ -351,7 +351,7 @@ public class StramWebServicesTest extends JerseyTest @Test public void testSubmitLogicalPlanChange() throws JSONException, Exception { - List<LogicalPlanRequest> requests = new ArrayList<LogicalPlanRequest>(); + List<LogicalPlanRequest> requests = new ArrayList<>(); WebResource r = resource(); CreateOperatorRequest request1 = new CreateOperatorRequest(); @@ -366,7 +366,7 @@ public class StramWebServicesTest extends JerseyTest requests.add(request2); ObjectMapper mapper = new ObjectMapper(); - final Map<String, Object> m = new HashMap<String, Object>(); + final Map<String, Object> m = new HashMap<>(); m.put("requests", requests); final JSONObject jsonRequest = new JSONObject(mapper.writeValueAsString(m)); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java index 09b8785..8674e9f 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java @@ -87,10 +87,10 @@ public class TypeDiscoveryTest { } }; - final OutputPort<T2> outportT2 = new DefaultOutputPort<T2>(); - final OutputPort<Number> outportNumberParam = new DefaultOutputPort<Number>(); + final OutputPort<T2> outportT2 = new DefaultOutputPort<>(); + final OutputPort<Number> outportNumberParam = new DefaultOutputPort<>(); final StringOutputPort outportString = new StringOutputPort(this); - final OutputPort<List<T0>> outportList = new DefaultOutputPort<List<T0>>(); + final OutputPort<List<T0>> outportList = new DefaultOutputPort<>(); final GenericSubClassOutputPort outClassObject = new GenericSubClassOutputPort(this); } @@ -107,8 +107,8 @@ public class TypeDiscoveryTest { } }; - final OutputPort<Map<String, Number>> outportT2 = new DefaultOutputPort<Map<String, Number>>(); - final OutputPort<Number> outportNumberParam = new DefaultOutputPort<Number>(); + final OutputPort<Map<String, Number>> outportT2 = new DefaultOutputPort<>(); + final OutputPort<Number> outportNumberParam = new DefaultOutputPort<>(); final StringOutputPort outportString = new StringOutputPort(this); } @@ -163,7 +163,7 @@ public class TypeDiscoveryTest static class ParameterizedTypeOperator<T> extends BaseOperator { - final OutputPort<T> output = new DefaultOutputPort<T>(); + final OutputPort<T> output = new DefaultOutputPort<>(); } static class StringParameterOperator extends ParameterizedTypeOperator<String>
