Repository: apex-core Updated Branches: refs/heads/master a42c67bc2 -> 3119aba89
APEXCORE-448 made operator name available in the operator context Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/9bcde2e3 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/9bcde2e3 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/9bcde2e3 Branch: refs/heads/master Commit: 9bcde2e306ce7f6c3199112ad5bb737370f6624e Parents: 130ce6b Author: Chandni Singh <[email protected]> Authored: Fri Aug 5 21:19:22 2016 -0700 Committer: Chandni Singh <[email protected]> Committed: Fri Aug 5 21:19:22 2016 -0700 ---------------------------------------------------------------------- .../main/java/com/datatorrent/api/Context.java | 5 ++ .../stram/api/OperatorDeployInfo.java | 6 ++ .../stram/engine/OperatorContext.java | 15 +++- .../stram/engine/StreamingContainer.java | 5 +- .../stram/engine/AtMostOnceTest.java | 3 +- .../stram/engine/GenericNodeTest.java | 14 ++-- .../datatorrent/stram/engine/InputNodeTest.java | 2 +- .../com/datatorrent/stram/engine/NodeTest.java | 13 ++-- .../stram/engine/OperatorContextTest.java | 79 ++++++++++++++++++++ .../stram/engine/ProcessingModeTests.java | 2 +- .../stram/stream/InlineStreamTest.java | 6 +- 11 files changed, 130 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index a0f3ad3..187bf08 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -323,6 +323,11 @@ public interface Context int getId(); /** + * @return the logical operator name which was used to add the operator in the DAG. 
+ */ + String getName(); + + /** * Return the number of windows before the next checkpoint including the current window. * @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window */ http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java index bccbedf..219017b 100644 --- a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java +++ b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java @@ -52,6 +52,12 @@ public class OperatorDeployInfo implements Serializable, OperatorContext } @Override + public String getName() + { + return name; + } + + @Override public AttributeMap getAttributes() { return contextAttributes; http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java index 424ffcc..7113280 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java @@ -21,9 +21,13 @@ package com.datatorrent.stram.engine; import java.util.Collection; import java.util.concurrent.BlockingQueue; +import javax.validation.constraints.NotNull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Context; import com.datatorrent.api.StatsListener.OperatorRequest; @@ -44,6 +48,7 @@ public 
class OperatorContext extends BaseContext implements Context.OperatorCont private Thread thread; private long lastProcessedWindowId; private final int id; + private final String name; // the size of the circular queue should be configurable. hardcoded to 1024 for now. private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<ContainerStats.OperatorStats>(1024); private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<OperatorRequest>(1024); @@ -81,14 +86,16 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont /** * * @param id the value of id + * @param name name of the operator * @param attributes the value of attributes * @param parentContext */ - public OperatorContext(int id, AttributeMap attributes, Context parentContext) + public OperatorContext(int id, @NotNull String name, AttributeMap attributes, Context parentContext) { super(attributes, parentContext); this.lastProcessedWindowId = Stateless.WINDOW_ID; this.id = id; + this.name = Preconditions.checkNotNull(name, "operator name"); this.stateless = super.getValue(OperatorContext.STATELESS); } @@ -99,6 +106,12 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont } @Override + public String getName() + { + return name; + } + + @Override public int getWindowsFromCheckpoint() { return windowsFromCheckpoint; http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 54b8a6e..27688e3 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -902,14 +902,15 @@ public class 
StreamingContainer extends YarnContainerMain Context parentContext; if (ndi instanceof UnifierDeployInfo) { - OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext); + OperatorContext unifiedOperatorContext = new OperatorContext(0, ndi.name, + ((UnifierDeployInfo)ndi).operatorAttributes, containerContext); parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext); massageUnifierDeployInfo(ndi); } else { parentContext = containerContext; } - OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext); + OperatorContext ctx = new OperatorContext(ndi.id, ndi.name, ndi.contextAttributes, parentContext); ctx.attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, ndi.checkpoint.windowId); logger.debug("Restoring operator {} to checkpoint {} stateless={}.", ndi.id, Codec.getStringWindowId(ndi.checkpoint.windowId), ctx.stateless); Node<?> node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? 
Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type); http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java index b1296c8..cc777f7 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java @@ -83,7 +83,8 @@ public class AtMostOnceTest extends ProcessingModeTests map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0); map.put(OperatorContext.PROCESSING_MODE, processingMode); - final GenericNode node = new GenericNode(new MultiInputOperator(), new com.datatorrent.stram.engine.OperatorContext(1, map, null)); + final GenericNode node = new GenericNode(new MultiInputOperator(), + new com.datatorrent.stram.engine.OperatorContext(1, "operator", map, null)); AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input1", 1024); AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("input1", 1024); node.connectInputPort("input1", reservoir1); http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index b7f6362..5dfa5f3 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -145,9 +145,9 @@ public class GenericNodeTest public volatile List<Checkpoint> checkpoints = Lists.newArrayList(); - public TestStatsOperatorContext(int id, AttributeMap 
attributes, Context parentContext) + public TestStatsOperatorContext(int id, String name, AttributeMap attributes, Context parentContext) { - super(id, attributes, parentContext); + super(id, name, attributes, parentContext); } @Override @@ -276,7 +276,8 @@ public class GenericNodeTest long sleeptime = 25L; final ArrayList<Object> list = new ArrayList<Object>(); GenericOperator go = new GenericOperator(); - final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null)); + final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator", + new DefaultAttributeMap(), null)); gn.setId(1); AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024); AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024); @@ -397,7 +398,8 @@ public class GenericNodeTest long maxSleep = 5000; long sleeptime = 25L; GenericOperator go = new GenericOperator(); - final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null)); + final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator", + new DefaultAttributeMap(), null)); gn.setId(1); AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024); AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024); @@ -537,7 +539,7 @@ public class GenericNodeTest dam.put(OperatorContext.STORAGE_AGENT, storageAgent); - TestStatsOperatorContext operatorContext = new TestStatsOperatorContext(0, dam, null); + TestStatsOperatorContext operatorContext = new TestStatsOperatorContext(0, "operator", dam, null); final GenericNode gn = new GenericNode(go, operatorContext); gn.setId(1); @@ -635,7 +637,7 @@ public class GenericNodeTest DefaultAttributeMap attrMap = new DefaultAttributeMap(); attrMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, dagCheckPoint); 
attrMap.put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, opCheckPoint); - final OperatorContext context = new com.datatorrent.stram.engine.OperatorContext(0, attrMap, null); + final OperatorContext context = new com.datatorrent.stram.engine.OperatorContext(0, "operator", attrMap, null); final GenericNode gn = new GenericNode(go, context); gn.setId(1); http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java index 1c51abb..e182b75 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java @@ -69,7 +69,7 @@ public class InputNodeTest dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 10); dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 10); - final InputNode in = new InputNode(tio, new com.datatorrent.stram.engine.OperatorContext(0, dam, null)); + final InputNode in = new InputNode(tio, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null)); in.setId(1); TestSink testSink = new TestSink(); http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java index 48a819f..26bd7a0 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java @@ -206,7 +206,7 @@ public class NodeTest attributeMap.put(OperatorContext.STORAGE_AGENT, new StorageAgentImpl()); attributeMap.put(OperatorContext.STATELESS, true); 
Node<StatelessOperator> node = new Node<StatelessOperator>(new StatelessOperator(), - new com.datatorrent.stram.engine.OperatorContext(0, attributeMap, null)) + new com.datatorrent.stram.engine.OperatorContext(0,"operator", attributeMap, null)) { @Override public void connectInputPort(String port, SweepableReservoir reservoir) @@ -239,18 +239,18 @@ public class NodeTest DefaultAttributeMap attributeMap = new DefaultAttributeMap(); attributeMap.put(OperatorContext.STORAGE_AGENT, new StorageAgentImpl()); Node<TestGenericOperator> node = new Node<TestGenericOperator>(new TestGenericOperator(), - new com.datatorrent.stram.engine.OperatorContext(0, attributeMap, null)) + new com.datatorrent.stram.engine.OperatorContext(0, "operator", attributeMap, null)) { @Override public void connectInputPort(String port, SweepableReservoir reservoir) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public void run() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 
+ throw new UnsupportedOperationException("Not supported yet."); } }; @@ -292,9 +292,10 @@ public class NodeTest final Node in; if (trueGenericFalseInput) { - in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null)); + in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null)); } else { - in = new InputNode((InputCheckpointOperator)gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null)); + in = new InputNode((InputCheckpointOperator)gco, new com.datatorrent.stram.engine.OperatorContext(0, "operator", + dam, null)); } in.setId(1); http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java b/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java new file mode 100644 index 0000000..aa48bbe --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.stram.engine; + +import java.util.concurrent.CountDownLatch; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; + +/** + * Tests for {@link OperatorContext} + */ +public class OperatorContextTest +{ + private static CountDownLatch latch = new CountDownLatch(1); + private static volatile String operatorName; + + private static class MockInputOperator extends BaseOperator implements InputOperator + { + @Override + public void setup(Context.OperatorContext context) + { + operatorName = Preconditions.checkNotNull(context.getName(), "operator name"); + latch.countDown(); + } + + @Override + public void emitTuples() + { + } + } + + @Test + public void testInjectionOfOperatorName() throws Exception + { + StreamingApplication application = new StreamingApplication() + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + dag.addOperator("input", new MockInputOperator()); + } + }; + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(application, new Configuration()); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + latch.await(); + Assert.assertEquals("operator name", "input", operatorName); + lc.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java index 55d8d5f..3e208c8 100644 --- 
a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java @@ -261,7 +261,7 @@ public class ProcessingModeTests map.put(OperatorContext.PROCESSING_MODE, processingMode); final GenericNode node = new GenericNode(new MultiInputOperator(), - new com.datatorrent.stram.engine.OperatorContext(1, map, null)); + new com.datatorrent.stram.engine.OperatorContext(1, "operator", map, null)); AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input1", 1024); AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("input1", 1024); node.connectInputPort("input1", reservoir1); http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java index ef63a62..ed145c7 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java @@ -56,12 +56,14 @@ public class InlineStreamTest final int totalTupleCount = 5000; final PassThroughNode<Object> operator1 = new PassThroughNode<Object>(); - final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, new DefaultAttributeMap(), null)); + final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, "operator1", new DefaultAttributeMap(), + null)); node1.setId(1); operator1.setup(node1.context); final PassThroughNode<Object> operator2 = new PassThroughNode<Object>(); - final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, new DefaultAttributeMap(), null)); + final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, "operator2", new DefaultAttributeMap(), + null)); 
node2.setId(2); operator2.setup(node2.context);
