Repository: samza
Updated Branches:
  refs/heads/master 711dd8dc3 -> 1296c7ff9


http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 1426444..5b3c3a0 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -198,12 +198,12 @@ public class TestMessageStreamImpl {
   public void testRepartition() {
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
-
-    String streamName = String.format("%s-%s", 
OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
+    String mockOpName = "mockName";
+    when(mockGraph.getNextOpId(anyObject(), 
anyObject())).thenReturn(mockOpName);
     OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
     KVSerde mockKVSerde = mock(KVSerde.class);
     IntermediateMessageStreamImpl mockIntermediateStream = 
mock(IntermediateMessageStreamImpl.class);
-    when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKVSerde)))
+    when(mockGraph.getIntermediateStream(eq(mockOpName), eq(mockKVSerde)))
         .thenReturn(mockIntermediateStream);
     when(mockIntermediateStream.getOutputStream())
         .thenReturn(mockOutputStreamImpl);
@@ -211,7 +211,7 @@ public class TestMessageStreamImpl {
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
     Function mockKeyFunction = mock(Function.class);
     Function mockValueFunction = mock(Function.class);
-    inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde);
+    inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde, 
"p1");
 
     ArgumentCaptor<OperatorSpec> registeredOpCaptor = 
ArgumentCaptor.forClass(OperatorSpec.class);
     verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
