Repository: apex-core
Updated Branches:
  refs/heads/release-3.4 4fb1a5097 -> 2f34efd3f
APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec() Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/2f34efd3 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/2f34efd3 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/2f34efd3 Branch: refs/heads/release-3.4 Commit: 2f34efd3f8be4a8852471c61f0b596586bc2c51b Parents: 4fb1a50 Author: Vlad Rozov <[email protected]> Authored: Wed Feb 1 08:03:45 2017 -0800 Committer: Vlad Rozov <[email protected]> Committed: Wed Feb 1 11:12:46 2017 -0800 ---------------------------------------------------------------------- .../stram/StreamingContainerAgent.java | 39 +++++++++----------- .../stram/StreamingContainerManager.java | 4 +- .../stram/plan/logical/LogicalPlan.java | 34 ++++++++++++++--- .../stram/plan/physical/StreamMapping.java | 7 ++-- .../com/datatorrent/stram/StreamCodecTest.java | 2 +- 5 files changed, 51 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/2f34efd3/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java index 598fea5..b4349f5 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java @@ -200,7 +200,11 @@ public class StreamingContainerAgent if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { // input attributes of the downstream operator for (InputPortMeta sink : streamMeta.getSinks()) { - portInfo.contextAttributes = sink.getAttributes(); + try { + portInfo.contextAttributes = sink.getAttributes().clone(); + } catch 
(CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); + } break; } } @@ -214,7 +218,7 @@ public class StreamingContainerAgent // Create mappings for all non-inline operators if (input.target.getContainer() != out.source.getContainer()) { InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input); - StreamCodec<?> streamCodecInfo = getStreamCodec(inputPortMeta); + StreamCodec<?> streamCodecInfo = inputPortMeta.getStreamCodec(); Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo); if (!portInfo.streamCodecs.containsKey(id)) { portInfo.streamCodecs.put(id, streamCodecInfo); @@ -246,11 +250,19 @@ public class StreamingContainerAgent InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta); if (inputPortMeta != null) { - inputInfo.contextAttributes = inputPortMeta.getAttributes(); + try { + inputInfo.contextAttributes = inputPortMeta.getAttributes().clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); + } } if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { - inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes(); + try { + inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); + } } inputInfo.sourceNodeId = sourceOutput.source.getId(); @@ -287,7 +299,7 @@ public class StreamingContainerAgent // On the input side there is a unlikely scenario of partitions even for inline stream that is being // handled. Always specifying a stream codec configuration in case that scenario happens. 
InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in); - StreamCodec<?> streamCodecInfo = getStreamCodec(idInputPortMeta); + StreamCodec<?> streamCodecInfo = idInputPortMeta.getStreamCodec(); Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo); inputInfo.streamCodecs.put(id, streamCodecInfo); ndi.inputs.add(inputInfo); @@ -342,23 +354,6 @@ public class StreamingContainerAgent return operator; } - public static StreamCodec<?> getStreamCodec(InputPortMeta inputPortMeta) - { - if (inputPortMeta != null) { - StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC); - if (codec == null) { - // it cannot be this object that gets returned. Depending on this value is dangerous - codec = inputPortMeta.getPortObject().getStreamCodec(); - if (codec != null) { - // don't create codec multiple times - it will assign a new identifier - inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec); - } - } - return codec; - } - return null; - } - /** * Create deploy info for operator. 
* <p> http://git-wip-us.apache.org/repos/asf/apex-core/blob/2f34efd3/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index b12709e..7226039 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -2183,7 +2183,7 @@ public class StreamingContainerManager implements PlanContext } for (InputPortMeta ipm : out.logicalStream.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); + StreamCodec<?> streamCodecInfo = ipm.getStreamCodec(); Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo); // following needs to match the concat logic in StreamingContainer String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString()); @@ -2279,7 +2279,7 @@ public class StreamingContainerManager implements PlanContext for (PTOperator.PTOutput out : operator.getOutputs()) { if (!out.isDownStreamInline()) { for (InputPortMeta ipm : out.logicalStream.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); + StreamCodec<?> streamCodecInfo = ipm.getStreamCodec(); Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo); // following needs to match the concat logic in StreamingContainer String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString()); http://git-wip-us.apache.org/repos/asf/apex-core/blob/2f34efd3/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java 
---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index af6b1bc..5ccee5c 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -103,6 +103,8 @@ import com.datatorrent.common.util.Pair; import com.datatorrent.stram.engine.DefaultUnifier; import com.datatorrent.stram.engine.Slider; +import static com.datatorrent.api.Context.PortContext.STREAM_CODEC; + /** * DAG contains the logical declarations of operators and streams. * <p> @@ -306,6 +308,20 @@ public class LogicalPlan implements Serializable, DAG throw new UnsupportedOperationException("Not supported yet."); } + public StreamCodec<?> getStreamCodec() + { + return attributes.get(STREAM_CODEC); + } + + void setStreamCodec(StreamCodec<?> streamCodec) + { + if (streamCodec != null) { + StreamCodec<?> oldStreamCodec = attributes.put(STREAM_CODEC, streamCodec); + if (oldStreamCodec != null && oldStreamCodec != streamCodec) { + LOG.warn("Input port {} stream codec was changed from {} to {}", this.getPortName(), oldStreamCodec, streamCodec); + } + } + } } public final class OutputPortMeta implements DAG.OutputPortMeta, Serializable @@ -732,18 +748,18 @@ public class LogicalPlan implements Serializable, DAG private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort<?> port) { - StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null - ? (StreamCodec<Object>)sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) + StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(STREAM_CODEC) != null + ? 
(StreamCodec<Object>)sinkToPersistPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>)sinkToPersistPortMeta.getPortObject().getStreamCodec(); if (inputStreamCodec != null) { Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<>(); codecs.put(sinkToPersistPortMeta, inputStreamCodec); InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port); - StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) != null) - ? (StreamCodec<Object>)persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) + StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(STREAM_CODEC) != null) + ? (StreamCodec<Object>)persistOperatorPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>)port.getStreamCodec(); StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator); - setInputPortAttribute(port, PortContext.STREAM_CODEC, codec); + setInputPortAttribute(port, STREAM_CODEC, codec); } } @@ -1064,6 +1080,12 @@ public class LogicalPlan implements Serializable, DAG metaPort.adqAnnotation = adqAnnotation; inPortMap.put(portObject, metaPort); markInputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass()); + if (metaPort.getStreamCodec() == null) { + metaPort.setStreamCodec(portObject.getStreamCodec()); + } else if (portObject.getStreamCodec() != null) { + LOG.info("Input port {} attribute {} overrides codec {} with {} codec", metaPort.getPortName(), STREAM_CODEC.getSimpleName(), + portObject.getStreamCodec(), metaPort.getStreamCodec()); + } } @Override @@ -1607,7 +1629,7 @@ public class LogicalPlan implements Serializable, DAG } for (StreamMeta n: this.streams.values()) { for (InputPortMeta sink : n.getSinks()) { - StreamCodec<?> streamCodec = sink.getValue(PortContext.STREAM_CODEC); + StreamCodec<?> streamCodec = sink.getValue(STREAM_CODEC); if (streamCodec != null) { 
classNames.add(streamCodec.getClass().getName()); } else { http://git-wip-us.apache.org/repos/asf/apex-core/blob/2f34efd3/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java index e404d5a..76a934e 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java @@ -38,7 +38,6 @@ import com.datatorrent.api.Partitioner.PartitionKeys; import com.datatorrent.api.StreamCodec; import com.datatorrent.common.util.Pair; -import com.datatorrent.stram.StreamingContainerAgent; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; @@ -235,7 +234,7 @@ public class StreamMapping implements java.io.Serializable boolean separateUnifiers = false; Integer lastId = null; for (InputPortMeta ipm : streamMeta.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); + StreamCodec<?> streamCodecInfo = ipm.getStreamCodec(); Integer id = plan.getStreamCodecIdentifier(streamCodecInfo); if (lastId == null) { lastId = id; @@ -254,7 +253,7 @@ public class StreamMapping implements java.io.Serializable unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0); } else { for (InputPortMeta ipm : streamMeta.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); + StreamCodec<?> streamCodecInfo = ipm.getStreamCodec(); if (!cascadeUnifierSourcesMap.containsKey(streamCodecInfo)) { unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0); cascadeUnifierSourcesMap.put(streamCodecInfo, 
unifierSources); @@ -308,7 +307,7 @@ public class StreamMapping implements java.io.Serializable unifier.inputs.clear(); List<PTOutput> doperUnifierSources = unifierSources; if (separateUnifiers) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(doperEntry.second); + StreamCodec<?> streamCodecInfo = doperEntry.second.getStreamCodec(); List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(streamCodecInfo); if (cascadeSources != null) { doperUnifierSources = cascadeSources; http://git-wip-us.apache.org/repos/asf/apex-core/blob/2f34efd3/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java index d9a9ee4..7a372ee 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java @@ -1160,7 +1160,7 @@ public class StreamCodecTest Map<Integer, StreamCodec<?>> streamCodecs, String id, PhysicalPlan plan) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(operatorMeta.getMeta(inputPort)); + StreamCodec<?> streamCodecInfo = operatorMeta.getMeta(inputPort).getStreamCodec(); Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodecInfo, plan)); Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodecInfo); checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodecInfo);
