Repository: apex-core
Updated Branches:
  refs/heads/release-3.3 8655ffabe -> 232eba368
APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec() Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/232eba36 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/232eba36 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/232eba36 Branch: refs/heads/release-3.3 Commit: 232eba3681f2a7bf43a7967bacd33f5f1f35b98f Parents: 8655ffa Author: Vlad Rozov <[email protected]> Authored: Wed Feb 1 08:03:45 2017 -0800 Committer: Vlad Rozov <[email protected]> Committed: Wed Feb 1 08:03:45 2017 -0800 ---------------------------------------------------------------------- .../stram/StreamingContainerAgent.java | 39 +++++++++----------- .../stram/StreamingContainerManager.java | 4 +- .../stram/plan/logical/LogicalPlan.java | 30 +++++++++++++-- .../stram/plan/physical/StreamMapping.java | 7 ++-- .../com/datatorrent/stram/StreamCodecTest.java | 2 +- 5 files changed, 49 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/232eba36/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java index 81dc96e..a5fc3a5 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java @@ -185,7 +185,11 @@ public class StreamingContainerAgent { if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { // input attributes of the downstream operator for (InputPortMeta sink : streamMeta.getSinks()) { - portInfo.contextAttributes = sink.getAttributes(); + try { + portInfo.contextAttributes = sink.getAttributes().clone(); + } catch 
(CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); + } break; } } @@ -199,7 +203,7 @@ public class StreamingContainerAgent { // Create mappings for all non-inline operators if (input.target.getContainer() != out.source.getContainer()) { InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input); - StreamCodec<?> streamCodecInfo = getStreamCodec(inputPortMeta); + StreamCodec<?> streamCodecInfo = inputPortMeta.getStreamCodec(); Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo); if (!portInfo.streamCodecs.containsKey(id)) { portInfo.streamCodecs.put(id, streamCodecInfo); @@ -231,11 +235,19 @@ public class StreamingContainerAgent { InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta); if (inputPortMeta != null) { - inputInfo.contextAttributes = inputPortMeta.getAttributes(); + try { + inputInfo.contextAttributes = inputPortMeta.getAttributes().clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); + } } if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { - inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes(); + try { + inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); + } } inputInfo.sourceNodeId = sourceOutput.source.getId(); @@ -272,7 +284,7 @@ public class StreamingContainerAgent { // On the input side there is a unlikely scenario of partitions even for inline stream that is being // handled. Always specifying a stream codec configuration in case that scenario happens. 
InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in); - StreamCodec<?> streamCodecInfo = getStreamCodec(idInputPortMeta); + StreamCodec<?> streamCodecInfo = idInputPortMeta.getStreamCodec(); Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo); inputInfo.streamCodecs.put(id, streamCodecInfo); ndi.inputs.add(inputInfo); @@ -327,23 +339,6 @@ public class StreamingContainerAgent { return operator; } - public static StreamCodec<?> getStreamCodec(InputPortMeta inputPortMeta) - { - if (inputPortMeta != null) { - StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC); - if (codec == null) { - // it cannot be this object that gets returned. Depending on this value is dangerous - codec = inputPortMeta.getPortObject().getStreamCodec(); - if (codec != null) { - // don't create codec multiple times - it will assign a new identifier - inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec); - } - } - return codec; - } - return null; - } - /** * Create deploy info for operator. 
* <p> http://git-wip-us.apache.org/repos/asf/apex-core/blob/232eba36/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index aa79243..2fe8107 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -2146,7 +2146,7 @@ public class StreamingContainerManager implements PlanContext } for (InputPortMeta ipm : out.logicalStream.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); + StreamCodec<?> streamCodecInfo = ipm.getStreamCodec(); Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo); // following needs to match the concat logic in StreamingContainer String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString()); @@ -2243,7 +2243,7 @@ public class StreamingContainerManager implements PlanContext for (PTOperator.PTOutput out : operator.getOutputs()) { if (!out.isDownStreamInline()) { for (InputPortMeta ipm : out.logicalStream.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); + StreamCodec<?> streamCodecInfo = ipm.getStreamCodec(); Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo); // following needs to match the concat logic in StreamingContainer String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString()); http://git-wip-us.apache.org/repos/asf/apex-core/blob/232eba36/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java 
---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 3825505..01d9a1c 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -61,6 +61,8 @@ import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.stram.engine.DefaultUnifier; import com.datatorrent.stram.engine.Slider; +import static com.datatorrent.api.Context.PortContext.STREAM_CODEC; + /** * DAG contains the logical declarations of operators and streams. * <p> @@ -270,6 +272,20 @@ public class LogicalPlan implements Serializable, DAG throw new UnsupportedOperationException("Not supported yet."); } + public StreamCodec<?> getStreamCodec() + { + return attributes.get(STREAM_CODEC); + } + + void setStreamCodec(StreamCodec<?> streamCodec) + { + if (streamCodec != null) { + StreamCodec<?> oldStreamCodec = attributes.put(STREAM_CODEC, streamCodec); + if (oldStreamCodec != null && oldStreamCodec != streamCodec) { + LOG.warn("Input port {} stream codec was changed from {} to {}", this.getPortName(), oldStreamCodec, streamCodec); + } + } + } } public final class OutputPortMeta implements DAG.OutputPortMeta, Serializable @@ -681,14 +697,14 @@ public class LogicalPlan implements Serializable, DAG private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort<?> port) { - StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null ? (StreamCodec<Object>) sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) sinkToPersistPortMeta.getPortObject().getStreamCodec(); + StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(STREAM_CODEC) != null ? 
(StreamCodec<Object>) sinkToPersistPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>) sinkToPersistPortMeta.getPortObject().getStreamCodec(); if (inputStreamCodec != null) { Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<InputPortMeta, StreamCodec<Object>>(); codecs.put(sinkToPersistPortMeta, inputStreamCodec); InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port); - StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) != null) ? (StreamCodec<Object>) persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) port.getStreamCodec(); + StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(STREAM_CODEC) != null) ? (StreamCodec<Object>) persistOperatorPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>) port.getStreamCodec(); StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator); - setInputPortAttribute(port, PortContext.STREAM_CODEC, codec); + setInputPortAttribute(port, STREAM_CODEC, codec); } } @@ -980,6 +996,12 @@ public class LogicalPlan implements Serializable, DAG metaPort.adqAnnotation = adqAnnotation; inPortMap.put(portObject, metaPort); markInputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass()); + if (metaPort.getStreamCodec() == null) { + metaPort.setStreamCodec(portObject.getStreamCodec()); + } else if (portObject.getStreamCodec() != null) { + LOG.info("Input port {} attribute {} overrides codec {} with {} codec", metaPort.getPortName(), STREAM_CODEC.getSimpleName(), + portObject.getStreamCodec(), metaPort.getStreamCodec()); + } } @Override @@ -1612,7 +1634,7 @@ public class LogicalPlan implements Serializable, DAG } for (StreamMeta n: this.streams.values()) { for (InputPortMeta sink : n.getSinks()) { - StreamCodec<?> streamCodec = sink.getValue(PortContext.STREAM_CODEC); + StreamCodec<?> streamCodec = 
sink.getValue(STREAM_CODEC); if (streamCodec != null) { classNames.add(streamCodec.getClass().getName()); } else { http://git-wip-us.apache.org/repos/asf/apex-core/blob/232eba36/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java index f30ceb6..7efce6c 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java @@ -38,7 +38,6 @@ import com.datatorrent.api.Partitioner.PartitionKeys; import com.datatorrent.api.StreamCodec; import com.datatorrent.common.util.Pair; -import com.datatorrent.stram.StreamingContainerAgent; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; @@ -230,7 +229,7 @@ public class StreamMapping implements java.io.Serializable boolean separateUnifiers = false; Integer lastId = null; for (InputPortMeta ipm : streamMeta.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); + StreamCodec<?> streamCodecInfo = ipm.getStreamCodec(); Integer id = plan.getStreamCodecIdentifier(streamCodecInfo); if (lastId == null) { lastId = id; @@ -249,7 +248,7 @@ public class StreamMapping implements java.io.Serializable unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0); } else { for (InputPortMeta ipm : streamMeta.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); + StreamCodec<?> streamCodecInfo = ipm.getStreamCodec(); if (!cascadeUnifierSourcesMap.containsKey(streamCodecInfo)) { unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, 
limit, 0); cascadeUnifierSourcesMap.put(streamCodecInfo, unifierSources); @@ -303,7 +302,7 @@ public class StreamMapping implements java.io.Serializable unifier.inputs.clear(); List<PTOutput> doperUnifierSources = unifierSources; if (separateUnifiers) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(doperEntry.second); + StreamCodec<?> streamCodecInfo = doperEntry.second.getStreamCodec(); List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(streamCodecInfo); if (cascadeSources != null) { doperUnifierSources = cascadeSources; http://git-wip-us.apache.org/repos/asf/apex-core/blob/232eba36/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java index ddf3448..da5456e 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java @@ -1152,7 +1152,7 @@ public class StreamCodecTest Map<Integer, StreamCodec<?>> streamCodecs, String id, PhysicalPlan plan ) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(operatorMeta.getMeta(inputPort)); + StreamCodec<?> streamCodecInfo = operatorMeta.getMeta(inputPort).getStreamCodec(); Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodecInfo, plan)); Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodecInfo); checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodecInfo);
