Repository: apex-core Updated Branches: refs/heads/release-3.5 bd8f7bade -> fdebb1954
APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec() Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/fdebb195 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/fdebb195 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/fdebb195 Branch: refs/heads/release-3.5 Commit: fdebb1954fb87cadbef18c6d1e8c4e2b6b9752e0 Parents: bd8f7ba Author: Vlad Rozov <[email protected]> Authored: Mon Jan 30 17:24:45 2017 -0800 Committer: Vlad Rozov <[email protected]> Committed: Sat Mar 25 17:09:01 2017 -0700 ---------------------------------------------------------------------- .../stram/StreamingContainerAgent.java | 53 +++++++++----------- .../stram/StreamingContainerManager.java | 4 +- .../stram/plan/logical/LogicalPlan.java | 39 +++++++++----- .../stram/plan/physical/PhysicalPlan.java | 16 +++--- .../stram/plan/physical/StreamMapping.java | 13 ++--- .../com/datatorrent/stram/StreamCodecTest.java | 8 +-- .../stram/plan/StreamPersistanceTests.java | 2 +- 7 files changed, 69 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/fdebb195/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java index 2ea37f4..1effd15 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java @@ -201,7 +201,11 @@ public class StreamingContainerAgent if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { // input attributes of the downstream operator for (InputPortMeta sink : streamMeta.getSinks()) { - portInfo.contextAttributes = sink.getAttributes(); + try { + portInfo.contextAttributes = sink.getAttributes().clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); + } break; } } @@ -214,11 +218,11 @@ public class StreamingContainerAgent for (PTOperator.PTInput input : out.sinks) { // Create mappings for all non-inline operators if (input.target.getContainer() != out.source.getContainer()) { - InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input); - StreamCodec<?> streamCodecInfo = getStreamCodec(inputPortMeta); - Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo); + final StreamCodec<?> streamCodec = getIdentifyingInputPortMeta(input).getStreamCodec(); + final Integer id = physicalPlan.getStreamCodecIdentifier(streamCodec); + // TODO: replace with inputInfo.streamCodecs.putIfAbsent() after support for JDK 1.7 is dropped. if (!portInfo.streamCodecs.containsKey(id)) { - portInfo.streamCodecs.put(id, streamCodecInfo); + portInfo.streamCodecs.put(id, streamCodec); } } } @@ -247,11 +251,19 @@ public class StreamingContainerAgent InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta); if (inputPortMeta != null) { - inputInfo.contextAttributes = inputPortMeta.getAttributes(); + try { + inputInfo.contextAttributes = inputPortMeta.getAttributes().clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); + } } if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { - inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes(); + try { + inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); + } } inputInfo.sourceNodeId = sourceOutput.source.getId(); @@ -287,10 +299,12 @@ public class StreamingContainerAgent // On the input side there is a unlikely scenario of partitions even for inline stream that is being // handled. Always specifying a stream codec configuration in case that scenario happens. - InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in); - StreamCodec<?> streamCodecInfo = getStreamCodec(idInputPortMeta); - Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo); - inputInfo.streamCodecs.put(id, streamCodecInfo); + final StreamCodec<?> streamCodec = getIdentifyingInputPortMeta(in).getStreamCodec(); + final Integer id = physicalPlan.getStreamCodecIdentifier(streamCodec); + // TODO: replace with inputInfo.streamCodecs.putIfAbsent() after support for JDK 1.7 is dropped. + if (!inputInfo.streamCodecs.containsKey(id)) { + inputInfo.streamCodecs.put(id, streamCodec); + } ndi.inputs.add(inputInfo); } } @@ -343,23 +357,6 @@ public class StreamingContainerAgent return operator; } - public static StreamCodec<?> getStreamCodec(InputPortMeta inputPortMeta) - { - if (inputPortMeta != null) { - StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC); - if (codec == null) { - // it cannot be this object that gets returned. Depending on this value is dangerous - codec = inputPortMeta.getPortObject().getStreamCodec(); - if (codec != null) { - // don't create codec multiple times - it will assign a new identifier - inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec); - } - } - return codec; - } - return null; - } - /** * Create deploy info for operator. * <p> http://git-wip-us.apache.org/repos/asf/apex-core/blob/fdebb195/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 4d193d5..82dc887 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -2321,8 +2321,8 @@ public class StreamingContainerManager implements PlanContext for (PTOperator.PTOutput out : operator.getOutputs()) { if (!out.isDownStreamInline()) { for (InputPortMeta ipm : out.logicalStream.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); - Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo); + StreamCodec<?> streamCodec = ipm.getStreamCodec(); + Integer codecId = plan.getStreamCodecIdentifier(streamCodec); // following needs to match the concat logic in StreamingContainer String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString()); if (operator.getContainer().getState() == PTContainer.State.ACTIVE) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/fdebb195/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index f1ccaef..79226c6 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -103,6 +103,8 @@ import com.datatorrent.common.util.Pair; import com.datatorrent.stram.engine.DefaultUnifier; import com.datatorrent.stram.engine.Slider; +import static com.datatorrent.api.Context.PortContext.STREAM_CODEC; + /** * DAG contains the logical declarations of operators and streams. * <p> @@ -307,6 +309,20 @@ public class LogicalPlan implements Serializable, DAG throw new UnsupportedOperationException("Not supported yet."); } + public StreamCodec<?> getStreamCodec() + { + return attributes.get(STREAM_CODEC); + } + + void setStreamCodec(StreamCodec<?> streamCodec) + { + if (streamCodec != null) { + StreamCodec<?> oldStreamCodec = attributes.put(STREAM_CODEC, streamCodec); + if (oldStreamCodec != null && oldStreamCodec != streamCodec) { // once input port codec is set, it is not expected that it will be changed. + LOG.warn("Operator {} input port {} stream codec was changed from {} to {}", getOperatorWrapper().getName(), getPortName(), oldStreamCodec, streamCodec); + } + } + } } public final class OutputPortMeta implements DAG.OutputPortMeta, Serializable @@ -733,18 +749,14 @@ public class LogicalPlan implements Serializable, DAG private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort<?> port) { - StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null - ? (StreamCodec<Object>)sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) - : (StreamCodec<Object>)sinkToPersistPortMeta.getPortObject().getStreamCodec(); + StreamCodec<Object> inputStreamCodec = (StreamCodec<Object>)sinkToPersistPortMeta.getStreamCodec(); if (inputStreamCodec != null) { Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<>(); codecs.put(sinkToPersistPortMeta, inputStreamCodec); InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port); - StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) != null) - ? (StreamCodec<Object>)persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) - : (StreamCodec<Object>)port.getStreamCodec(); + StreamCodec<Object> specifiedCodecForPersistOperator = (StreamCodec<Object>)persistOperatorPortMeta.getStreamCodec(); StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator); - setInputPortAttribute(port, PortContext.STREAM_CODEC, codec); + persistOperatorPortMeta.setStreamCodec(codec); } } @@ -1065,6 +1077,12 @@ public class LogicalPlan implements Serializable, DAG metaPort.adqAnnotation = adqAnnotation; inPortMap.put(portObject, metaPort); markInputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass()); + if (metaPort.getStreamCodec() == null) { + metaPort.setStreamCodec(portObject.getStreamCodec()); + } else if (portObject.getStreamCodec() != null) { + LOG.info("Operator {} input port {} attribute {} overrides codec {} with {} codec", metaPort.getOperatorWrapper().getName(), + metaPort.getPortName(), STREAM_CODEC.getSimpleName(), portObject.getStreamCodec(), metaPort.getStreamCodec()); + } } @Override @@ -1614,14 +1632,9 @@ public class LogicalPlan implements Serializable, DAG } for (StreamMeta n: this.streams.values()) { for (InputPortMeta sink : n.getSinks()) { - StreamCodec<?> streamCodec = sink.getValue(PortContext.STREAM_CODEC); + StreamCodec<?> streamCodec = sink.getStreamCodec(); if (streamCodec != null) { classNames.add(streamCodec.getClass().getName()); - } else { - StreamCodec<?> codec = sink.getPortObject().getStreamCodec(); - if (codec != null) { - classNames.add(codec.getClass().getName()); - } } } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/fdebb195/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index 4181971..909ee87 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -543,7 +543,7 @@ public class PhysicalPlan implements Serializable for (StreamMeta s : n.getOutputStreams().values()) { if (s.getPersistOperator() != null) { InputPortMeta persistInputPort = s.getPersistOperatorInputPort(); - StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>)persistInputPort.getAttributes().get(PortContext.STREAM_CODEC); + StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>)persistInputPort.getStreamCodec(); if (persistCodec == null) { continue; } @@ -556,12 +556,9 @@ public class PhysicalPlan implements Serializable // Check partitioning for persist operators per sink too for (Map.Entry<InputPortMeta, InputPortMeta> entry : s.sinkSpecificPersistInputPortMap.entrySet()) { InputPortMeta persistInputPort = entry.getValue(); - StreamCodec<?> codec = persistInputPort.getAttributes().get(PortContext.STREAM_CODEC); - if (codec != null) { - if (codec instanceof StreamCodecWrapperForPersistance) { - StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>)codec; - updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), persistCodec, entry.getKey()); - } + StreamCodec<?> streamCodec = persistInputPort.getStreamCodec(); + if (streamCodec != null && streamCodec instanceof StreamCodecWrapperForPersistance) { + updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), (StreamCodecWrapperForPersistance<?>)streamCodec, entry.getKey()); } } } @@ -593,8 +590,7 @@ public class PhysicalPlan implements Serializable Map<InputPortMeta, StreamCodec<?>> inputStreamCodecs = new HashMap<>(); // Logging is enabled for the stream for (InputPortMeta portMeta : s.getSinksToPersist()) { - InputPort<?> port = portMeta.getPortObject(); - StreamCodec<?> inputStreamCodec = (portMeta.getValue(PortContext.STREAM_CODEC) != null) ? portMeta.getValue(PortContext.STREAM_CODEC) : port.getStreamCodec(); + StreamCodec<?> inputStreamCodec = portMeta.getStreamCodec(); if (inputStreamCodec != null) { boolean alreadyAdded = false; @@ -619,7 +615,7 @@ public class PhysicalPlan implements Serializable // Create Wrapper codec for Stream persistence using all unique // stream codecs // Logger should write merged or union of all input stream codecs - StreamCodec<?> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : s.getPersistOperatorInputPort().getPortObject().getStreamCodec(); + StreamCodec<?> specifiedCodecForLogger = s.getPersistOperatorInputPort().getStreamCodec(); @SuppressWarnings({ "unchecked", "rawtypes" }) StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance(inputStreamCodecs, specifiedCodecForLogger); streamMetaToCodecMap.put(s, codec); http://git-wip-us.apache.org/repos/asf/apex-core/blob/fdebb195/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java index 81d6d44..ef7363c 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java @@ -39,7 +39,6 @@ import com.datatorrent.api.Partitioner.PartitionKeys; import com.datatorrent.api.StreamCodec; import com.datatorrent.common.util.Pair; -import com.datatorrent.stram.StreamingContainerAgent; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; @@ -236,8 +235,7 @@ public class StreamMapping implements java.io.Serializable boolean separateUnifiers = false; Integer lastId = null; for (InputPortMeta ipm : streamMeta.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); - Integer id = plan.getStreamCodecIdentifier(streamCodecInfo); + Integer id = plan.getStreamCodecIdentifier(ipm.getStreamCodec()); if (lastId == null) { lastId = id; } else if (!id.equals(lastId)) { @@ -255,10 +253,10 @@ public class StreamMapping implements java.io.Serializable unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0); } else { for (InputPortMeta ipm : streamMeta.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); - if (!cascadeUnifierSourcesMap.containsKey(streamCodecInfo)) { + StreamCodec<?> streamCodec = ipm.getStreamCodec(); + if (!cascadeUnifierSourcesMap.containsKey(streamCodec)) { unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0); - cascadeUnifierSourcesMap.put(streamCodecInfo, unifierSources); + cascadeUnifierSourcesMap.put(streamCodec, unifierSources); } } } @@ -320,8 +318,7 @@ public class StreamMapping implements java.io.Serializable unifier.inputs.clear(); List<PTOutput> doperUnifierSources = unifierSources; if (separateUnifiers) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(doperEntry.second); - List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(streamCodecInfo); + List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(doperEntry.second.getStreamCodec()); if (cascadeSources != null) { doperUnifierSources = cascadeSources; } http://git-wip-us.apache.org/repos/asf/apex-core/blob/fdebb195/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java index 4ff9e51..5ca5a8c 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java @@ -1159,10 +1159,10 @@ public class StreamCodecTest Map<Integer, StreamCodec<?>> streamCodecs, String id, PhysicalPlan plan) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(operatorMeta.getMeta(inputPort)); - Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodecInfo, plan)); - Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodecInfo); - checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodecInfo); + StreamCodec<?> streamCodec = operatorMeta.getMeta(inputPort).getStreamCodec(); + Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodec, plan)); + Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodec); + checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodec); } private void checkPresentStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id, http://git-wip-us.apache.org/repos/asf/apex-core/blob/fdebb195/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java index 4472743..d40fd7b 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java @@ -994,7 +994,7 @@ public class StreamPersistanceTests assertTrue("persist operator should be part of the operators to be redeployed", operators.contains(persistOperatorContainer)); LogicalPlan.StreamMeta s1 = (LogicalPlan.StreamMeta)s; - StreamCodec codec = s1.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC); + StreamCodec codec = s1.getPersistOperatorInputPort().getStreamCodec(); assertEquals("Codec should be instance of StreamCodecWrapper", codec instanceof StreamCodecWrapperForPersistance, true); StreamCodecWrapperForPersistance wrapperCodec = (StreamCodecWrapperForPersistance)codec;
