Fix rawtype warnings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d0b4bdb2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d0b4bdb2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d0b4bdb2 Branch: refs/heads/master Commit: d0b4bdb24e77a08181e07f13d7c5794d39c6203c Parents: c284c39 Author: Thomas Weise <[email protected]> Authored: Wed Jan 20 10:57:54 2016 -0800 Committer: Thomas Weise <[email protected]> Committed: Wed Jan 20 10:59:36 2016 -0800 ---------------------------------------------------------------------- .../stram/plan/physical/PhysicalPlan.java | 24 +++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d0b4bdb2/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index 7858ea0..829a6fd 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -206,10 +206,11 @@ public class PhysicalPlan implements Serializable p.statsListeners = this.statsHandlers; } + /** + * Return all partitions and unifiers, except MxN unifiers + * @return + */ private Collection<PTOperator> getAllOperators() { -// if (partitions.size() == 1) { -// return Collections.singletonList(partitions.get(0)); -// } Collection<PTOperator> c = new ArrayList<PTOperator>(partitions.size() + 1); c.addAll(partitions); for (StreamMapping ug : outputStreams.values()) { @@ -391,7 +392,7 @@ public class PhysicalPlan implements Serializable for (StreamMeta s : n.getOutputStreams().values()) { if (s.getPersistOperator() != null) { InputPortMeta persistInputPort = s.getPersistOperatorInputPort(); - StreamCodecWrapperForPersistance persistCodec = (StreamCodecWrapperForPersistance) persistInputPort.getAttributes().get(PortContext.STREAM_CODEC); + StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>) persistInputPort.getAttributes().get(PortContext.STREAM_CODEC); if (persistCodec == null) continue; // Logging is enabled for the stream @@ -403,10 +404,10 @@ public class PhysicalPlan implements Serializable // Check partitioning for persist operators per sink too for (Entry<InputPortMeta, InputPortMeta> entry : s.sinkSpecificPersistInputPortMap.entrySet()) { InputPortMeta persistInputPort = entry.getValue(); - StreamCodec codec = persistInputPort.getAttributes().get(PortContext.STREAM_CODEC); + StreamCodec<?> codec = persistInputPort.getAttributes().get(PortContext.STREAM_CODEC); if (codec != null) { if (codec instanceof StreamCodecWrapperForPersistance) { - StreamCodecWrapperForPersistance persistCodec = (StreamCodecWrapperForPersistance) codec; + StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>) codec; updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), persistCodec, entry.getKey()); } } @@ -418,7 +419,7 @@ public class PhysicalPlan implements Serializable } } - private void updatePersistOperatorWithSinkPartitions(InputPortMeta persistInputPort, OperatorMeta persistOperatorMeta, StreamCodecWrapperForPersistance persistCodec, InputPortMeta sinkPortMeta) + private void updatePersistOperatorWithSinkPartitions(InputPortMeta persistInputPort, OperatorMeta persistOperatorMeta, StreamCodecWrapperForPersistance<?> persistCodec, InputPortMeta sinkPortMeta) { Collection<PTOperator> ptOperators = getOperators(sinkPortMeta.getOperatorWrapper()); Collection<PartitionKeys> partitionKeysList = new ArrayList<PartitionKeys>(); @@ -437,7 +438,7 @@ public class PhysicalPlan implements Serializable for (OperatorMeta n : dag.getAllOperators()) { for (StreamMeta s : n.getOutputStreams().values()) { if (s.getPersistOperator() != null) { - Map<InputPortMeta, StreamCodec<Object>> inputStreamCodecs = new HashMap<InputPortMeta, StreamCodec<Object>>(); + Map<InputPortMeta, StreamCodec<?>> inputStreamCodecs = new HashMap<>(); // Logging is enabled for the stream for (InputPortMeta portMeta : s.getSinksToPersist()) { InputPort<?> port = portMeta.getPortObject(); @@ -452,7 +453,7 @@ public class PhysicalPlan implements Serializable } } if (!alreadyAdded) { - inputStreamCodecs.put(portMeta, (StreamCodec<Object>) inputStreamCodec); + inputStreamCodecs.put(portMeta, inputStreamCodec); } } } @@ -466,8 +467,9 @@ public class PhysicalPlan implements Serializable // Create Wrapper codec for Stream persistence using all unique // stream codecs // Logger should write merged or union of all input stream codecs - StreamCodec<Object> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? (StreamCodec<Object>) s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) s.getPersistOperatorInputPort().getPortObject().getStreamCodec(); - StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(inputStreamCodecs, specifiedCodecForLogger); + StreamCodec<?> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : s.getPersistOperatorInputPort().getPortObject().getStreamCodec(); + @SuppressWarnings({ "unchecked", "rawtypes" }) + StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance(inputStreamCodecs, specifiedCodecForLogger); streamMetaToCodecMap.put(s, codec); } }