@@ -228,11 +228,11 @@ public class TestMessageStreamImpl {
   public void testRepartitionWithoutSerde() {
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
-
-    String streamName = String.format("%s-%s", 
OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
+    String mockOpName = "mockName";
+    when(mockGraph.getNextOpId(anyObject(), 
anyObject())).thenReturn(mockOpName);
     OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
     IntermediateMessageStreamImpl mockIntermediateStream = 
mock(IntermediateMessageStreamImpl.class);
-    when(mockGraph.getIntermediateStream(eq(streamName), eq(null)))
+    when(mockGraph.getIntermediateStream(eq(mockOpName), eq(null)))
         .thenReturn(mockIntermediateStream);
     when(mockIntermediateStream.getOutputStream())
         .thenReturn(mockOutputStreamImpl);
@@ -240,7 +240,7 @@ public class TestMessageStreamImpl {
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
     Function mockKeyFunction = mock(Function.class);
     Function mockValueFunction = mock(Function.class);
-    inputStream.partitionBy(mockKeyFunction, mockValueFunction);
+    inputStream.partitionBy(mockKeyFunction, mockValueFunction, "p1");
 
     ArgumentCaptor<OperatorSpec> registeredOpCaptor = 
ArgumentCaptor.forClass(OperatorSpec.class);
     verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
@@ -264,9 +264,10 @@ public class TestMessageStreamImpl {
     Supplier<Integer> initialValue = () -> 0;
 
     // should compile since TestMessageEnvelope (input for functions) is base 
class of TestInputMessageEnvelope (M)
-    Window<TestInputMessageEnvelope, String, Integer> window = Windows
-        .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, 
aggregator, null, mock(Serde.class));
-    MessageStream<WindowPane<String, Integer>> windowedStream = 
inputStream.window(window);
+    Window<TestInputMessageEnvelope, String, Integer> window =
+        Windows.keyedTumblingWindow(keyExtractor, Duration.ofHours(1), 
initialValue, aggregator,
+            null, mock(Serde.class));
+    MessageStream<WindowPane<String, Integer>> windowedStream = 
inputStream.window(window, "w1");
 
     ArgumentCaptor<OperatorSpec> registeredOpCaptor = 
ArgumentCaptor.forClass(OperatorSpec.class);
     verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
@@ -289,7 +290,8 @@ public class TestMessageStreamImpl {
         mock(JoinFunction.class);
 
     Duration joinTtl = Duration.ofMinutes(1);
-    source1.join(source2, mockJoinFn, mock(Serde.class), mock(Serde.class), 
mock(Serde.class), joinTtl);
+    source1.join(source2, mockJoinFn,
+        mock(Serde.class), mock(Serde.class), mock(Serde.class), joinTtl, 
"j1");
 
     ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = 
ArgumentCaptor.forClass(OperatorSpec.class);
     
verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture());

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 45583c2..e0152a0 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators;
 
 import junit.framework.Assert;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.data.TestMessageEnvelope;
@@ -38,6 +39,8 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -363,15 +366,14 @@ public class TestStreamGraphImpl {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
-    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
-    
when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+    String mockStreamName = "mockStreamName";
+    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
 
     Serde mockValueSerde = mock(Serde.class);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream("test-stream-1", mockValueSerde);
+        graph.getIntermediateStream(mockStreamName, mockValueSerde);
 
     assertEquals(graph.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
     assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
@@ -387,9 +389,8 @@ public class TestStreamGraphImpl {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
-    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
-    
when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+    String mockStreamName = "mockStreamName";
+    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
 
@@ -399,7 +400,7 @@ public class TestStreamGraphImpl {
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream("test-stream-1", mockKVSerde);
+        graph.getIntermediateStream(mockStreamName, mockKVSerde);
 
     assertEquals(graph.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
     assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
@@ -415,16 +416,15 @@ public class TestStreamGraphImpl {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
-    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
-    
when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+    String mockStreamName = "mockStreamName";
+    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
 
     Serde mockValueSerde = mock(Serde.class);
     graph.setDefaultSerde(mockValueSerde);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream("test-stream-1", null);
+        graph.getIntermediateStream(mockStreamName, null);
 
     assertEquals(graph.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
     assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
@@ -440,9 +440,8 @@ public class TestStreamGraphImpl {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
-    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
-    
when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+    String mockStreamName = "mockStreamName";
+    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
 
@@ -453,7 +452,7 @@ public class TestStreamGraphImpl {
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     graph.setDefaultSerde(mockKVSerde);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream("test-stream-1", null);
+        graph.getIntermediateStream(mockStreamName, null);
 
     assertEquals(graph.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
     assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
@@ -469,13 +468,12 @@ public class TestStreamGraphImpl {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
-    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
-    
when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+    String mockStreamName = "mockStreamName";
+    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream("test-stream-1", null);
+        graph.getIntermediateStream(mockStreamName, null);
 
     assertEquals(graph.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
     assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
@@ -499,9 +497,26 @@ public class TestStreamGraphImpl {
   @Test
   public void testGetNextOpIdIncrementsId() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-    assertEquals(graph.getNextOpId(), 0);
-    assertEquals(graph.getNextOpId(), 1);
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+    assertEquals("jobName-1234-merge-0", graph.getNextOpId(OpCode.MERGE, 
null));
+    assertEquals("jobName-1234-join-customName", 
graph.getNextOpId(OpCode.JOIN, "customName"));
+    assertEquals("jobName-1234-map-2", graph.getNextOpId(OpCode.MAP, null));
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testGetNextOpIdRejectsDuplicates() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+    assertEquals("jobName-1234-join-customName", 
graph.getNextOpId(OpCode.JOIN, "customName"));
+    graph.getNextOpId(OpCode.JOIN, "customName"); // should throw
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
index aee457e..2140af1 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -26,6 +26,7 @@ import junit.framework.Assert;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.impl.store.TestInMemoryStore;
@@ -58,6 +59,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.function.Function;
 
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -71,6 +74,8 @@ public class TestWindowOperator {
   @Before
   public void setup() throws Exception {
     config = mock(Config.class);
+    when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
+    when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
     taskContext = mock(TaskContextImpl.class);
     runner = mock(ApplicationRunner.class);
     Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
@@ -79,7 +84,8 @@ public class TestWindowOperator {
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
     when(taskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
-    when(taskContext.getStore("window-3")).thenReturn(new 
TestInMemoryStore<>(storeKeySerde, storeValSerde));
+    when(taskContext.getStore("jobName-jobId-window-w1"))
+        .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
     when(runner.getStreamSpec("integers")).thenReturn(new 
StreamSpec("integers", "integers", "kafka"));
   }
 
@@ -93,7 +99,8 @@ public class TestWindowOperator {
     TestClock testClock = new TestClock();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
-    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, 
Collection<IntegerEnvelope>>) envelope.getMessage());
     integers.forEach(n -> task.process(new IntegerEnvelope(n), 
messageCollector, taskCoordinator));
     testClock.advanceTime(Duration.ofSeconds(1));
 
@@ -126,7 +133,8 @@ public class TestWindowOperator {
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
 
-    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, 
Collection<IntegerEnvelope>>) envelope.getMessage());
     Assert.assertEquals(windowPanes.size(), 0);
 
     integers.forEach(n -> task.process(new IntegerEnvelope(n), 
messageCollector, taskCoordinator));
@@ -150,7 +158,8 @@ public class TestWindowOperator {
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
 
-    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, 
Collection<IntegerEnvelope>>) envelope.getMessage());
     integers.forEach(n -> task.process(new IntegerEnvelope(n), 
messageCollector, taskCoordinator));
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
@@ -176,7 +185,8 @@ public class TestWindowOperator {
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
-    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, 
Collection<IntegerEnvelope>>) envelope.getMessage());
     task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     testClock.advanceTime(Duration.ofSeconds(1));
@@ -222,7 +232,8 @@ public class TestWindowOperator {
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
 
-    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, 
Collection<IntegerEnvelope>>) envelope.getMessage());
     task.init(config, taskContext);
 
     task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
@@ -254,7 +265,8 @@ public class TestWindowOperator {
     task.init(config, taskContext);
 
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
-    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, 
Collection<IntegerEnvelope>>) envelope.getMessage());
     task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     Assert.assertEquals(windowPanes.size(), 1);
@@ -297,7 +309,8 @@ public class TestWindowOperator {
     task.init(config, taskContext);
 
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
-    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, 
Collection<IntegerEnvelope>>) envelope.getMessage());
     task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     //assert that the count trigger fired
@@ -351,7 +364,8 @@ public class TestWindowOperator {
         Triggers.repeat(Triggers.any(Triggers.count(2), 
Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
 
-    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    MessageCollector messageCollector =
+        envelope -> windowPanes.add((WindowPane<Integer, 
Collection<IntegerEnvelope>>) envelope.getMessage());
 
     TestClock testClock = new TestClock();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
@@ -403,9 +417,10 @@ public class TestWindowOperator {
               .map(kv -> new IntegerEnvelope(kv.getKey()));
       Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
-        .map(m -> m)
-        .window(Windows.keyedTumblingWindow(keyFn, duration, new 
IntegerSerde(), new IntegerEnvelopeSerde()).setEarlyTrigger(earlyTrigger)
-          .setAccumulationMode(mode))
+          .map(m -> m)
+          .window(Windows.keyedTumblingWindow(keyFn, duration, new 
IntegerSerde(), new IntegerEnvelopeSerde())
+              .setEarlyTrigger(earlyTrigger)
+              .setAccumulationMode(mode), "w1")
           .sink((message, messageCollector, taskCoordinator) -> {
               messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
             });
@@ -434,8 +449,9 @@ public class TestWindowOperator {
       Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
           .map(m -> m)
-          .window(Windows.tumblingWindow(duration, new 
IntegerEnvelopeSerde()).setEarlyTrigger(earlyTrigger)
-              .setAccumulationMode(mode))
+          .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde())
+              .setEarlyTrigger(earlyTrigger)
+              .setAccumulationMode(mode), "w1")
           .sink((message, messageCollector, taskCoordinator) -> {
               messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
             });
@@ -463,7 +479,7 @@ public class TestWindowOperator {
       inStream
           .map(m -> m)
           .window(Windows.keyedSessionWindow(keyFn, duration, new 
IntegerSerde(), new IntegerEnvelopeSerde())
-              .setAccumulationMode(mode))
+              .setAccumulationMode(mode), "w1")
           .sink((message, messageCollector, taskCoordinator) -> {
               messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
             });

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 4a78da8..904367b 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -209,7 +209,7 @@ public class TestOperatorImpl {
 
   private static class TestOpSpec extends OperatorSpec<Object, Object> {
     TestOpSpec() {
-     super(OpCode.INPUT, 1);
+     super(OpCode.INPUT, "1");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 1c14fb4..47e55a8 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -71,6 +71,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -129,15 +130,19 @@ public class TestOperatorImplGraph {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new 
StreamSpec("input", "input-stream", "input-system"));
     when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new 
StreamSpec("output", "output-stream", "output-system"));
-    when(mockRunner.getStreamSpec(eq("null-null-partition_by-1")))
+    when(mockRunner.getStreamSpec(eq("jobName-jobId-partition_by-p1")))
         .thenReturn(new StreamSpec("intermediate", "intermediate-stream", 
"intermediate-system"));
-    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
+    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("jobId");
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
     MessageStream<Object> inputStream = streamGraph.getInputStream("input");
     OutputStream<KV<Integer, String>> outputStream = streamGraph
         .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), 
mock(StringSerde.class)));
 
     inputStream
-        .partitionBy(Object::hashCode, Object::toString, 
KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)))
+        .partitionBy(Object::hashCode, Object::toString,
+            KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)), 
"p1")
         .sendTo(outputStream);
 
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
@@ -147,7 +152,7 @@ public class TestOperatorImplGraph {
     when(jobModel.getContainers()).thenReturn(Collections.EMPTY_MAP);
     when(mockTaskContext.getJobModel()).thenReturn(jobModel);
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(streamGraph, mock(Config.class), 
mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, 
mock(Clock.class));
 
     InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream"));
     assertEquals(1, inputOpImpl.registeredOperators.size());
@@ -215,22 +220,25 @@ public class TestOperatorImplGraph {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     when(mockRunner.getStreamSpec(eq("input1"))).thenReturn(new 
StreamSpec("input1", "input-stream1", "input-system"));
     when(mockRunner.getStreamSpec(eq("input2"))).thenReturn(new 
StreamSpec("input2", "input-stream2", "input-system"));
-    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
+    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("jobId");
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
 
     JoinFunction mockJoinFunction = mock(JoinFunction.class);
     MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", 
new NoOpSerde<>());
     MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", 
new NoOpSerde<>());
     inputStream1.join(inputStream2, mockJoinFunction,
-        mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1));
+        mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1), "j1");
 
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     KeyValueStore mockLeftStore = mock(KeyValueStore.class);
-    when(mockTaskContext.getStore(eq("join-2-L"))).thenReturn(mockLeftStore);
+    
when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-L"))).thenReturn(mockLeftStore);
     KeyValueStore mockRightStore = mock(KeyValueStore.class);
-    when(mockTaskContext.getStore(eq("join-2-R"))).thenReturn(mockRightStore);
+    
when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore);
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(streamGraph, mock(Config.class), 
mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, 
mock(Clock.class));
 
     // verify that join function is initialized once.
     verify(mockJoinFunction, times(1)).init(any(Config.class), 
any(TaskContextImpl.class));
@@ -388,29 +396,30 @@ public class TestOperatorImplGraph {
     when(runner.getStreamSpec("output2")).thenReturn(output2);
 
     // intermediate streams used in tests
-    StreamSpec int1 = new StreamSpec("test-app-1-partition_by-10", 
"test-app-1-partition_by-10", "default-system");
-    StreamSpec int2 = new StreamSpec("test-app-1-partition_by-6", 
"test-app-1-partition_by-6", "default-system");
-    when(runner.getStreamSpec("test-app-1-partition_by-10"))
-        .thenReturn(int1);
-    when(runner.getStreamSpec("test-app-1-partition_by-6"))
-        .thenReturn(int2);
+    StreamSpec int1 = new StreamSpec("test-app-1-partition_by-p2", 
"test-app-1-partition_by-p2", "default-system");
+    StreamSpec int2 = new StreamSpec("test-app-1-partition_by-p1", 
"test-app-1-partition_by-p1", "default-system");
+    when(runner.getStreamSpec("test-app-1-partition_by-p2")).thenReturn(int1);
+    when(runner.getStreamSpec("test-app-1-partition_by-p1")).thenReturn(int2);
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
     MessageStream messageStream1 = streamGraph.getInputStream("input1").map(m 
-> m);
     MessageStream messageStream2 = 
streamGraph.getInputStream("input2").filter(m -> true);
     MessageStream messageStream3 =
-        streamGraph.getInputStream("input3").filter(m -> true).partitionBy(m 
-> "hehe", m -> m).map(m -> m);
+        streamGraph.getInputStream("input3")
+            .filter(m -> true)
+            .partitionBy(m -> "hehe", m -> m, "p1")
+            .map(m -> m);
     OutputStream<Object> outputStream1 = 
streamGraph.getOutputStream("output1");
     OutputStream<Object> outputStream2 = 
streamGraph.getOutputStream("output2");
 
     messageStream1
         .join(messageStream2, mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(2))
-        .partitionBy(m -> "haha", m -> m)
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(2), "j1")
+        .partitionBy(m -> "haha", m -> m, "p2")
         .sendTo(outputStream1);
     messageStream3
         .join(messageStream2, mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1))
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1), "j2")
         .sendTo(outputStream2);
 
     Multimap<SystemStream, SystemStream> outputToInput = 
OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph);

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
index f1fb8e2..65f1dc6 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
@@ -49,7 +49,7 @@ public class TestWindowOperatorSpec {
     window.setEarlyTrigger(earlyTrigger);
     window.setLateTrigger(lateTrigger);
 
-    WindowOperatorSpec spec = new WindowOperatorSpec(window, 0);
+    WindowOperatorSpec spec = new WindowOperatorSpec(window, "0");
     Assert.assertEquals(spec.getDefaultTriggerMs(), 5);
   }
 
@@ -62,7 +62,7 @@ public class TestWindowOperatorSpec {
             null, WindowType.SESSION, null, null, mock(Serde.class));
     window.setEarlyTrigger(earlyTrigger);
 
-    WindowOperatorSpec spec = new WindowOperatorSpec(window, 0);
+    WindowOperatorSpec spec = new WindowOperatorSpec(window, "0");
     Assert.assertEquals(spec.getDefaultTriggerMs(), 150);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index d2f0184..29c509d 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -93,7 +93,7 @@ public class EndOfStreamIntegrationTest extends 
AbstractIntegrationTestHarness {
     final StreamApplication app = (streamGraph, cfg) -> {
       streamGraph.<KV<String, PageView>>getInputStream("PageView")
         .map(Values.create())
-        .partitionBy(pv -> pv.getMemberId(), pv -> pv)
+        .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
         .sink((m, collector, coordinator) -> {
             received.add(m.getValue());
           });

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 7da0e77..dda3d24 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -145,7 +145,7 @@ public class WatermarkIntegrationTest extends 
AbstractIntegrationTestHarness {
     final StreamApplication app = (streamGraph, cfg) -> {
       streamGraph.<KV<String, PageView>>getInputStream("PageView")
           .map(EndOfStreamIntegrationTest.Values.create())
-          .partitionBy(pv -> pv.getMemberId(), pv -> pv)
+          .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
           .sink((m, collector, coordinator) -> {
               received.add(m.getValue());
             });

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index e35dfb7..346e958 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -52,23 +52,26 @@ public class RepartitionJoinWindowApp implements 
StreamApplication {
         graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), 
new StringSerde()));
 
     MessageStream<PageView> pageViewsRepartitionedByViewId = pageViews
-        .partitionBy(PageView::getViewId, pv -> pv, new KVSerde<>(new 
StringSerde(), new JsonSerdeV2<>(PageView.class)))
+        .partitionBy(PageView::getViewId, pv -> pv,
+            new KVSerde<>(new StringSerde(), new 
JsonSerdeV2<>(PageView.class)), "pageViewsByViewId")
         .map(KV::getValue);
 
     MessageStream<AdClick> adClicksRepartitionedByViewId = adClicks
-        .partitionBy(AdClick::getViewId, ac -> ac, new KVSerde<>(new 
StringSerde(), new JsonSerdeV2<>(AdClick.class)))
+        .partitionBy(AdClick::getViewId, ac -> ac,
+            new KVSerde<>(new StringSerde(), new 
JsonSerdeV2<>(AdClick.class)), "adClicksByViewId")
         .map(KV::getValue);
 
     MessageStream<UserPageAdClick> userPageAdClicks = 
pageViewsRepartitionedByViewId
         .join(adClicksRepartitionedByViewId, new UserPageViewAdClicksJoiner(),
             new StringSerde(), new JsonSerdeV2<>(PageView.class), new 
JsonSerdeV2<>(AdClick.class),
-            Duration.ofMinutes(1));
+            Duration.ofMinutes(1), "pageViewAdClickJoin");
 
     userPageAdClicks
         .partitionBy(UserPageAdClick::getUserId, upac -> upac,
-            KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>(UserPageAdClick.class)))
+            KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>(UserPageAdClick.class)), "userPageAdClicksByUserId")
         .map(KV::getValue)
-        .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, 
Duration.ofSeconds(3), new StringSerde(), new 
JsonSerdeV2<>(UserPageAdClick.class)))
+        .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, 
Duration.ofSeconds(3),
+            new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), 
"userAdClickWindow")
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), 
String.valueOf(windowPane.getMessage().size())))
         .sendTo(outputStream);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java 
b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 6410e7d..997127e 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -50,8 +50,8 @@ public class SessionWindowApp implements StreamApplication {
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))
-        .window(Windows.keyedSessionWindow(PageView::getUserId, 
Duration.ofSeconds(3), new StringSerde(),
-            new JsonSerdeV2<>(PageView.class)))
+        .window(Windows.keyedSessionWindow(PageView::getUserId, 
Duration.ofSeconds(3),
+            new StringSerde(), new JsonSerdeV2<>(PageView.class)), 
"sessionWindow")
         .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
         .sendTo(outputStream);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index 5d04f21..5d2a17c 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -51,7 +51,8 @@ public class TumblingWindowApp implements StreamApplication {
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))
-        .window(Windows.keyedTumblingWindow(PageView::getUserId, 
Duration.ofSeconds(3), null, null))
+        .window(Windows.keyedTumblingWindow(PageView::getUserId, 
Duration.ofSeconds(3),
+            new StringSerde(), new JsonSerdeV2<>(PageView.class)), 
"tumblingWindow")
         .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
         .sendTo(outputStream);
   }

Reply via email to