APEXCORE-691 Use type inference for generic instance creation

closes #505
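The change is mechanical: wherever a generic instance is created with explicit type arguments, the redundant arguments are dropped in favor of the Java 7 diamond operator so the compiler infers them from the declaration. A minimal, self-contained sketch of the before/after pattern is shown below; the class and variable names are illustrative only and do not appear anywhere in this commit.

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    public class DiamondOperatorSketch
    {
      public static void main(String[] args)
      {
        // Before this change: type arguments repeated on the right-hand side.
        Map<String, List<Integer>> explicit = new HashMap<String, List<Integer>>();

        // After this change: the diamond operator lets the compiler infer them.
        Map<String, List<Integer>> inferred = new HashMap<>();

        // The same substitution is applied throughout the diff, e.g.
        //   LinkedList<String> operators = new LinkedList<String>();
        // becomes
        //   LinkedList<String> operators = new LinkedList<>();
        List<String> operators = new LinkedList<>();
        operators.add("someOperator");

        System.out.println(explicit.size() + inferred.size() + operators.size());
      }
    }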
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/aa81bea3 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/aa81bea3 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/aa81bea3 Branch: refs/heads/master Commit: aa81bea306aed51fd881d97ce62a01537eeb2003 Parents: 9383613 Author: Apex Dev <[email protected]> Authored: Fri Apr 7 10:08:45 2017 -0700 Committer: Thomas Weise <[email protected]> Committed: Sun Apr 9 13:38:47 2017 -0700 ---------------------------------------------------------------------- .../java/com/datatorrent/api/AffinityRule.java | 2 +- .../java/com/datatorrent/api/Attribute.java | 6 +- .../main/java/com/datatorrent/api/Context.java | 76 ++++++++++---------- .../java/com/datatorrent/api/StringCodec.java | 8 +-- .../org/apache/apex/api/YarnAppLauncher.java | 6 +- .../com/datatorrent/api/AttributeMapTest.java | 2 +- .../bufferserver/internal/DataList.java | 10 +-- .../bufferserver/internal/LogicalNode.java | 4 +- .../datatorrent/bufferserver/server/Server.java | 4 +- .../datatorrent/bufferserver/util/System.java | 2 +- .../packet/SubscribeRequestTupleTest.java | 2 +- .../bufferserver/support/Subscriber.java | 2 +- .../partitioner/StatelessPartitioner.java | 18 ++--- .../common/security/SecurityContext.java | 6 +- .../common/util/DefaultDelayOperator.java | 2 +- .../util/JacksonObjectMapperProvider.java | 6 +- .../common/util/PubSubMessageCodec.java | 14 ++-- .../partitioner/StatelessPartitionerTest.java | 18 ++--- .../common/util/SerializableObjectTest.java | 4 +- .../BlacklistBasedResourceRequestHandler.java | 4 +- .../stram/ResourceRequestHandler.java | 4 +- .../stram/client/PermissionsInfo.java | 8 +-- .../stram/client/StramClientUtils.java | 2 +- .../stram/engine/OperatorContext.java | 4 +- .../stram/plan/logical/LogicalPlan.java | 10 +-- .../stram/plan/physical/PhysicalPlan.java | 4 +- .../stram/util/PubSubWebSocketClient.java | 4 +- .../stram/webapp/asm/CompactUtil.java | 2 +- .../com/datatorrent/stram/webapp/asm/Type.java | 4 +- .../com/datatorrent/stram/CheckpointTest.java | 4 +- .../stram/GenericOperatorPropertyCodecTest.java | 2 +- .../com/datatorrent/stram/PartitioningTest.java | 24 +++---- .../com/datatorrent/stram/StreamCodecTest.java | 8 +-- .../stram/StreamingContainerManagerTest.java | 4 +- .../datatorrent/stram/cli/ApexCliMiscTest.java | 2 +- .../com/datatorrent/stram/cli/ApexCliTest.java | 2 +- .../codec/DefaultStatefulStreamCodecTest.java | 8 +-- .../stram/engine/AtMostOnceTest.java | 2 +- .../stram/engine/GenericNodeTest.java | 12 ++-- .../stram/engine/InputOperatorTest.java | 10 +-- .../com/datatorrent/stram/engine/NodeTest.java | 2 +- .../stram/engine/ProcessingModeTests.java | 8 +-- .../datatorrent/stram/engine/SliderTest.java | 2 +- .../com/datatorrent/stram/engine/StatsTest.java | 6 +- .../stram/engine/StreamingContainerTest.java | 2 +- .../engine/TestGeneratorInputOperator.java | 4 +- .../stram/engine/WindowGeneratorTest.java | 2 +- .../moduleexperiment/InjectConfigTest.java | 2 +- .../stram/plan/StreamPersistanceTests.java | 8 +-- .../datatorrent/stram/plan/TestPlanContext.java | 2 +- .../logical/LogicalPlanConfigurationTest.java | 6 +- .../stram/plan/logical/LogicalPlanTest.java | 12 ++-- .../plan/logical/module/ModuleAppTest.java | 10 +-- .../logical/module/TestModuleExpansion.java | 2 +- .../stram/plan/logical/module/TestModules.java | 4 +- .../stram/plan/physical/PhysicalPlanTest.java | 26 +++---- .../stream/BufferServerSubscriberTest.java | 2 +- 
.../stram/stream/FastPublisherTest.java | 2 +- .../stram/stream/FastStreamTest.java | 2 +- .../stram/stream/InlineStreamTest.java | 8 +-- .../stram/stream/OiOEndWindowTest.java | 4 +- .../datatorrent/stram/stream/OiOStreamTest.java | 14 ++-- .../stram/stream/SocketStreamTest.java | 2 +- .../support/ManualScheduledExecutorService.java | 2 +- .../stram/support/StramTestSupport.java | 6 +- .../stram/util/StablePriorityQueueTest.java | 4 +- .../stram/webapp/OperatorDiscoveryTest.java | 26 +++---- .../stram/webapp/StramWebServicesTest.java | 6 +- .../stram/webapp/TypeDiscoveryTest.java | 12 ++-- 69 files changed, 252 insertions(+), 252 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/AffinityRule.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/AffinityRule.java b/api/src/main/java/com/datatorrent/api/AffinityRule.java index 5e10ccd..304c086 100644 --- a/api/src/main/java/com/datatorrent/api/AffinityRule.java +++ b/api/src/main/java/com/datatorrent/api/AffinityRule.java @@ -92,7 +92,7 @@ public class AffinityRule implements Serializable public AffinityRule(Type type, Locality locality, boolean relaxLocality, String firstOperator, String... otherOperators) { this(type, locality, relaxLocality); - LinkedList<String> operators = new LinkedList<String>(); + LinkedList<String> operators = new LinkedList<>(); if (firstOperator != null && otherOperators.length >= 1) { operators.add(firstOperator); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/Attribute.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java index 2efc84f..821ecb2 100644 --- a/api/src/main/java/com/datatorrent/api/Attribute.java +++ b/api/src/main/java/com/datatorrent/api/Attribute.java @@ -236,11 +236,11 @@ public class Attribute<T> implements Serializable */ public static class AttributeInitializer { - static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<Class<?>, Set<Attribute<Object>>>(); + static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<>(); public static Map<Attribute<Object>, Object> getAllAttributes(Context context, Class<?> clazz) { - Map<Attribute<Object>, Object> result = new HashMap<Attribute<Object>, Object>(); + Map<Attribute<Object>, Object> result = new HashMap<>(); try { for (Field f: clazz.getDeclaredFields()) { if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) { @@ -273,7 +273,7 @@ public class Attribute<T> implements Serializable if (map.containsKey(clazz)) { return 0; } - Set<Attribute<Object>> set = new HashSet<Attribute<Object>>(); + Set<Attribute<Object>> set = new HashSet<>(); try { for (Field f: clazz.getDeclaredFields()) { if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 3d3cffe..94022ff 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ 
b/api/src/main/java/com/datatorrent/api/Context.java @@ -121,17 +121,17 @@ public interface Context /** * Number of tuples the poll buffer can cache without blocking the input stream to the port. */ - Attribute<Integer> QUEUE_CAPACITY = new Attribute<Integer>(1024); + Attribute<Integer> QUEUE_CAPACITY = new Attribute<>(1024); /** * The amount of buffer memory this port requires. There is a buffer server in each container. This is used to calculate total buffer server memory for container. * Also due to the nature of the application, if buffer server needs to use more RAM, from time to time, this number may * not be adhered to. */ - Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<Integer>(8 * 64); + Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<>(8 * 64); /** * Poll period in milliseconds when the port buffer reaches its limits. */ - Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10); + Attribute<Integer> SPIN_MILLIS = new Attribute<>(10); /** * Input port attribute. Extend partitioning of an upstream operator w/o intermediate merge. * Can be used to form parallel partitions that span a group of operators. @@ -139,7 +139,7 @@ public interface Context * If multiple ports of an operator have the setting, incoming streams must track back to * a common root partition, i.e. the operator join forks of the same origin. */ - Attribute<Boolean> PARTITION_PARALLEL = new Attribute<Boolean>(false); + Attribute<Boolean> PARTITION_PARALLEL = new Attribute<>(false); /** * Attribute of output port to specify how many partitions should be merged by a single unifier instance. If the * number of partitions exceeds the limit set, a cascading unifier plan will be created. For example, 4 partitions @@ -147,7 +147,7 @@ public interface Context * network I/O or other resource requirement for each unifier container (depends on the specific functionality of * the unifier), enabling horizontal scale by overcoming the single unifier bottleneck. */ - Attribute<Integer> UNIFIER_LIMIT = new Attribute<Integer>(Integer.MAX_VALUE); + Attribute<Integer> UNIFIER_LIMIT = new Attribute<>(Integer.MAX_VALUE); /** * Attribute to specify that the final unifier be always a single unifier. This is useful when in MxN partitioning @@ -158,16 +158,16 @@ public interface Context * the inputs. In this case the default unifier behavior can be specified on the output port and individual * exceptions can be specified on the corresponding input ports. */ - Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<Boolean>(Boolean.FALSE); + Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<>(Boolean.FALSE); /** * Whether or not to auto record the tuples */ - Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false); + Attribute<Boolean> AUTO_RECORD = new Attribute<>(false); /** * Whether the output is unified. * This is a read-only attribute to query that whether the output of the operator from multiple instances is being unified. */ - Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<Boolean>(false); + Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<>(false); /** * Provide the codec which can be used to serialize or deserialize the data * that can be received on the port. If it is unspecified the engine may use @@ -193,13 +193,13 @@ public interface Context * of the operator. On subsequent run, it's the windowId of the checkpoint from which the operator state * is recovered. 
*/ - Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<Long>(Stateless.WINDOW_ID); + Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<>(Stateless.WINDOW_ID); /** * It is a maximum poll period in milliseconds when there are no tuples available on any of the input ports of the * operator. Platform uses the heuristic to change poll period from 0 to SPIN_MILLIS seconds. * Default value is 10 milliseconds. */ - Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10); + Attribute<Integer> SPIN_MILLIS = new Attribute<>(10); /** * The maximum number of attempts to restart a failing operator before shutting down the application. * Until this number is reached, when an operator fails to start it is re-spawned in a new container. Once all the @@ -218,15 +218,15 @@ public interface Context * by the engine. The attribute is ignored when the operator was already declared stateless through the * {@link Stateless} annotation. */ - Attribute<Boolean> STATELESS = new Attribute<Boolean>(false); + Attribute<Boolean> STATELESS = new Attribute<>(false); /** * Memory resource that the operator requires for optimal functioning. Used to calculate total memory requirement for containers. */ - Attribute<Integer> MEMORY_MB = new Attribute<Integer>(1024); + Attribute<Integer> MEMORY_MB = new Attribute<>(1024); /** * CPU Cores that the operator requires for optimal functioning. Used to calculate total CPU Cores requirement for containers. */ - Attribute<Integer> VCORES = new Attribute<Integer>(0); + Attribute<Integer> VCORES = new Attribute<>(0); /** * The options to be pass to JVM when launching the operator. Options such as java maximum heap size can be specified here. @@ -235,7 +235,7 @@ public interface Context /** * Attribute of the operator that tells the platform how many streaming windows make 1 application window. */ - Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<Integer>(1); + Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<>(1); /** * When set it changes the computation to sliding window computation where duration is determined using {@link #APPLICATION_WINDOW_COUNT} that is * slided by duration determined using value of this attribute. Default value is null which is equivalent to that of {@link #APPLICATION_WINDOW_COUNT}. @@ -251,7 +251,7 @@ public interface Context * value. Typically user would define this value to be the same as that of APPLICATION_WINDOW_COUNT so checkpointing * will be done at application window boundary. */ - Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(1); + Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(1); /** * Name of host to directly control locality of an operator. Complementary to stream locality (NODE_LOCAL affinity). * For example, the user may wish to specify a locality constraint for an input operator relative to its data source. @@ -274,18 +274,18 @@ public interface Context * If the processing mode for an operator is specified as EXACTLY_ONCE then the processing mode for all downstream operators * should be specified as AT_MOST_ONCE otherwise it will result in an error. */ - Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<Operator.ProcessingMode>(ProcessingMode.AT_LEAST_ONCE); + Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<>(ProcessingMode.AT_LEAST_ONCE); /** * Timeout to identify stalled processing, specified as count of streaming windows. 
If the last processed * window does not advance within the specified timeout count, the operator will be considered stuck and the * container restart. There are multiple reasons this could happen: clock drift, hardware issue, networking issue, * blocking operator logic, etc. */ - Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<Integer>(120); + Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<>(120); /** * Whether or not to auto record the tuples */ - Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false); + Attribute<Boolean> AUTO_RECORD = new Attribute<>(false); /** * How the operator distributes its state and share the input can be influenced by setting the Partitioner attribute. * If this attribute is set to non null value, the instance of the partitioner is used to partition and merge the @@ -348,7 +348,7 @@ public interface Context * Name under which the application will be shown in the resource manager. * If not set, the default is the configuration Java class or property file name. */ - Attribute<String> APPLICATION_NAME = new Attribute<String>("unknown-application-name"); + Attribute<String> APPLICATION_NAME = new Attribute<>("unknown-application-name"); /** * URL to the application's documentation. */ @@ -387,7 +387,7 @@ public interface Context /** * Dump extra debug information in launcher, master and containers. */ - Attribute<Boolean> DEBUG = new Attribute<Boolean>(false); + Attribute<Boolean> DEBUG = new Attribute<>(false); /** * The options to be pass to JVM when launching the containers. Options such as java maximum heap size can be specified here. */ @@ -396,20 +396,20 @@ public interface Context * The amount of memory to be requested for the application master. Not used in local mode. * Default value is 1GB. */ - Attribute<Integer> MASTER_MEMORY_MB = new Attribute<Integer>(1024); + Attribute<Integer> MASTER_MEMORY_MB = new Attribute<>(1024); /** * Where to spool the data once the buffer server capacity is reached. */ - Attribute<Boolean> BUFFER_SPOOLING = new Attribute<Boolean>(true); + Attribute<Boolean> BUFFER_SPOOLING = new Attribute<>(true); /** * The streaming window size to use for the application. It is specified in milliseconds. Default value is 500ms. */ - Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<Integer>(500); + Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<>(500); /** * The time interval for saving the operator state. It is specified as a multiple of streaming windows. The operator * state is saved periodically with interval equal to the checkpoint interval. Default value is 60 streaming windows. */ - Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(60); + Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(60); /** * The path to store application dependencies, recording and other generated files for application master and containers. */ @@ -418,13 +418,13 @@ public interface Context * The size limit for a file where tuple recordings are stored. When tuples are being recorded they are stored * in files. When a file size reaches this limit a new file is created and tuples start getting stored in the new file. Default value is 128k. */ - Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<Integer>(128 * 1024); + Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<>(128 * 1024); /** * The time limit for a file where tuple recordings are stored. When tuples are being recorded they are stored * in files. 
When a tuple recording file creation time falls beyond the time limit window from the current time a new file * is created and the tuples start getting stored in the new file. Default value is 30hrs. */ - Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<Integer>(30 * 60 * 60 * 1000); + Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<>(30 * 60 * 60 * 1000); /** * Address to which the application side connects to DT Gateway, in the form of host:port. This will override "dt.gateway.listenAddress" in the configuration. */ @@ -432,7 +432,7 @@ public interface Context /** * Whether or not gateway is expecting SSL connection. */ - Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<Boolean>(false); + Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<>(false); /** * The username for logging in to the gateway, if authentication is enabled. */ @@ -448,48 +448,48 @@ public interface Context /** * Maximum number of simultaneous heartbeat connections to process. Default value is 30. */ - Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<Integer>(30); + Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<>(30); /** * How frequently should operators heartbeat to stram. Recommended setting is * 1000ms. Value 0 will disable heartbeat (for unit testing). Default value is 1000ms. */ - Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<Integer>(1000); + Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<>(1000); /** * Timeout for master to identify a hung container (full GC etc.). Timeout will result in container restart. * Default value is 30s. */ - Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<Integer>(30 * 1000); + Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<>(30 * 1000); /** * Timeout for allocating container resources. Default value is 60s. */ - Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<Integer>(Integer.MAX_VALUE); + Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<>(Integer.MAX_VALUE); /** * Maximum number of windows that can be pending for statistics calculation. Statistics are computed when * the metrics are available from all operators for a window. If the information is not available from all operators then * the window is pending. When the number of pending windows reaches this limit the information for the oldest window * is purged. Default value is 1000 windows. */ - Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<Integer>(1000); + Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<>(1000); /** * Whether or not we record statistics. The statistics are recorded for each heartbeat if enabled. The default value is false. */ - Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<Boolean>(false); + Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<>(false); /** * The time interval for throughput calculation. The throughput is periodically calculated with interval greater than or * equal to the throughput calculation interval. The default value is 10s. */ - Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<Integer>(10000); + Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<>(10000); /** * The maximum number of samples to use when calculating throughput. In practice fewer samples may be used * if the THROUGHPUT_CALCULATION_INTERVAL is exceeded. Default value is 1000 samples. 
*/ - Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<Integer>(1000); + Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<>(1000); /** * The number of samples to use when using RPC latency to compensate for clock skews and network latency when * calculating stats. Specify 0 if RPC latency should not be used at all to calculate stats. Default value is 100 * samples. */ - Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<Integer>(100); + Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<>(100); /** * The agent which can be used to find the jvm options for the container. */ @@ -511,12 +511,12 @@ public interface Context * blacklisting of nodes by application master * Blacklisting for nodes is disabled for the default value */ - Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<Integer>(Integer.MAX_VALUE); + Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<>(Integer.MAX_VALUE); /** * The amount of time to wait before removing failed nodes from blacklist */ - Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<Long>(new Long(60 * 60 * 1000)); + Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<>(new Long(60 * 60 * 1000)); /** * Affinity rules for specifying affinity and anti-affinity between logical operators http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/com/datatorrent/api/StringCodec.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/StringCodec.java b/api/src/main/java/com/datatorrent/api/StringCodec.java index d4a0a41..fa8ab23 100644 --- a/api/src/main/java/com/datatorrent/api/StringCodec.java +++ b/api/src/main/java/com/datatorrent/api/StringCodec.java @@ -302,7 +302,7 @@ public interface StringCodec<T> return clazz.getConstructor(String.class).newInstance(parts[1]); } else { T object = clazz.getConstructor(String.class).newInstance(parts[1]); - HashMap<String, String> hashMap = new HashMap<String, String>(); + HashMap<String, String> hashMap = new HashMap<>(); for (int i = 2; i < parts.length; i++) { String[] keyValPair = parts[i].split(propertySeparator, 2); hashMap.put(keyValPair[0], keyValPair[1]); @@ -365,11 +365,11 @@ public interface StringCodec<T> } if (string.isEmpty()) { - return new HashMap<K, V>(); + return new HashMap<>(); } String[] parts = string.split(separator); - HashMap<K, V> map = new HashMap<K, V>(); + HashMap<K, V> map = new HashMap<>(); for (String part : parts) { String[] kvpair = part.split(equal, 2); map.put(keyCodec.fromString(kvpair[0]), valueCodec.fromString(kvpair[1])); @@ -433,7 +433,7 @@ public interface StringCodec<T> } String[] parts = string.split(separator); - ArrayList<T> arrayList = new ArrayList<T>(parts.length); + ArrayList<T> arrayList = new ArrayList<>(parts.length); for (String part : parts) { arrayList.add(codec.fromString(part)); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java index 82cf50e..8ff0205 100644 --- a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java +++ b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java @@ -36,17 +36,17 @@ public abstract class 
YarnAppLauncher<H extends YarnAppLauncher.YarnAppHandle> e /** * Parameter to specify extra jars for launch. */ - public static final Attribute<String> LIB_JARS = new Attribute<String>(new StringCodec.String2String()); + public static final Attribute<String> LIB_JARS = new Attribute<>(new StringCodec.String2String()); /** * Parameter to specify the previous application id to use to resume launch from. */ - public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<String>(new StringCodec.String2String()); + public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<>(new StringCodec.String2String()); /** * Parameter to specify the queue name to use for launch. */ - public static final Attribute<String> QUEUE_NAME = new Attribute<String>(new StringCodec.String2String()); + public static final Attribute<String> QUEUE_NAME = new Attribute<>(new StringCodec.String2String()); static { Attribute.AttributeMap.AttributeInitializer.initialize(YarnAppLauncher.class); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/api/src/test/java/com/datatorrent/api/AttributeMapTest.java ---------------------------------------------------------------------- diff --git a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java index fcb1809..b463619 100644 --- a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java +++ b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java @@ -51,7 +51,7 @@ public class AttributeMapTest interface iface { - Attribute<Greeting> greeting = new Attribute<Greeting>(Greeting.hello); + Attribute<Greeting> greeting = new Attribute<>(Greeting.hello); } @Test http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 84999fa..d08b9fc 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -336,14 +336,14 @@ public class DataList { all_listeners.add(dl); //logger.debug("total {} listeners {} -> {}", all_listeners.size(), dl, this); - ArrayList<BitVector> partitions = new ArrayList<BitVector>(); + ArrayList<BitVector> partitions = new ArrayList<>(); if (dl.getPartitions(partitions) > 0) { for (BitVector partition : partitions) { HashSet<DataListener> set; if (listeners.containsKey(partition)) { set = listeners.get(partition); } else { - set = new HashSet<DataListener>(); + set = new HashSet<>(); listeners.put(partition, set); } set.add(dl); @@ -353,7 +353,7 @@ public class DataList if (listeners.containsKey(DataListener.NULL_PARTITION)) { set = listeners.get(DataListener.NULL_PARTITION); } else { - set = new HashSet<DataListener>(); + set = new HashSet<>(); listeners.put(DataListener.NULL_PARTITION, set); } @@ -363,7 +363,7 @@ public class DataList public void removeDataListener(DataListener dl) { - ArrayList<BitVector> partitions = new ArrayList<BitVector>(); + ArrayList<BitVector> partitions = new ArrayList<>(); if (dl.getPartitions(partitions) > 0) { for (BitVector partition : partitions) { if (listeners.containsKey(partition)) { @@ -459,7 +459,7 @@ public class DataList // When the number of subscribers becomes high or the 
number of blocks becomes high, consider optimize it. Block b = first; - Map<Block, Integer> indices = new HashMap<Block, Integer>(); + Map<Block, Integer> indices = new HashMap<>(); int i = 0; while (b != null) { indices.put(b, i++); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index 08a483a..b06e60a 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -71,8 +71,8 @@ public class LogicalNode implements DataListener this.identifier = identifier; this.upstream = upstream; this.group = group; - this.physicalNodes = new HashSet<PhysicalNode>(); - this.partitions = new HashSet<BitVector>(); + this.physicalNodes = new HashSet<>(); + this.partitions = new HashSet<>(); this.iterator = iterator; this.skipWindowId = skipWindowId; this.eventloop = eventloop; http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 7ac518b..8a56b51 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -260,7 +260,7 @@ public class Server extends AbstractServer } private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1); - private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>(); + private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>(); private final int blockSize; private final int numberOfCacheBlocks; @@ -883,7 +883,7 @@ public class Server extends AbstractServer } } - ArrayList<LogicalNode> list = new ArrayList<LogicalNode>(); + ArrayList<LogicalNode> list = new ArrayList<>(); String publisherIdentifier = datalist.getIdentifier(); Iterator<LogicalNode> iterator = subscriberGroups.values().iterator(); while (iterator.hasNext()) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java index 124cc5f..000ce00 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java @@ -31,7 +31,7 @@ import com.datatorrent.netlet.EventLoop; */ public class System { - private static final HashMap<String, DefaultEventLoop> eventloops = new HashMap<String, DefaultEventLoop>(); + private static final HashMap<String, DefaultEventLoop> eventloops = new 
HashMap<>(); public static void startup(String identifier) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java index f7b8829..371f98a 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java @@ -42,7 +42,7 @@ public class SubscribeRequestTupleTest String down_type = "SubscriberId/StreamType"; String upstream_id = "PublisherId"; int mask = 7; - ArrayList<Integer> partitions = new ArrayList<Integer>(); + ArrayList<Integer> partitions = new ArrayList<>(); partitions.add(5); long startingWindowId = 0xcafebabe00000078L; http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java index 3c0cb0e..d6e9b1a 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java @@ -32,7 +32,7 @@ import com.datatorrent.bufferserver.packet.Tuple; */ public class Subscriber extends com.datatorrent.bufferserver.client.Subscriber { - public final ArrayList<Object> resetPayloads = new ArrayList<Object>(); + public final ArrayList<Object> resetPayloads = new ArrayList<>(); public AtomicInteger tupleCount = new AtomicInteger(0); public WindowIdHolder firstPayload; public WindowIdHolder lastPayload; http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java index 165d8cf..d77b1ae 100644 --- a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java +++ b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java @@ -114,7 +114,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>, newPartitions = Lists.newArrayList(); for (int partitionCounter = 0; partitionCounter < newPartitionCount; partitionCounter++) { - newPartitions.add(new DefaultPartition<T>(partition.getPartitionedInstance())); + newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance())); } // partition the stream that was first connected in the DAG and send full data to remaining input ports @@ -156,8 +156,8 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>, */ public static <T extends Operator> Collection<Partition<T>> repartition(Collection<Partition<T>> partitions) { - List<Partition<T>> newPartitions = new ArrayList<Partition<T>>(); - HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<Integer, Partition<T>>(); + 
List<Partition<T>> newPartitions = new ArrayList<>(); + HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<>(); for (Partition<T> p: partitions) { int load = p.getLoad(); if (load < 0) { @@ -201,7 +201,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>, } for (int key: newKeys) { - Partition<T> newPartition = new DefaultPartition<T>(p.getPartitionedInstance()); + Partition<T> newPartition = new DefaultPartition<>(p.getPartitionedInstance()); newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key))); newPartitions.add(newPartition); } @@ -224,8 +224,8 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>, */ public static <T extends Operator> Collection<Partition<T>> repartitionInputOperator(Collection<Partition<T>> partitions) { - List<Partition<T>> newPartitions = new ArrayList<Partition<T>>(); - List<Partition<T>> lowLoadPartitions = new ArrayList<Partition<T>>(); + List<Partition<T>> newPartitions = new ArrayList<>(); + List<Partition<T>> lowLoadPartitions = new ArrayList<>(); for (Partition<T> p: partitions) { int load = p.getLoad(); if (load < 0) { @@ -235,8 +235,8 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>, lowLoadPartitions.add(p); } } else if (load > 0) { - newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance())); - newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance())); + newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance())); + newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance())); } else { newPartitions.add(p); } @@ -274,7 +274,7 @@ public class StatelessPartitioner<T extends Operator> implements Partitioner<T>, T anOperator = newPartitions.iterator().next().getPartitionedInstance(); while (morePartitionsToCreate-- > 0) { - DefaultPartition<T> partition = new DefaultPartition<T>(anOperator); + DefaultPartition<T> partition = new DefaultPartition<>(anOperator); newPartitions.add(partition); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/security/SecurityContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java index dccd7b7..3dc4dda 100644 --- a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java +++ b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java @@ -32,17 +32,17 @@ public interface SecurityContext extends Context /** * Attribute for the user name for login. */ - Attribute<String> USER_NAME = new Attribute<String>((String)null); + Attribute<String> USER_NAME = new Attribute<>((String)null); /** * Attribute for the password for login. */ - Attribute<char[]> PASSWORD = new Attribute<char[]>((char[])null); + Attribute<char[]> PASSWORD = new Attribute<>((char[])null); /** * Attribute for the realm for login. 
*/ - Attribute<String> REALM = new Attribute<String>((String)null); + Attribute<String> REALM = new Attribute<>((String)null); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java index ca7490d..f90a888 100644 --- a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java +++ b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java @@ -48,7 +48,7 @@ public class DefaultDelayOperator<T> extends BaseOperator implements Operator.De } }; - public transient DefaultOutputPort<T> output = new DefaultOutputPort<T>(); + public transient DefaultOutputPort<T> output = new DefaultOutputPort<>(); protected List<T> lastWindowTuples = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java index 7723fed..ef837a8 100644 --- a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java +++ b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java @@ -55,9 +55,9 @@ public class JacksonObjectMapperProvider implements ContextResolver<ObjectMapper this.objectMapper = new ObjectMapper(); objectMapper.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, true); objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); - module.addSerializer(ObjectMapperString.class, new RawSerializer<Object>(Object.class)); - module.addSerializer(JSONObject.class, new RawSerializer<Object>(Object.class)); - module.addSerializer(JSONArray.class, new RawSerializer<Object>(Object.class)); + module.addSerializer(ObjectMapperString.class, new RawSerializer<>(Object.class)); + module.addSerializer(JSONObject.class, new RawSerializer<>(Object.class)); + module.addSerializer(JSONArray.class, new RawSerializer<>(Object.class)); objectMapper.registerModule(module); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java index 63d1646..af5e10e 100644 --- a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java +++ b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java @@ -53,7 +53,7 @@ public class PubSubMessageCodec<T> */ public static <T> String constructPublishMessage(String topic, T data, PubSubMessageCodec<T> codec) throws IOException { - PubSubMessage<T> pubSubMessage = new PubSubMessage<T>(); + PubSubMessage<T> pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.PUBLISH); pubSubMessage.setTopic(topic); pubSubMessage.setData(data); @@ -72,7 +72,7 @@ public class PubSubMessageCodec<T> */ public static <T> String constructSubscribeMessage(String topic, PubSubMessageCodec<T> codec) 
throws IOException { - PubSubMessage<T> pubSubMessage = new PubSubMessage<T>(); + PubSubMessage<T> pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.SUBSCRIBE); pubSubMessage.setTopic(topic); @@ -90,7 +90,7 @@ public class PubSubMessageCodec<T> */ public static <T> String constructUnsubscribeMessage(String topic, PubSubMessageCodec<T> codec) throws IOException { - PubSubMessage<T> pubSubMessage = new PubSubMessage<T>(); + PubSubMessage<T> pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE); pubSubMessage.setTopic(topic); @@ -108,7 +108,7 @@ public class PubSubMessageCodec<T> */ public static <T> String constructSubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException { - PubSubMessage<T> pubSubMessage = new PubSubMessage<T>(); + PubSubMessage<T> pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS); pubSubMessage.setTopic(topic); @@ -126,7 +126,7 @@ public class PubSubMessageCodec<T> */ public static <T> String constructUnsubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException { - PubSubMessage<T> pubSubMessage = new PubSubMessage<T>(); + PubSubMessage<T> pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS); pubSubMessage.setTopic(topic); @@ -135,7 +135,7 @@ public class PubSubMessageCodec<T> public String formatMessage(PubSubMessage<T> pubSubMessage) throws IOException { - HashMap<String, Object> map = new HashMap<String, Object>(); + HashMap<String, Object> map = new HashMap<>(); map.put(PubSubMessage.TYPE_KEY, pubSubMessage.getType().getIdentifier()); map.put(PubSubMessage.TOPIC_KEY, pubSubMessage.getTopic()); T data = pubSubMessage.getData(); @@ -156,7 +156,7 @@ public class PubSubMessageCodec<T> public PubSubMessage<T> parseMessage(String message) throws IOException { HashMap<String, Object> map = mapper.readValue(message, HashMap.class); - PubSubMessage<T> pubSubMessage = new PubSubMessage<T>(); + PubSubMessage<T> pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.getPubSubMessageType((String)map.get(PubSubMessage.TYPE_KEY))); pubSubMessage.setTopic((String)map.get(PubSubMessage.TOPIC_KEY)); pubSubMessage.setData((T)map.get(PubSubMessage.DATA_KEY)); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java index e7c4887..2e48f54 100644 --- a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java +++ b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java @@ -41,7 +41,7 @@ public class StatelessPartitionerTest public static class DummyOperator implements Operator { - public final DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>(); + public final DefaultOutputPort<Integer> output = new DefaultOutputPort<>(); private Integer value; @@ -93,10 +93,10 @@ public class StatelessPartitionerTest public void partition1Test() { DummyOperator dummyOperator = new DummyOperator(5); - StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>(); + 
StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>(); Collection<Partition<DummyOperator>> partitions = Lists.newArrayList(); - DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator); + DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator); partitions.add(defaultPartition); Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0)); @@ -111,10 +111,10 @@ public class StatelessPartitionerTest public void partition5Test() { DummyOperator dummyOperator = new DummyOperator(5); - StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>(5); + StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>(5); Collection<Partition<DummyOperator>> partitions = Lists.newArrayList(); - DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator); + DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator); partitions.add(defaultPartition); Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0)); @@ -137,10 +137,10 @@ public class StatelessPartitionerTest public void testParallelPartitionScaleUP() { DummyOperator dummyOperator = new DummyOperator(5); - StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>(); + StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>(); Collection<Partition<DummyOperator>> partitions = Lists.newArrayList(); - partitions.add(new DefaultPartition<DummyOperator>(dummyOperator)); + partitions.add(new DefaultPartition<>(dummyOperator)); Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 5)); @@ -151,12 +151,12 @@ public class StatelessPartitionerTest public void testParallelPartitionScaleDown() { DummyOperator dummyOperator = new DummyOperator(5); - StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>(); + StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>(); Collection<Partition<DummyOperator>> partitions = Lists.newArrayList(); for (int i = 5; i-- > 0; ) { - partitions.add(new DefaultPartition<DummyOperator>(dummyOperator)); + partitions.add(new DefaultPartition<>(dummyOperator)); } Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java index 97debe3..79fdac7 100644 --- a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java +++ b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java @@ -50,7 +50,7 @@ public class SerializableObjectTest } }; - public final transient OutputPort<T> output = new DefaultOutputPort<T>(); + public final transient OutputPort<T> output = new DefaultOutputPort<>(); private int i; public void setI(int i) @@ -109,7 +109,7 @@ public 
class SerializableObjectTest @Test public void testReadResolve() throws Exception { - SerializableOperator<Object> pre = new SerializableOperator<Object>(); + SerializableOperator<Object> pre = new SerializableOperator<>(); pre.setI(10); FileOutputStream fos = new FileOutputStream(filename); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java index 53d91a5..80314c7 100644 --- a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java @@ -73,7 +73,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler for (ContainerRequest cr : requests) { ContainerStartRequest csr = hostSpecificRequests.get(cr); ContainerRequest newCr = new ContainerRequest(cr.getCapability(), null, null, cr.getPriority()); - MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, newCr); + MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, newCr); requestedResources.put(csr, pair); containerRequests.add(newCr); hostSpecificRequests.remove(cr); @@ -91,7 +91,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler for (Entry<ContainerRequest, ContainerStartRequest> entry : otherContainerRequests.entrySet()) { ContainerRequest cr = entry.getKey(); ContainerStartRequest csr = entry.getValue(); - MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr); + MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr); requestedResources.put(csr, pair); containerRequests.add(cr); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java index e7f9672..45206bc 100644 --- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java @@ -102,7 +102,7 @@ public class ResourceRequestHandler */ public void addContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, List<ContainerRequest> containerRequests, StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr) { - MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr); + MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr); requestedResources.put(csr, pair); containerRequests.add(cr); } @@ -164,7 +164,7 @@ public class ResourceRequestHandler public List<String> getNodesExceptHost(List<String> hostNames) { - List<String> nodesList = new ArrayList<String>(); + List<String> nodesList = new ArrayList<>(); Set<String> hostNameSet = Sets.newHashSet(); hostNameSet.addAll(hostNames); for (String host : nodeReportMap.keySet()) { 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java index 3a61ee6..b374447 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java +++ b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java @@ -32,10 +32,10 @@ import org.codehaus.jettison.json.JSONObject; public class PermissionsInfo { - private final Set<String> readOnlyRoles = new TreeSet<String>(); - private final Set<String> readOnlyUsers = new TreeSet<String>(); - private final Set<String> readWriteRoles = new TreeSet<String>(); - private final Set<String> readWriteUsers = new TreeSet<String>(); + private final Set<String> readOnlyRoles = new TreeSet<>(); + private final Set<String> readOnlyUsers = new TreeSet<>(); + private final Set<String> readWriteRoles = new TreeSet<>(); + private final Set<String> readWriteUsers = new TreeSet<>(); private boolean readOnlyEveryone = false; private boolean readWriteEveryone = false; http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java index 050729d..8076d4a 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java @@ -271,7 +271,7 @@ public class StramClientUtils } Text rmTokenService = new Text(Joiner.on(',').join(services)); - return new Token<RMDelegationTokenIdentifier>( + return new Token<>( rmDelegationToken.getIdentifier().array(), rmDelegationToken.getPassword().array(), new Text(rmDelegationToken.getKind()), http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java index 7113280..284aefb 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java @@ -50,8 +50,8 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont private final int id; private final String name; // the size of the circular queue should be configurable. hardcoded to 1024 for now. 
- private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<ContainerStats.OperatorStats>(1024); - private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<OperatorRequest>(1024); + private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<>(1024); + private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<>(1024); public final boolean stateless; private int windowsFromCheckpoint; http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 401eea9..62c4fd8 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -760,7 +760,7 @@ public class LogicalPlan implements Serializable, DAG codecs.put(sinkToPersistPortMeta, inputStreamCodec); InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port); StreamCodec<Object> specifiedCodecForPersistOperator = (StreamCodec<Object>)persistOperatorPortMeta.getStreamCodec(); - StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator); + StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<>(codecs, specifiedCodecForPersistOperator); persistOperatorPortMeta.setStreamCodec(codec); } } @@ -1907,14 +1907,14 @@ public class LogicalPlan implements Serializable, DAG HashMap<OperatorPair, AffinityRule> antiAffinities = new HashMap<>(); HashMap<OperatorPair, AffinityRule> threadLocalAffinities = new HashMap<>(); - List<String> operatorNames = new ArrayList<String>(); + List<String> operatorNames = new ArrayList<>(); for (OperatorMeta operator : getAllOperators()) { operatorNames.add(operator.getName()); - Set<String> containerSet = new HashSet<String>(); + Set<String> containerSet = new HashSet<>(); containerSet.add(operator.getName()); containerAffinities.put(operator.getName(), containerSet); - Set<String> nodeSet = new HashSet<String>(); + Set<String> nodeSet = new HashSet<>(); nodeSet.add(operator.getName()); nodeAffinities.put(operator.getName(), nodeSet); @@ -2073,7 +2073,7 @@ public class LogicalPlan implements Serializable, DAG */ public void convertRegexToList(List<String> operatorNames, AffinityRule rule) { - List<String> operators = new LinkedList<String>(); + List<String> operators = new LinkedList<>(); Pattern p = Pattern.compile(rule.getOperatorRegex()); for (String name : operatorNames) { if (p.matcher(name).matches()) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index ce22bfd..a1da94a 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -477,13 +477,13 @@ public class PhysicalPlan implements Serializable // Log container anti-affinity if (LOG.isDebugEnabled()) { for 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index ce22bfd..a1da94a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -477,13 +477,13 @@ public class PhysicalPlan implements Serializable
 // Log container anti-affinity
 if (LOG.isDebugEnabled()) {
 for (PTContainer container : containers) {
- List<String> antiOperators = new ArrayList<String>();
+ List<String> antiOperators = new ArrayList<>();
 for (PTContainer c : container.getStrictAntiPrefs()) {
 for (PTOperator operator : c.getOperators()) {
 antiOperators.add(operator.getName());
 }
 }
- List<String> containerOperators = new ArrayList<String>();
+ List<String> containerOperators = new ArrayList<>();
 for (PTOperator operator : container.getOperators()) {
 containerOperators.add(operator.getName());
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
index 38c0f41..47986cb 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
@@ -116,10 +116,10 @@ public abstract class PubSubWebSocketClient implements Component<Context>
 */
 public PubSubWebSocketClient()
 {
- throwable = new AtomicReference<Throwable>();
+ throwable = new AtomicReference<>();
 ioThreadMultiplier = 1;
 mapper = (new JacksonObjectMapperProvider()).getContext(null);
- codec = new PubSubMessageCodec<Object>(mapper);
+ codec = new PubSubMessageCodec<>(mapper);
 AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean();
 config.setIoThreadMultiplier(ioThreadMultiplier);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
index dd75857..9fbb54d 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
@@ -148,7 +148,7 @@ public class CompactUtil
 List<CompactAnnotationNode> annotations = new LinkedList<>();
 for (Object visibleAnnotation : fn.visibleAnnotations) {
 CompactAnnotationNode node = new CompactAnnotationNode();
- Map<String, Object> annotationMap = new HashMap<String, Object>();
+ Map<String, Object> annotationMap = new HashMap<>();
 if (visibleAnnotation instanceof AnnotationNode) {
 AnnotationNode annotation = (AnnotationNode)visibleAnnotation;
 if (annotation.desc.contains("InputPortFieldAnnotation")
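In PubSubWebSocketClient the fields are assigned in the constructor rather than at their declaration; the diamond still works there because the assignment target supplies the type. A small sketch under that assumption (the class name is illustrative, AtomicReference is the same JDK type used in the patch):

    import java.util.concurrent.atomic.AtomicReference;

    public class ConstructorAssignmentExample
    {
      private final AtomicReference<Throwable> throwable;

      public ConstructorAssignmentExample()
      {
        // The declared field type supplies the target, so
        // AtomicReference<Throwable> is inferred here.
        throwable = new AtomicReference<>();
      }
    }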
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
index 1e87b31..91a3cf3 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
@@ -87,7 +87,7 @@ public interface Type
 char boundChar;
- ArrayList<Type> bounds = new ArrayList<Type>();
+ ArrayList<Type> bounds = new ArrayList<>();
 public Type[] getUpperBounds()
 {
@@ -154,7 +154,7 @@ public interface Type
 class ParameterizedTypeNode extends TypeNode
 {
- ArrayList<Type> actualTypeArguments = new ArrayList<Type>();
+ ArrayList<Type> actualTypeArguments = new ArrayList<>();
 public Type[] getActualTypeArguments()
 {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index d7f96d4..0c997ec 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -78,7 +78,7 @@ public class CheckpointTest
 private static class MockInputOperator extends BaseOperator implements InputOperator, Operator.CheckpointNotificationListener
 {
 @OutputPortFieldAnnotation( optional = true)
- public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();
 private transient int windowCount;
 private int checkpointState;
@@ -326,7 +326,7 @@ public class CheckpointTest
 public List<Checkpoint> getCheckpoints(Long... windowIds)
 {
- List<Checkpoint> list = new ArrayList<Checkpoint>(windowIds.length);
+ List<Checkpoint> list = new ArrayList<>(windowIds.length);
 for (Long windowId : windowIds) {
 list.add(new Checkpoint(windowId, 0, 0));
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
index b1509e0..b1f3363 100644
--- a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
@@ -41,7 +41,7 @@ public class GenericOperatorPropertyCodecTest
 public void testGenericOperatorPropertyCodec()
 {
 LogicalPlan dag = new LogicalPlan();
- Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<Class<?>, Class<? extends StringCodec<?>>>();
+ Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<>();
 codecs.put(GenericOperatorProperty.class, GenericOperatorProperty.GenericOperatorPropertyStringCodec.class);
 dag.setAttribute(com.datatorrent.api.Context.DAGContext.STRING_CODECS, codecs);
 dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
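The GenericOperatorPropertyCodecTest hunk is where the conversion pays off most visibly: a nested wildcard argument list such as Class<?>, Class<? extends StringCodec<?>> no longer has to be repeated on the right-hand side. A JDK-only sketch of the same idea (names are illustrative):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class WildcardDiamondExample
    {
      // Before the diamond operator, the right-hand side had to repeat
      // new HashMap<Class<?>, List<? extends Number>>() in full.
      private final Map<Class<?>, List<? extends Number>> valuesByType = new HashMap<>();
    }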
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index ecbeeb6..f0199f9 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -86,7 +86,7 @@ public class PartitioningTest
 /*
 * Received tuples are stored in a map keyed with the system assigned operator id.
 */
- public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<String, List<Object>>();
+ public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<>();
 private transient int operatorId;
 public String prefix = "";
@@ -107,7 +107,7 @@ public class PartitioningTest
 synchronized (receivedTuples) {
 List<Object> l = receivedTuples.get(id);
 if (l == null) {
- l = Collections.synchronizedList(new ArrayList<Object>());
+ l = Collections.synchronizedList(new ArrayList<>());
 //LOG.debug("adding {} {}", id, l);
 receivedTuples.put(id, l);
 }
@@ -121,12 +121,12 @@ public class PartitioningTest
 };
 @OutputPortFieldAnnotation( optional = true)
- public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
 }
 public static class TestInputOperator<T> extends BaseOperator implements InputOperator
 {
- public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+ public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
 transient boolean first;
 transient long windowId;
 boolean blockEndStream = false;
@@ -178,9 +178,9 @@ public class PartitioningTest
 CollectorOperator.receivedTuples.clear();
 TestInputOperator<Integer> input = dag.addOperator("input", new TestInputOperator<Integer>());
- input.testTuples = new ArrayList<List<Integer>>();
+ input.testTuples = new ArrayList<>();
 for (Integer[] tuples: testData) {
- input.testTuples.add(new ArrayList<Integer>(Arrays.asList(tuples)));
+ input.testTuples.add(new ArrayList<>(Arrays.asList(tuples)));
 }
 CollectorOperator collector = dag.addOperator("collector", new CollectorOperator());
 collector.prefix = "" + System.identityHashCode(collector);
@@ -234,7 +234,7 @@ public class PartitioningTest
 {
 Map<Integer, Integer> m = loadIndicators.get();
 if (m == null) {
- loadIndicators.set(m = new ConcurrentHashMap<Integer, Integer>());
+ loadIndicators.set(m = new ConcurrentHashMap<>());
 }
 m.put(oper.getId(), load);
 }
@@ -341,7 +341,7 @@ public class PartitioningTest
 Assert.assertNotNull("" + nodeMap, inputDeployed);
 // add tuple that matches the partition key and check that each partition receives it
- ArrayList<Integer> inputTuples = new ArrayList<Integer>();
+ ArrayList<Integer> inputTuples = new ArrayList<>();
 LOG.debug("Number of partitions {}", partitions.size());
 for (PTOperator p: partitions) {
 // default partitioning has one port mapping with a single partition key
@@ -391,7 +391,7 @@ public class PartitioningTest
 @Override
 public Collection<Partition<PartitionableInputOperator>> definePartitions(Collection<Partition<PartitionableInputOperator>> partitions, PartitioningContext context)
 {
- List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<Partition<PartitionableInputOperator>>(3);
+ List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<>(3);
 Iterator<? extends Partition<PartitionableInputOperator>> iterator = partitions.iterator();
 Partition<PartitionableInputOperator> templatePartition;
 for (int i = 0; i < 3; i++) {
@@ -401,7 +401,7 @@ public class PartitioningTest
 op.partitionProperty = templatePartition.getPartitionedInstance().partitionProperty;
 }
 op.partitionProperty += "_" + i;
- newPartitions.add(new DefaultPartition<PartitionableInputOperator>(op));
+ newPartitions.add(new DefaultPartition<>(op));
 }
 return newPartitions;
 }
@@ -431,7 +431,7 @@ public class PartitioningTest
 lc.runAsync();
 List<PTOperator> partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
- Set<String> partProperties = new HashSet<String>();
+ Set<String> partProperties = new HashSet<>();
 for (PTOperator p : partitions) {
 LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
 Map<Integer, Node<?>> nodeMap = c.getNodes();
@@ -460,7 +460,7 @@ public class PartitioningTest
 PartitionLoadWatch.remove(partitions.get(0));
 partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
- partProperties = new HashSet<String>();
+ partProperties = new HashSet<>();
 for (PTOperator p: partitions) {
 LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
 Map<Integer, Node<?>> nodeMap = c.getNodes();
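One PartitioningTest hunk uses the diamond in argument position, Collections.synchronizedList(new ArrayList<>()); the element type is then inferred through the enclosing call. In that hunk the element type is Object either way, so the change is safe; for a more specific element type this relies on the stronger target typing introduced in Java 8. A sketch assuming a Java 8 or later compiler (names are illustrative):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class ArgumentPositionExample
    {
      public static void main(String[] args)
      {
        // The element type of the new ArrayList is inferred through the
        // synchronizedList call from the declared type of 'names'.
        List<String> names = Collections.synchronizedList(new ArrayList<>());
        names.add("operator1");
        System.out.println(names);
      }
    }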
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 35bb363..cee8247 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1007,7 +1007,7 @@ public class StreamCodecTest
 lastId = assignNewContainers(dnm, lastId);
 List<PTOperator> operators = plan.getOperators(n2meta);
- List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+ List<PTOperator> upstreamOperators = new ArrayList<>();
 for (PTOperator operator : operators) {
 upstreamOperators.addAll(operator.upstreamMerge.values());
 /*
@@ -1036,7 +1036,7 @@ public class StreamCodecTest
 lastId = assignNewContainers(dnm, lastId);
 List<PTOperator> operators = plan.getOperators(n3meta);
- List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+ List<PTOperator> upstreamOperators = new ArrayList<>();
 for (PTOperator operator : operators) {
 upstreamOperators.addAll(operator.upstreamMerge.values());
 }
@@ -1063,7 +1063,7 @@ public class StreamCodecTest
 lastId = assignNewContainers(dnm, lastId);
 List<PTOperator> operators = plan.getOperators(n2meta);
- List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+ List<PTOperator> upstreamOperators = new ArrayList<>();
 for (PTOperator operator : operators) {
 upstreamOperators.addAll(operator.upstreamMerge.values());
 /*
@@ -1144,7 +1144,7 @@ public class StreamCodecTest
 private Set<PTOperator> getUnifiers(PhysicalPlan plan)
 {
- Set<PTOperator> unifiers = new HashSet<PTOperator>();
+ Set<PTOperator> unifiers = new HashSet<>();
 for (PTContainer container : plan.getContainers()) {
 for (PTOperator operator : container.getOperators()) {
 if (operator.isUnifier()) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index c606f47..cb2d760 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -124,14 +124,14 @@ public class StreamingContainerManagerTest
 input.portName = "inputPortNameOnNode";
 input.sourceNodeId = 99;
- ndi.inputs = new ArrayList<OperatorDeployInfo.InputDeployInfo>();
+ ndi.inputs = new ArrayList<>();
 ndi.inputs.add(input);
 OperatorDeployInfo.OutputDeployInfo output = new OperatorDeployInfo.OutputDeployInfo();
 output.declaredStreamId = "streamFromNode";
 output.portName = "outputPortNameOnNode";
- ndi.outputs = new ArrayList<OperatorDeployInfo.OutputDeployInfo>();
+ ndi.outputs = new ArrayList<>();
 ndi.outputs.add(output);
 ContainerHeartbeatResponse scc = new ContainerHeartbeatResponse();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
index f6b7277..59f9dcc 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
@@ -34,7 +34,7 @@ public class ApexCliMiscTest
 {
 ApexCli cli;
- static Map<String, String> env = new HashMap<String, String>();
+ static Map<String, String> env = new HashMap<>();
 static String userHome;
 @Before

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
index 2ac1c50..f1356df 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
@@ -59,7 +59,7 @@ public class ApexCliTest
 static TemporaryFolder testFolder = new TemporaryFolder();
 ApexCli cli;
- static Map<String, String> env = new HashMap<String, String>();
+ static Map<String, String> env = new HashMap<>();
 static String userHome;
 @BeforeClass
"hello"; @@ -182,7 +182,7 @@ public class DefaultStatefulStreamCodecTest public void testFinalFieldSerialization() throws Exception { TestTuple t1 = new TestTuple(5); - DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>(); + DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>(); DataStatePair dsp = c.toDataStatePair(t1); TestTuple t2 = (TestTuple)c.fromDataStatePair(dsp); Assert.assertEquals("", t1.finalField, t2.finalField); @@ -208,7 +208,7 @@ public class DefaultStatefulStreamCodecTest Object inner = outer.new InnerClass(); for (Object o: new Object[] {outer, inner}) { - DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>(); + DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>(); DataStatePair dsp = c.toDataStatePair(o); c.fromDataStatePair(dsp); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java index cc777f7..64298de 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java @@ -77,7 +77,7 @@ public class AtMostOnceTest extends ProcessingModeTests @Override public void testNonLinearOperatorRecovery() throws InterruptedException { - final HashSet<Object> collection = new HashSet<Object>(); + final HashSet<Object> collection = new HashSet<>(); com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0); http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index da5c7b7..66f1b84 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -197,7 +197,7 @@ public class GenericNodeTest }; @OutputPortFieldAnnotation( optional = true) - DefaultOutputPort<Object> op = new DefaultOutputPort<Object>(); + DefaultOutputPort<Object> op = new DefaultOutputPort<>(); @Override public void beginWindow(long windowId) @@ -226,7 +226,7 @@ public class GenericNodeTest public static class CheckpointDistanceOperator extends GenericOperator { - List<Integer> distances = new ArrayList<Integer>(); + List<Integer> distances = new ArrayList<>(); int numWindows = 0; int maxWindows = 0; @@ -245,7 +245,7 @@ public class GenericNodeTest public void testSynchingLogic() throws InterruptedException { long sleeptime = 25L; - final ArrayList<Object> list = new ArrayList<Object>(); + final ArrayList<Object> list = new ArrayList<>(); GenericOperator go = new GenericOperator(); final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator", new DefaultAttributeMap(), null)); @@ -376,8 +376,8 @@ public class GenericNodeTest final Server bufferServer = new Server(eventloop, 0); // find random port final int bufferServerPort = bufferServer.run().getPort(); - 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index da5c7b7..66f1b84 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -197,7 +197,7 @@ public class GenericNodeTest
 };
 @OutputPortFieldAnnotation( optional = true)
- DefaultOutputPort<Object> op = new DefaultOutputPort<Object>();
+ DefaultOutputPort<Object> op = new DefaultOutputPort<>();
 @Override
 public void beginWindow(long windowId)
@@ -226,7 +226,7 @@ public class GenericNodeTest
 public static class CheckpointDistanceOperator extends GenericOperator
 {
- List<Integer> distances = new ArrayList<Integer>();
+ List<Integer> distances = new ArrayList<>();
 int numWindows = 0;
 int maxWindows = 0;
@@ -245,7 +245,7 @@ public class GenericNodeTest
 public void testSynchingLogic() throws InterruptedException
 {
 long sleeptime = 25L;
- final ArrayList<Object> list = new ArrayList<Object>();
+ final ArrayList<Object> list = new ArrayList<>();
 GenericOperator go = new GenericOperator();
 final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator", new DefaultAttributeMap(), null));
@@ -376,8 +376,8 @@ public class GenericNodeTest
 final Server bufferServer = new Server(eventloop, 0); // find random port
 final int bufferServerPort = bufferServer.run().getPort();
- final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
- final BlockingQueue<Object> tuples = new ArrayBlockingQueue<Object>(10);
+ final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
+ final BlockingQueue<Object> tuples = new ArrayBlockingQueue<>(10);
 GenericTestOperator go = new GenericTestOperator();
 final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
@@ -905,7 +905,7 @@ public class GenericNodeTest
 CheckpointDistanceOperator go = new CheckpointDistanceOperator();
 go.maxWindows = maxWindows;
- List<Integer> checkpoints = new ArrayList<Integer>();
+ List<Integer> checkpoints = new ArrayList<>();
 int window = 0;
 while (window < maxWindows) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
index 84217eb..bb2e72f 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
@@ -51,10 +51,10 @@ public class InputOperatorTest
 public static class EvenOddIntegerGeneratorInputOperator implements InputOperator, com.datatorrent.api.Operator.ActivationListener<OperatorContext>
 {
- public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<Integer>();
- public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<Integer>();
- private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<Integer>(1024);
- private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<Integer>(1024);
+ public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<>();
+ public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<>();
+ private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<>(1024);
+ private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<>(1024);
 private volatile Thread dataGeneratorThread;
 @Override
@@ -179,7 +179,7 @@ public class InputOperatorTest
 public void setConnected(boolean flag)
 {
 if (flag) {
- collections.put(id, list = new ArrayList<T>());
+ collections.put(id, list = new ArrayList<>());
 }
 }
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/aa81bea3/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
index 55b5eab..f669832 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
@@ -171,7 +171,7 @@ public class NodeTest
 }
- static final ArrayList<Call> calls = new ArrayList<Call>();
+ static final ArrayList<Call> calls = new ArrayList<>();
 @Override
 public void save(Object object, int operatorId, long windowId) throws IOException
