http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 779d299..61289af 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -24,12 +24,18 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.LegacyTaskApplication;
+import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.TaskApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -37,9 +43,13 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.KVSerde;
@@ -54,8 +64,12 @@ import org.apache.samza.testUtils.StreamTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class TestExecutionPlanner {
@@ -63,6 +77,11 @@ public class TestExecutionPlanner {
   private static final String DEFAULT_SYSTEM = "test-system";
   private static final int DEFAULT_PARTITIONS = 10;
 
+  private final Set<SystemDescriptor> systemDescriptors = new HashSet<>();
+  private final Map<String, InputDescriptor> inputDescriptors = new 
HashMap<>();
+  private final Map<String, OutputDescriptor> outputDescriptors = new 
HashMap<>();
+  private final Set<TableDescriptor> tableDescriptors = new HashSet<>();
+
   private SystemAdmins systemAdmins;
   private StreamManager streamManager;
   private Config config;
@@ -78,6 +97,8 @@ public class TestExecutionPlanner {
   private GenericOutputDescriptor<KV<Object, Object>> output1Descriptor;
   private StreamSpec output2Spec;
   private GenericOutputDescriptor<KV<Object, Object>> output2Descriptor;
+  private GenericSystemDescriptor system1Descriptor;
+  private GenericSystemDescriptor system2Descriptor;
 
   static SystemAdmin createSystemAdmin(Map<String, Integer> 
streamToPartitions) {
 
@@ -236,20 +257,35 @@ public class TestExecutionPlanner {
 
     KVSerde<Object, Object> kvSerde = new KVSerde<>(new NoOpSerde(), new 
NoOpSerde());
     String mockSystemFactoryClass = "factory.class.name";
-    GenericSystemDescriptor system1 = new GenericSystemDescriptor("system1", 
mockSystemFactoryClass);
-    GenericSystemDescriptor system2 = new GenericSystemDescriptor("system2", 
mockSystemFactoryClass);
-    input1Descriptor = system1.getInputDescriptor("input1", kvSerde);
-    input2Descriptor = system2.getInputDescriptor("input2", kvSerde);
-    input3Descriptor = system2.getInputDescriptor("input3", kvSerde);
-    input4Descriptor = system1.getInputDescriptor("input4", kvSerde);
-    output1Descriptor = system1.getOutputDescriptor("output1", kvSerde);
-    output2Descriptor = system2.getOutputDescriptor("output2", kvSerde);
+    system1Descriptor = new GenericSystemDescriptor("system1", 
mockSystemFactoryClass);
+    system2Descriptor = new GenericSystemDescriptor("system2", 
mockSystemFactoryClass);
+    input1Descriptor = system1Descriptor.getInputDescriptor("input1", kvSerde);
+    input2Descriptor = system2Descriptor.getInputDescriptor("input2", kvSerde);
+    input3Descriptor = system2Descriptor.getInputDescriptor("input3", kvSerde);
+    input4Descriptor = system1Descriptor.getInputDescriptor("input4", kvSerde);
+    output1Descriptor = system1Descriptor.getOutputDescriptor("output1", 
kvSerde);
+    output2Descriptor = system2Descriptor.getOutputDescriptor("output2", 
kvSerde);
+
+    // clean and set up sets and maps of descriptors
+    systemDescriptors.clear();
+    inputDescriptors.clear();
+    outputDescriptors.clear();
+    tableDescriptors.clear();
+    systemDescriptors.add(system1Descriptor);
+    systemDescriptors.add(system2Descriptor);
+    inputDescriptors.put(input1Descriptor.getStreamId(), input1Descriptor);
+    inputDescriptors.put(input2Descriptor.getStreamId(), input2Descriptor);
+    inputDescriptors.put(input3Descriptor.getStreamId(), input3Descriptor);
+    inputDescriptors.put(input4Descriptor.getStreamId(), input4Descriptor);
+    outputDescriptors.put(output1Descriptor.getStreamId(), output1Descriptor);
+    outputDescriptors.put(output2Descriptor.getStreamId(), output2Descriptor);
+
 
     // set up external partition count
     Map<String, Integer> system1Map = new HashMap<>();
     system1Map.put("input1", 64);
     system1Map.put("output1", 8);
-    system1Map.put("input4", ExecutionPlanner.MAX_INFERRED_PARTITIONS * 2);
+    system1Map.put("input4", IntermediateStreamManager.MAX_INFERRED_PARTITIONS 
* 2);
     Map<String, Integer> system2Map = new HashMap<>();
     system2Map.put("input2", 16);
     system2Map.put("input3", 32);
@@ -268,7 +304,7 @@ public class TestExecutionPlanner {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
 
-    JobGraph jobGraph = 
planner.createJobGraph(graphSpec.getOperatorSpecGraph());
+    JobGraph jobGraph = planner.createJobGraph(graphSpec.getConfig(), 
graphSpec);
     assertTrue(jobGraph.getInputStreams().size() == 3);
     assertTrue(jobGraph.getOutputStreams().size() == 2);
     assertTrue(jobGraph.getIntermediateStreams().size() == 2); // two streams 
generated by partitionBy
@@ -278,9 +314,9 @@ public class TestExecutionPlanner {
   public void testFetchExistingStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
-    JobGraph jobGraph = 
planner.createJobGraph(graphSpec.getOperatorSpecGraph());
+    JobGraph jobGraph = planner.createJobGraph(graphSpec.getConfig(), 
graphSpec);
 
-    planner.fetchInputAndOutputStreamPartitions(jobGraph);
+    ExecutionPlanner.setInputAndOutputStreamPartitionCount(jobGraph, 
streamManager);
     assertTrue(jobGraph.getOrCreateStreamEdge(input1Spec).getPartitionCount() 
== 64);
     assertTrue(jobGraph.getOrCreateStreamEdge(input2Spec).getPartitionCount() 
== 16);
     assertTrue(jobGraph.getOrCreateStreamEdge(input3Spec).getPartitionCount() 
== 32);
@@ -296,7 +332,10 @@ public class TestExecutionPlanner {
   public void testCalculateJoinInputPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
-    JobGraph jobGraph = (JobGraph) 
planner.plan(graphSpec.getOperatorSpecGraph());
+    JobGraph jobGraph = planner.createJobGraph(graphSpec.getConfig(), 
graphSpec);
+
+    ExecutionPlanner.setInputAndOutputStreamPartitionCount(jobGraph, 
streamManager);
+    new IntermediateStreamManager(config, 
graphSpec).calculatePartitions(jobGraph);
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -309,7 +348,7 @@ public class TestExecutionPlanner {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamApplicationDescriptorImpl graphSpec = 
createStreamGraphWithInvalidJoin();
 
-    planner.plan(graphSpec.getOperatorSpecGraph());
+    planner.plan(graphSpec);
   }
 
   @Test
@@ -320,7 +359,7 @@ public class TestExecutionPlanner {
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
-    JobGraph jobGraph = (JobGraph) 
planner.plan(graphSpec.getOperatorSpecGraph());
+    JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -336,7 +375,7 @@ public class TestExecutionPlanner {
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
-    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
+    ExecutionPlan plan = planner.plan(graphSpec);
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     for (JobConfig config : jobConfigs) {
       System.out.println(config);
@@ -351,7 +390,7 @@ public class TestExecutionPlanner {
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
     StreamApplicationDescriptorImpl graphSpec = 
createStreamGraphWithJoinAndWindow();
-    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
+    ExecutionPlan plan = planner.plan(graphSpec);
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
 
@@ -368,7 +407,7 @@ public class TestExecutionPlanner {
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
     StreamApplicationDescriptorImpl graphSpec = 
createStreamGraphWithJoinAndWindow();
-    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
+    ExecutionPlan plan = planner.plan(graphSpec);
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
 
@@ -384,7 +423,7 @@ public class TestExecutionPlanner {
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
-    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
+    ExecutionPlan plan = planner.plan(graphSpec);
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
     assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS()));
@@ -399,7 +438,7 @@ public class TestExecutionPlanner {
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
-    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
+    ExecutionPlan plan = planner.plan(graphSpec);
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
     assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
@@ -409,7 +448,7 @@ public class TestExecutionPlanner {
   public void testCalculateIntStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
-    JobGraph jobGraph = (JobGraph) 
planner.plan(graphSpec.getOperatorSpecGraph());
+    JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -430,15 +469,15 @@ public class TestExecutionPlanner {
     edge.setPartitionCount(16);
     edges.add(edge);
 
-    assertEquals(32, ExecutionPlanner.maxPartitions(edges));
+    assertEquals(32, IntermediateStreamManager.maxPartitions(edges));
 
     edges = Collections.emptyList();
-    assertEquals(StreamEdge.PARTITIONS_UNKNOWN, 
ExecutionPlanner.maxPartitions(edges));
+    assertEquals(StreamEdge.PARTITIONS_UNKNOWN, 
IntermediateStreamManager.maxPartitions(edges));
   }
 
   @Test
   public void testMaxPartitionLimit() throws Exception {
-    int partitionLimit = ExecutionPlanner.MAX_INFERRED_PARTITIONS;
+    int partitionLimit = IntermediateStreamManager.MAX_INFERRED_PARTITIONS;
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamApplicationDescriptorImpl graphSpec = new 
StreamApplicationDescriptorImpl(appDesc -> {
@@ -447,11 +486,99 @@ public class TestExecutionPlanner {
         input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> 
kv).sendTo(output1);
       }, config);
 
-    JobGraph jobGraph = (JobGraph) 
planner.plan(graphSpec.getOperatorSpecGraph());
+    JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
         assertEquals(partitionLimit, edge.getPartitionCount()); // max of 
input1 and output1
       });
   }
+
+  @Test
+  public void testCreateJobGraphForTaskApplication() {
+    TaskApplicationDescriptorImpl taskAppDesc = 
mock(TaskApplicationDescriptorImpl.class);
+    // add interemediate streams
+    String intermediateStream1 = "intermediate-stream1";
+    String intermediateBroadcast = "intermediate-broadcast1";
+    // intermediate stream1, not broadcast
+    GenericInputDescriptor<KV<Object, Object>> intermediateInput1 = 
system1Descriptor.getInputDescriptor(
+        intermediateStream1, new KVSerde<>(new NoOpSerde(), new NoOpSerde()));
+    GenericOutputDescriptor<KV<Object, Object>> intermediateOutput1 = 
system1Descriptor.getOutputDescriptor(
+        intermediateStream1, new KVSerde<>(new NoOpSerde(), new NoOpSerde()));
+    // intermediate stream2, broadcast
+    GenericInputDescriptor<KV<Object, Object>> intermediateBroacastInput1 = 
system1Descriptor.getInputDescriptor(
+        intermediateBroadcast, new KVSerde<>(new NoOpSerde<>(), new 
NoOpSerde<>()));
+    GenericOutputDescriptor<KV<Object, Object>> intermediateBroacastOutput1 = 
system1Descriptor.getOutputDescriptor(
+        intermediateBroadcast, new KVSerde<>(new NoOpSerde<>(), new 
NoOpSerde<>()));
+    inputDescriptors.put(intermediateStream1, intermediateInput1);
+    outputDescriptors.put(intermediateStream1, intermediateOutput1);
+    inputDescriptors.put(intermediateBroadcast, intermediateBroacastInput1);
+    outputDescriptors.put(intermediateBroadcast, intermediateBroacastOutput1);
+    Set<String> broadcastStreams = new HashSet<>();
+    broadcastStreams.add(intermediateBroadcast);
+
+    when(taskAppDesc.getInputDescriptors()).thenReturn(inputDescriptors);
+    
when(taskAppDesc.getInputStreamIds()).thenReturn(inputDescriptors.keySet());
+    when(taskAppDesc.getOutputDescriptors()).thenReturn(outputDescriptors);
+    
when(taskAppDesc.getOutputStreamIds()).thenReturn(outputDescriptors.keySet());
+    when(taskAppDesc.getTableDescriptors()).thenReturn(Collections.emptySet());
+    when(taskAppDesc.getSystemDescriptors()).thenReturn(systemDescriptors);
+    when(taskAppDesc.getBroadcastStreams()).thenReturn(broadcastStreams);
+    doReturn(MockTaskApplication.class).when(taskAppDesc).getAppClass();
+
+    Map<String, String> systemStreamConfigs = new HashMap<>();
+    inputDescriptors.forEach((key, value) -> 
systemStreamConfigs.putAll(value.toConfig()));
+    outputDescriptors.forEach((key, value) -> 
systemStreamConfigs.putAll(value.toConfig()));
+    systemDescriptors.forEach(sd -> systemStreamConfigs.putAll(sd.toConfig()));
+
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+    JobGraph jobGraph = planner.createJobGraph(config, taskAppDesc);
+    assertEquals(1, jobGraph.getJobNodes().size());
+    assertTrue(jobGraph.getInputStreams().stream().map(edge -> edge.getName())
+        .filter(streamId -> 
inputDescriptors.containsKey(streamId)).collect(Collectors.toList()).isEmpty());
+    Set<String> intermediateStreams = new HashSet<>(inputDescriptors.keySet());
+    jobGraph.getInputStreams().forEach(edge -> {
+        if (intermediateStreams.contains(edge.getStreamSpec().getId())) {
+          intermediateStreams.remove(edge.getStreamSpec().getId());
+        }
+      });
+    assertEquals(new HashSet<String>() { { this.add(intermediateStream1); 
this.add(intermediateBroadcast); } }.toArray(),
+        intermediateStreams.toArray());
+  }
+
+  @Test
+  public void testCreateJobGraphForLegacyTaskApplication() {
+    TaskApplicationDescriptorImpl taskAppDesc = 
mock(TaskApplicationDescriptorImpl.class);
+
+    when(taskAppDesc.getInputDescriptors()).thenReturn(new HashMap<>());
+    when(taskAppDesc.getOutputDescriptors()).thenReturn(new HashMap<>());
+    when(taskAppDesc.getTableDescriptors()).thenReturn(new HashSet<>());
+    when(taskAppDesc.getSystemDescriptors()).thenReturn(new HashSet<>());
+    when(taskAppDesc.getBroadcastStreams()).thenReturn(new HashSet<>());
+    doReturn(LegacyTaskApplication.class).when(taskAppDesc).getAppClass();
+
+    Map<String, String> systemStreamConfigs = new HashMap<>();
+    inputDescriptors.forEach((key, value) -> 
systemStreamConfigs.putAll(value.toConfig()));
+    outputDescriptors.forEach((key, value) -> 
systemStreamConfigs.putAll(value.toConfig()));
+    systemDescriptors.forEach(sd -> systemStreamConfigs.putAll(sd.toConfig()));
+
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+    JobGraph jobGraph = planner.createJobGraph(config, taskAppDesc);
+    assertEquals(1, jobGraph.getJobNodes().size());
+    JobNode jobNode = jobGraph.getJobNodes().get(0);
+    assertEquals("test-app", jobNode.getJobName());
+    assertEquals("test-app-1", jobNode.getJobNameAndId());
+    assertEquals(0, jobNode.getInEdges().size());
+    assertEquals(0, jobNode.getOutEdges().size());
+    assertEquals(0, jobNode.getTables().size());
+    assertEquals(config, jobNode.getConfig());
+  }
+
+  public static class MockTaskApplication implements SamzaApplication {
+
+    @Override
+    public void describe(ApplicationDescriptor appDesc) {
+
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java
 
b/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java
new file mode 100644
index 0000000..bc15709
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link IntermediateStreamManager}
+ */
+public class TestIntermediateStreamManager extends ExecutionPlannerTestBase {
+
+  @Test
+  public void testCalculateRepartitionJoinTopicPartitions() {
+    mockStreamAppDesc = new 
StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), 
mockConfig);
+    IntermediateStreamManager partitionPlanner = new 
IntermediateStreamManager(mockConfig, mockStreamAppDesc);
+    JobGraph mockGraph = new ExecutionPlanner(mockConfig, 
mock(StreamManager.class))
+        .createJobGraph(mockConfig, mockStreamAppDesc);
+    // set the input stream partitions
+    mockGraph.getInputStreams().forEach(inEdge -> {
+        if 
(inEdge.getStreamSpec().getId().equals(input1Descriptor.getStreamId())) {
+          inEdge.setPartitionCount(6);
+        } else if 
(inEdge.getStreamSpec().getId().equals(input2Descriptor.getStreamId())) {
+          inEdge.setPartitionCount(5);
+        }
+      });
+    partitionPlanner.calculatePartitions(mockGraph);
+    assertEquals(1, mockGraph.getIntermediateStreamEdges().size());
+    assertEquals(5, mockGraph.getIntermediateStreamEdges().stream()
+        .filter(inEdge -> 
inEdge.getStreamSpec().getId().equals(intermediateInputDescriptor.getStreamId()))
+        .findFirst().get().getPartitionCount());
+  }
+
+  @Test
+  public void testCalculateRepartitionIntermediateTopicPartitions() {
+    mockStreamAppDesc = new 
StreamApplicationDescriptorImpl(getRepartitionOnlyStreamApplication(), 
mockConfig);
+    IntermediateStreamManager partitionPlanner = new 
IntermediateStreamManager(mockConfig, mockStreamAppDesc);
+    JobGraph mockGraph = new ExecutionPlanner(mockConfig, 
mock(StreamManager.class))
+        .createJobGraph(mockConfig, mockStreamAppDesc);
+    // set the input stream partitions
+    mockGraph.getInputStreams().forEach(inEdge -> inEdge.setPartitionCount(7));
+    partitionPlanner.calculatePartitions(mockGraph);
+    assertEquals(1, mockGraph.getIntermediateStreamEdges().size());
+    assertEquals(7, mockGraph.getIntermediateStreamEdges().stream()
+        .filter(inEdge -> 
inEdge.getStreamSpec().getId().equals(intermediateInputDescriptor.getStreamId()))
+        .findFirst().get().getPartitionCount());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
index ed35d67..4de0485 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
@@ -19,12 +19,11 @@
 
 package org.apache.samza.execution;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Before;
 import org.junit.Test;
@@ -32,7 +31,6 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 
 public class TestJobGraph {
@@ -61,9 +59,8 @@ public class TestJobGraph {
    * 2 9 10
    */
   private void createGraph1() {
-    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
-    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
-    graph1 = new JobGraph(null, specGraph);
+    StreamApplicationDescriptorImpl appDesc = 
mock(StreamApplicationDescriptorImpl.class);
+    graph1 = new JobGraph(null, appDesc);
 
     JobNode n2 = graph1.getOrCreateJobNode("2", "1");
     JobNode n3 = graph1.getOrCreateJobNode("3", "1");
@@ -96,9 +93,8 @@ public class TestJobGraph {
    *      |<---6 <--|    <>
    */
   private void createGraph2() {
-    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
-    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
-    graph2 = new JobGraph(null, specGraph);
+    StreamApplicationDescriptorImpl appDesc = 
mock(StreamApplicationDescriptorImpl.class);
+    graph2 = new JobGraph(null, appDesc);
 
     JobNode n1 = graph2.getOrCreateJobNode("1", "1");
     JobNode n2 = graph2.getOrCreateJobNode("2", "1");
@@ -125,9 +121,8 @@ public class TestJobGraph {
    * 1<->1 -> 2<->2
    */
   private void createGraph3() {
-    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
-    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
-    graph3 = new JobGraph(null, specGraph);
+    StreamApplicationDescriptorImpl appDesc = 
mock(StreamApplicationDescriptorImpl.class);
+    graph3 = new JobGraph(null, appDesc);
 
     JobNode n1 = graph3.getOrCreateJobNode("1", "1");
     JobNode n2 = graph3.getOrCreateJobNode("2", "1");
@@ -143,9 +138,8 @@ public class TestJobGraph {
    * 1<->1
    */
   private void createGraph4() {
-    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
-    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
-    graph4 = new JobGraph(null, specGraph);
+    StreamApplicationDescriptorImpl appDesc = 
mock(StreamApplicationDescriptorImpl.class);
+    graph4 = new JobGraph(null, appDesc);
 
     JobNode n1 = graph4.getOrCreateJobNode("1", "1");
 
@@ -163,9 +157,8 @@ public class TestJobGraph {
 
   @Test
   public void testAddSource() {
-    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
-    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
-    JobGraph graph = new JobGraph(null, specGraph);
+    StreamApplicationDescriptorImpl appDesc = 
mock(StreamApplicationDescriptorImpl.class);
+    JobGraph graph = new JobGraph(null, appDesc);
 
     /**
      * s1 -> 1
@@ -206,9 +199,8 @@ public class TestJobGraph {
      * 2 -> s2
      * 2 -> s3
      */
-    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
-    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
-    JobGraph graph = new JobGraph(null, specGraph);
+    StreamApplicationDescriptorImpl appDesc = 
mock(StreamApplicationDescriptorImpl.class);
+    JobGraph graph = new JobGraph(null, appDesc);
     JobNode n1 = graph.getOrCreateJobNode("1", "1");
     JobNode n2 = graph.getOrCreateJobNode("2", "1");
     StreamSpec s1 = genStream();

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index ae6e25e..c207118 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -20,9 +20,14 @@
 package org.apache.samza.execution;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -40,10 +45,13 @@ import org.apache.samza.serializers.LongSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.testUtils.StreamTestUtils;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.hamcrest.Matchers;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.samza.execution.TestExecutionPlanner.*;
@@ -51,16 +59,68 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 
+/**
+ * Unit test for {@link JobGraphJsonGenerator}
+ */
 public class TestJobGraphJsonGenerator {
+  private Config mockConfig;
+  private JobNode mockJobNode;
+  private StreamSpec input1Spec;
+  private StreamSpec input2Spec;
+  private StreamSpec outputSpec;
+  private StreamSpec repartitionSpec;
+  private KVSerde<String, Object> defaultSerde;
+  private GenericSystemDescriptor inputSystemDescriptor;
+  private GenericSystemDescriptor outputSystemDescriptor;
+  private GenericSystemDescriptor intermediateSystemDescriptor;
+  private GenericInputDescriptor<KV<String, Object>> input1Descriptor;
+  private GenericInputDescriptor<KV<String, Object>> input2Descriptor;
+  private GenericOutputDescriptor<KV<String, Object>> outputDescriptor;
 
-  public class PageViewEvent {
-    String getCountry() {
-      return "";
-    }
+  @Before
+  public void setUp() {
+    input1Spec = new StreamSpec("input1", "input1", "input-system");
+    input2Spec = new StreamSpec("input2", "input2", "input-system");
+    outputSpec = new StreamSpec("output", "output", "output-system");
+    repartitionSpec =
+        new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", 
"intermediate-system");
+
+
+    defaultSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>());
+    inputSystemDescriptor = new GenericSystemDescriptor("input-system", 
"mockSystemFactoryClassName");
+    outputSystemDescriptor = new GenericSystemDescriptor("output-system", 
"mockSystemFactoryClassName");
+    intermediateSystemDescriptor = new 
GenericSystemDescriptor("intermediate-system", "mockSystemFactoryClassName");
+    input1Descriptor = inputSystemDescriptor.getInputDescriptor("input1", 
defaultSerde);
+    input2Descriptor = inputSystemDescriptor.getInputDescriptor("input2", 
defaultSerde);
+    outputDescriptor = outputSystemDescriptor.getOutputDescriptor("output", 
defaultSerde);
+
+    Map<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "jobName");
+    configs.put(JobConfig.JOB_ID(), "jobId");
+    mockConfig = spy(new MapConfig(configs));
+
+    mockJobNode = mock(JobNode.class);
+    StreamEdge input1Edge = new StreamEdge(input1Spec, false, false, 
mockConfig);
+    StreamEdge input2Edge = new StreamEdge(input2Spec, false, false, 
mockConfig);
+    StreamEdge outputEdge = new StreamEdge(outputSpec, false, false, 
mockConfig);
+    StreamEdge repartitionEdge = new StreamEdge(repartitionSpec, true, false, 
mockConfig);
+    Map<String, StreamEdge> inputEdges = new HashMap<>();
+    inputEdges.put(input1Descriptor.getStreamId(), input1Edge);
+    inputEdges.put(input2Descriptor.getStreamId(), input2Edge);
+    inputEdges.put(repartitionSpec.getId(), repartitionEdge);
+    Map<String, StreamEdge> outputEdges = new HashMap<>();
+    outputEdges.put(outputDescriptor.getStreamId(), outputEdge);
+    outputEdges.put(repartitionSpec.getId(), repartitionEdge);
+    when(mockJobNode.getInEdges()).thenReturn(inputEdges);
+    when(mockJobNode.getOutEdges()).thenReturn(outputEdges);
+    when(mockJobNode.getConfig()).thenReturn(mockConfig);
+    when(mockJobNode.getJobName()).thenReturn("jobName");
+    when(mockJobNode.getJobId()).thenReturn("jobId");
+    
when(mockJobNode.getJobNameAndId()).thenReturn(JobNode.createJobNameAndId("jobName",
 "jobId"));
   }
 
   @Test
-  public void test() throws Exception {
+  public void testRepartitionedJoinStreamApplication() throws Exception {
 
     /**
      * the graph looks like the following.
@@ -142,7 +202,7 @@ public class TestJobGraphJsonGenerator {
       }, config);
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
+    ExecutionPlan plan = planner.plan(graphSpec);
     String json = plan.getPlanAsJson();
     System.out.println(json);
 
@@ -157,7 +217,7 @@ public class TestJobGraphJsonGenerator {
   }
 
   @Test
-  public void test2() throws Exception {
+  public void testRepartitionedWindowStreamApplication() throws Exception {
     Map<String, String> configMap = new HashMap<>();
     configMap.put(JobConfig.JOB_NAME(), "test-app");
     configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
@@ -202,7 +262,7 @@ public class TestJobGraphJsonGenerator {
       }, config);
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
+    ExecutionPlan plan = planner.plan(graphSpec);
     String json = plan.getPlanAsJson();
     System.out.println(json);
 
@@ -222,4 +282,75 @@ public class TestJobGraphJsonGenerator {
     
assertEquals(operatorGraphJson.operators.get("test-app-1-send_to-5").get("outputStreamId"),
         "PageViewCount");
   }
+
+  @Test
+  public void testTaskApplication() throws Exception {
+    JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator();
+    JobGraph mockJobGraph = mock(JobGraph.class);
+    ApplicationConfig mockAppConfig = mock(ApplicationConfig.class);
+    when(mockAppConfig.getAppName()).thenReturn("testTaskApp");
+    when(mockAppConfig.getAppId()).thenReturn("testTaskAppId");
+    when(mockJobGraph.getApplicationConfig()).thenReturn(mockAppConfig);
+    // compute the three disjoint sets of the JobGraph: input only, output 
only, and intermediate streams
+    Set<StreamEdge> inEdges = new HashSet<>(mockJobNode.getInEdges().values());
+    Set<StreamEdge> outEdges = new 
HashSet<>(mockJobNode.getOutEdges().values());
+    Set<StreamEdge> intermediateEdges = new HashSet<>(inEdges);
+    // intermediate streams are the intersection between input and output
+    intermediateEdges.retainAll(outEdges);
+    // remove all intermediate streams from input
+    inEdges.removeAll(intermediateEdges);
+    // remove all intermediate streams from output
+    outEdges.removeAll(intermediateEdges);
+    // set the return values for mockJobGraph
+    when(mockJobGraph.getInputStreams()).thenReturn(inEdges);
+    when(mockJobGraph.getOutputStreams()).thenReturn(outEdges);
+    
when(mockJobGraph.getIntermediateStreamEdges()).thenReturn(intermediateEdges);
+    
when(mockJobGraph.getJobNodes()).thenReturn(Collections.singletonList(mockJobNode));
+    String graphJson = jsonGenerator.toJson(mockJobGraph);
+    ObjectMapper objectMapper = new ObjectMapper();
+    JobGraphJsonGenerator.JobGraphJson jsonObject = 
objectMapper.readValue(graphJson.getBytes(), 
JobGraphJsonGenerator.JobGraphJson.class);
+    assertEquals("testTaskAppId", jsonObject.applicationId);
+    assertEquals("testTaskApp", jsonObject.applicationName);
+    Set<String> inStreamIds = inEdges.stream().map(stream -> 
stream.getStreamSpec().getId()).collect(Collectors.toSet());
+    assertThat(jsonObject.sourceStreams.keySet(), 
Matchers.containsInAnyOrder(inStreamIds.toArray()));
+    Set<String> outStreamIds = outEdges.stream().map(stream -> 
stream.getStreamSpec().getId()).collect(Collectors.toSet());
+    assertThat(jsonObject.sinkStreams.keySet(), 
Matchers.containsInAnyOrder(outStreamIds.toArray()));
+    Set<String> intStreamIds = intermediateEdges.stream().map(stream -> 
stream.getStreamSpec().getId()).collect(Collectors.toSet());
+    assertThat(jsonObject.intermediateStreams.keySet(), 
Matchers.containsInAnyOrder(intStreamIds.toArray()));
+    JobGraphJsonGenerator.JobNodeJson expectedNodeJson = new 
JobGraphJsonGenerator.JobNodeJson();
+    expectedNodeJson.jobId = mockJobNode.getJobId();
+    expectedNodeJson.jobName = mockJobNode.getJobName();
+    assertEquals(1, jsonObject.jobs.size());
+    JobGraphJsonGenerator.JobNodeJson actualNodeJson = jsonObject.jobs.get(0);
+    assertEquals(expectedNodeJson.jobId, actualNodeJson.jobId);
+    assertEquals(expectedNodeJson.jobName, actualNodeJson.jobName);
+    assertEquals(3, actualNodeJson.operatorGraph.inputStreams.size());
+    assertEquals(2, actualNodeJson.operatorGraph.outputStreams.size());
+    assertEquals(0, actualNodeJson.operatorGraph.operators.size());
+  }
+
+  @Test
+  public void testLegacyTaskApplication() throws Exception {
+    JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator();
+    JobGraph mockJobGraph = mock(JobGraph.class);
+    ApplicationConfig mockAppConfig = mock(ApplicationConfig.class);
+    when(mockAppConfig.getAppName()).thenReturn("testTaskApp");
+    when(mockAppConfig.getAppId()).thenReturn("testTaskAppId");
+    when(mockJobGraph.getApplicationConfig()).thenReturn(mockAppConfig);
+    String graphJson = jsonGenerator.toJson(mockJobGraph);
+    ObjectMapper objectMapper = new ObjectMapper();
+    JobGraphJsonGenerator.JobGraphJson jsonObject = 
objectMapper.readValue(graphJson.getBytes(), 
JobGraphJsonGenerator.JobGraphJson.class);
+    assertEquals("testTaskAppId", jsonObject.applicationId);
+    assertEquals("testTaskApp", jsonObject.applicationName);
+    JobGraphJsonGenerator.JobNodeJson expectedNodeJson = new 
JobGraphJsonGenerator.JobNodeJson();
+    expectedNodeJson.jobId = mockJobNode.getJobId();
+    expectedNodeJson.jobName = mockJobNode.getJobName();
+    assertEquals(0, jsonObject.jobs.size());
+  }
+
+  public class PageViewEvent {
+    String getCountry() {
+      return "";
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
deleted file mode 100644
index 163b094..0000000
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.execution;
-
-import java.time.Duration;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.SerializerConfig;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
-import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.impl.store.TimestampedValueSerde;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.StreamSpec;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
-
-public class TestJobNode {
-
-  @Test
-  public void testAddSerdeConfigs() {
-    StreamSpec input1Spec = new StreamSpec("input1", "input1", "input-system");
-    StreamSpec input2Spec = new StreamSpec("input2", "input2", "input-system");
-    StreamSpec outputSpec = new StreamSpec("output", "output", 
"output-system");
-    StreamSpec partitionBySpec =
-        new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", 
"intermediate-system");
-
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("jobId");
-
-    StreamApplicationDescriptorImpl graphSpec = new 
StreamApplicationDescriptorImpl(appDesc -> {
-        KVSerde<String, Object> serde = KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>());
-        GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", 
"mockSystemFactoryClass");
-        GenericInputDescriptor<KV<String, Object>> inputDescriptor1 = 
sd.getInputDescriptor("input1", serde);
-        GenericInputDescriptor<KV<String, Object>> inputDescriptor2 = 
sd.getInputDescriptor("input2", serde);
-        GenericOutputDescriptor<KV<String, Object>> outputDescriptor = 
sd.getOutputDescriptor("output", serde);
-        MessageStream<KV<String, Object>> input1 = 
appDesc.getInputStream(inputDescriptor1);
-        MessageStream<KV<String, Object>> input2 = 
appDesc.getInputStream(inputDescriptor2);
-        OutputStream<KV<String, Object>> output = 
appDesc.getOutputStream(outputDescriptor);
-        JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = 
mock(JoinFunction.class);
-        input1
-            .partitionBy(KV::getKey, KV::getValue, serde, "p1")
-            .map(kv -> kv.value)
-            .join(input2.map(kv -> kv.value), mockJoinFn,
-                new StringSerde(), new JsonSerdeV2<>(Object.class), new 
JsonSerdeV2<>(Object.class),
-                Duration.ofHours(1), "j1")
-            .sendTo(output);
-      }, mockConfig);
-
-    JobNode jobNode = new JobNode("jobName", "jobId", 
graphSpec.getOperatorSpecGraph(), mockConfig);
-    Config config = new MapConfig();
-    StreamEdge input1Edge = new StreamEdge(input1Spec, false, false, config);
-    StreamEdge input2Edge = new StreamEdge(input2Spec, false, false, config);
-    StreamEdge outputEdge = new StreamEdge(outputSpec, false, false, config);
-    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, false, 
config);
-    jobNode.addInEdge(input1Edge);
-    jobNode.addInEdge(input2Edge);
-    jobNode.addOutEdge(outputEdge);
-    jobNode.addInEdge(repartitionEdge);
-    jobNode.addOutEdge(repartitionEdge);
-
-    Map<String, String> configs = new HashMap<>();
-    jobNode.addSerdeConfigs(configs);
-
-    MapConfig mapConfig = new MapConfig(configs);
-    Config serializers = mapConfig.subset("serializers.registry.", true);
-
-    // make sure that the serializers deserialize correctly
-    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
-    Map<String, Serde> deserializedSerdes = 
serializers.entrySet().stream().collect(Collectors.toMap(
-        e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), 
""),
-        e -> 
serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
-    ));
-    assertEquals(5, serializers.size()); // 2 default + 3 specific for join
-
-    String input1KeySerde = mapConfig.get("streams.input1.samza.key.serde");
-    String input1MsgSerde = mapConfig.get("streams.input1.samza.msg.serde");
-    assertTrue("Serialized serdes should contain input1 key serde",
-        deserializedSerdes.containsKey(input1KeySerde));
-    assertTrue("Serialized input1 key serde should be a StringSerde",
-        input1KeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue("Serialized serdes should contain input1 msg serde",
-        deserializedSerdes.containsKey(input1MsgSerde));
-    assertTrue("Serialized input1 msg serde should be a JsonSerdeV2",
-        input1MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
-
-    String input2KeySerde = mapConfig.get("streams.input2.samza.key.serde");
-    String input2MsgSerde = mapConfig.get("streams.input2.samza.msg.serde");
-    assertTrue("Serialized serdes should contain input2 key serde",
-        deserializedSerdes.containsKey(input2KeySerde));
-    assertTrue("Serialized input2 key serde should be a StringSerde",
-        input2KeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue("Serialized serdes should contain input2 msg serde",
-        deserializedSerdes.containsKey(input2MsgSerde));
-    assertTrue("Serialized input2 msg serde should be a JsonSerdeV2",
-        input2MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
-
-    String outputKeySerde = mapConfig.get("streams.output.samza.key.serde");
-    String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde");
-    assertTrue("Serialized serdes should contain output key serde",
-        deserializedSerdes.containsKey(outputKeySerde));
-    assertTrue("Serialized output key serde should be a StringSerde",
-        outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue("Serialized serdes should contain output msg serde",
-        deserializedSerdes.containsKey(outputMsgSerde));
-    assertTrue("Serialized output msg serde should be a JsonSerdeV2",
-        outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
-
-    String partitionByKeySerde = 
mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde");
-    String partitionByMsgSerde = 
mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde");
-    assertTrue("Serialized serdes should contain intermediate stream key 
serde",
-        deserializedSerdes.containsKey(partitionByKeySerde));
-    assertTrue("Serialized intermediate stream key serde should be a 
StringSerde",
-        partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue("Serialized serdes should contain intermediate stream msg 
serde",
-        deserializedSerdes.containsKey(partitionByMsgSerde));
-    assertTrue(
-        "Serialized intermediate stream msg serde should be a JsonSerdeV2",
-        partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
-
-    String leftJoinStoreKeySerde = 
mapConfig.get("stores.jobName-jobId-join-j1-L.key.serde");
-    String leftJoinStoreMsgSerde = 
mapConfig.get("stores.jobName-jobId-join-j1-L.msg.serde");
-    assertTrue("Serialized serdes should contain left join store key serde",
-        deserializedSerdes.containsKey(leftJoinStoreKeySerde));
-    assertTrue("Serialized left join store key serde should be a StringSerde",
-        leftJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue("Serialized serdes should contain left join store msg serde",
-        deserializedSerdes.containsKey(leftJoinStoreMsgSerde));
-    assertTrue("Serialized left join store msg serde should be a 
TimestampedValueSerde",
-        
leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
-
-    String rightJoinStoreKeySerde = 
mapConfig.get("stores.jobName-jobId-join-j1-R.key.serde");
-    String rightJoinStoreMsgSerde = 
mapConfig.get("stores.jobName-jobId-join-j1-R.msg.serde");
-    assertTrue("Serialized serdes should contain right join store key serde",
-        deserializedSerdes.containsKey(rightJoinStoreKeySerde));
-    assertTrue("Serialized right join store key serde should be a StringSerde",
-        rightJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue("Serialized serdes should contain right join store msg serde",
-        deserializedSerdes.containsKey(rightJoinStoreMsgSerde));
-    assertTrue("Serialized right join store msg serde should be a 
TimestampedValueSerde",
-        
rightJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
-  }
-
-  @Test
-  public void testAddSerdeConfigsForRepartitionWithNoDefaultSystem() {
-    StreamSpec inputSpec = new StreamSpec("input", "input", "input-system");
-    StreamSpec partitionBySpec =
-        new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", 
"intermediate-system");
-
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("jobId");
-
-    StreamApplicationDescriptorImpl graphSpec = new 
StreamApplicationDescriptorImpl(appDesc -> {
-        GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", 
"mockSystemFactoryClassName");
-        GenericInputDescriptor<KV<String, Object>> inputDescriptor1 =
-            sd.getInputDescriptor("input", KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>()));
-        MessageStream<KV<String, Object>> input = 
appDesc.getInputStream(inputDescriptor1);
-        input.partitionBy(KV::getKey, KV::getValue, "p1");
-      }, mockConfig);
-
-    JobNode jobNode = new JobNode("jobName", "jobId", 
graphSpec.getOperatorSpecGraph(), mockConfig);
-    Config config = new MapConfig();
-    StreamEdge input1Edge = new StreamEdge(inputSpec, false, false, config);
-    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, false, 
config);
-    jobNode.addInEdge(input1Edge);
-    jobNode.addInEdge(repartitionEdge);
-    jobNode.addOutEdge(repartitionEdge);
-
-    Map<String, String> configs = new HashMap<>();
-    jobNode.addSerdeConfigs(configs);
-
-    MapConfig mapConfig = new MapConfig(configs);
-    Config serializers = mapConfig.subset("serializers.registry.", true);
-
-    // make sure that the serializers deserialize correctly
-    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
-    Map<String, Serde> deserializedSerdes = 
serializers.entrySet().stream().collect(Collectors.toMap(
-        e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), 
""),
-        e -> 
serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
-    ));
-    assertEquals(2, serializers.size()); // 2 input stream
-
-    String partitionByKeySerde = 
mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde");
-    String partitionByMsgSerde = 
mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde");
-    assertTrue("Serialized serdes should not contain intermediate stream key 
serde",
-        !deserializedSerdes.containsKey(partitionByKeySerde));
-    assertTrue("Serialized serdes should not contain intermediate stream msg 
serde",
-        !deserializedSerdes.containsKey(partitionByMsgSerde));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
new file mode 100644
index 0000000..f351c44
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.TaskApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigRewriter;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.impl.store.TimestampedValueSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit test for {@link JobNodeConfigurationGenerator}
+ */
+public class TestJobNodeConfigurationGenerator extends 
ExecutionPlannerTestBase {
+
+  @Test
+  public void testConfigureSerdesWithRepartitionJoinApplication() {
+    mockStreamAppDesc = new 
StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), 
mockConfig);
+    configureJobNode(mockStreamAppDesc);
+    // create the JobGraphConfigureGenerator and generate the jobConfig for 
the jobNode
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"testJobGraphJson");
+
+    // Verify the results
+    Config expectedJobConfig = getExpectedJobConfig(mockConfig, 
mockJobNode.getInEdges());
+    validateJobConfig(expectedJobConfig, jobConfig);
+    // additional, check the computed window.ms for join
+    assertEquals("3600000", jobConfig.get(TaskConfig.WINDOW_MS()));
+    Map<String, Serde> deserializedSerdes = 
validateAndGetDeserializedSerdes(jobConfig, 5);
+    validateStreamConfigures(jobConfig, deserializedSerdes);
+    validateJoinStoreConfigures(jobConfig, deserializedSerdes);
+  }
+
+  @Test
+  public void testConfigureSerdesForRepartitionWithNoDefaultSystem() {
+    // set the application to RepartitionOnlyStreamApplication
+    mockStreamAppDesc = new 
StreamApplicationDescriptorImpl(getRepartitionOnlyStreamApplication(), 
mockConfig);
+    configureJobNode(mockStreamAppDesc);
+
+    // create the JobGraphConfigureGenerator and generate the jobConfig for 
the jobNode
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"testJobGraphJson");
+
+    // Verify the results
+    Config expectedJobConfig = getExpectedJobConfig(mockConfig, 
mockJobNode.getInEdges());
+    validateJobConfig(expectedJobConfig, jobConfig);
+
+    Map<String, Serde> deserializedSerdes = 
validateAndGetDeserializedSerdes(jobConfig, 2);
+    validateStreamConfigures(jobConfig, null);
+
+    String partitionByKeySerde = 
jobConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde");
+    String partitionByMsgSerde = 
jobConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde");
+    assertTrue("Serialized serdes should not contain intermediate stream key 
serde",
+        !deserializedSerdes.containsKey(partitionByKeySerde));
+    assertTrue("Serialized serdes should not contain intermediate stream msg 
serde",
+        !deserializedSerdes.containsKey(partitionByMsgSerde));
+  }
+
+  @Test
+  public void testGenerateJobConfigWithTaskApplication() {
+    // set the application to TaskApplication, which still wires up all 
input/output/intermediate streams
+    TaskApplicationDescriptorImpl taskAppDesc = new 
TaskApplicationDescriptorImpl(getTaskApplication(), mockConfig);
+    configureJobNode(taskAppDesc);
+    // create the JobGraphConfigureGenerator and generate the jobConfig for 
the jobNode
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"testJobGraphJson");
+
+    // Verify the results
+    Config expectedJobConfig = getExpectedJobConfig(mockConfig, 
mockJobNode.getInEdges());
+    validateJobConfig(expectedJobConfig, jobConfig);
+    Map<String, Serde> deserializedSerdes = 
validateAndGetDeserializedSerdes(jobConfig, 2);
+    validateStreamConfigures(jobConfig, deserializedSerdes);
+  }
+
+  @Test
+  public void testGenerateJobConfigWithLegacyTaskApplication() {
+    TaskApplicationDescriptorImpl taskAppDesc = new 
TaskApplicationDescriptorImpl(getLegacyTaskApplication(), mockConfig);
+    configureJobNode(taskAppDesc);
+    Map<String, String> originConfig = new HashMap<>(mockConfig);
+
+    // create the JobGraphConfigureGenerator and generate the jobConfig for 
the jobNode
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"");
+    // jobConfig should be exactly the same as original config
+    Map<String, String> generatedConfig = new HashMap<>(jobConfig);
+    assertEquals(originConfig, generatedConfig);
+  }
+
+  @Test
+  public void testBroadcastStreamApplication() {
+    // set the application to BroadcastStreamApplication
+    mockStreamAppDesc = new 
StreamApplicationDescriptorImpl(getBroadcastOnlyStreamApplication(defaultSerde),
 mockConfig);
+    configureJobNode(mockStreamAppDesc);
+
+    // create the JobGraphConfigureGenerator and generate the jobConfig for 
the jobNode
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"testJobGraphJson");
+    Config expectedJobConfig = getExpectedJobConfig(mockConfig, 
mockJobNode.getInEdges());
+    validateJobConfig(expectedJobConfig, jobConfig);
+    Map<String, Serde> deserializedSerdes = 
validateAndGetDeserializedSerdes(jobConfig, 2);
+    validateStreamSerdeConfigure(broadcastInputDesriptor.getStreamId(), 
jobConfig, deserializedSerdes);
+    validateIntermediateStreamConfigure(broadcastInputDesriptor.getStreamId(), 
broadcastInputDesriptor.getPhysicalName().get(), jobConfig);
+  }
+
+  @Test
+  public void testBroadcastStreamApplicationWithoutSerde() {
+    // set the application to BroadcastStreamApplication without a serde
+    mockStreamAppDesc = new 
StreamApplicationDescriptorImpl(getBroadcastOnlyStreamApplication(null), 
mockConfig);
+    configureJobNode(mockStreamAppDesc);
+
+    // create the JobGraphConfigureGenerator and generate the jobConfig for 
the jobNode
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"testJobGraphJson");
+    Config expectedJobConfig = getExpectedJobConfig(mockConfig, 
mockJobNode.getInEdges());
+    validateJobConfig(expectedJobConfig, jobConfig);
+    Map<String, Serde> deserializedSerdes = 
validateAndGetDeserializedSerdes(jobConfig, 2);
+    validateIntermediateStreamConfigure(broadcastInputDesriptor.getStreamId(), 
broadcastInputDesriptor.getPhysicalName().get(), jobConfig);
+
+    String keySerde = 
jobConfig.get(String.format("streams.%s.samza.key.serde", 
broadcastInputDesriptor.getStreamId()));
+    String msgSerde = 
jobConfig.get(String.format("streams.%s.samza.msg.serde", 
broadcastInputDesriptor.getStreamId()));
+    assertTrue("Serialized serdes should not contain intermediate stream key 
serde",
+        !deserializedSerdes.containsKey(keySerde));
+    assertTrue("Serialized serdes should not contain intermediate stream msg 
serde",
+        !deserializedSerdes.containsKey(msgSerde));
+  }
+
+  @Test
+  public void testStreamApplicationWithTableAndSideInput() {
+    mockStreamAppDesc = new 
StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), 
mockConfig);
+    // add table to the RepartitionJoinStreamApplication
+    GenericInputDescriptor<KV<String, Object>> sideInput1 = 
inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde);
+    BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
+    TableSpec mockTableSpec = mock(TableSpec.class);
+    when(mockTableSpec.getId()).thenReturn("testTable");
+    when(mockTableSpec.getSerde()).thenReturn((KVSerde) defaultSerde);
+    
when(mockTableSpec.getTableProviderFactoryClassName()).thenReturn(MockTableProviderFactory.class.getName());
+    List<String> sideInputs = new ArrayList<>();
+    sideInputs.add(sideInput1.getStreamId());
+    when(mockTableSpec.getSideInputs()).thenReturn(sideInputs);
+    when(mockTableDescriptor.getTableId()).thenReturn("testTable");
+    when(mockTableDescriptor.getTableSpec()).thenReturn(mockTableSpec);
+    when(mockTableDescriptor.getSerde()).thenReturn(defaultSerde);
+    // add side input and terminate at table in the application
+    
mockStreamAppDesc.getInputStream(sideInput1).sendTo(mockStreamAppDesc.getTable(mockTableDescriptor));
+    StreamEdge sideInputEdge = new StreamEdge(new 
StreamSpec(sideInput1.getStreamId(), "sideInput1",
+        inputSystemDescriptor.getSystemName()), false, false, mockConfig);
+    // need to put the sideInput related stream configuration to the original 
config
+    // TODO: this is confusing since part of the system and stream related 
configuration is generated outside the JobGraphConfigureGenerator
+    // It would be nice if all system and stream related configuration is 
generated in one place and only intermediate stream
+    // configuration is generated by JobGraphConfigureGenerator
+    Map<String, String> configs = new HashMap<>(mockConfig);
+    configs.putAll(sideInputEdge.generateConfig());
+    mockConfig = spy(new MapConfig(configs));
+    configureJobNode(mockStreamAppDesc);
+
+    // create the JobGraphConfigureGenerator and generate the jobConfig for 
the jobNode
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"testJobGraphJson");
+    Config expectedJobConfig = getExpectedJobConfig(mockConfig, 
mockJobNode.getInEdges());
+    validateJobConfig(expectedJobConfig, jobConfig);
+    Map<String, Serde> deserializedSerdes = 
validateAndGetDeserializedSerdes(jobConfig, 5);
+    validateTableConfigure(jobConfig, deserializedSerdes, mockTableDescriptor);
+  }
+
+  @Test
+  public void testTaskApplicationWithTableAndSideInput() {
+    // add table to the RepartitionJoinStreamApplication
+    GenericInputDescriptor<KV<String, Object>> sideInput1 = 
inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde);
+    BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
+    TableSpec mockTableSpec = mock(TableSpec.class);
+    when(mockTableSpec.getId()).thenReturn("testTable");
+    when(mockTableSpec.getSerde()).thenReturn((KVSerde) defaultSerde);
+    
when(mockTableSpec.getTableProviderFactoryClassName()).thenReturn(MockTableProviderFactory.class.getName());
+    List<String> sideInputs = new ArrayList<>();
+    sideInputs.add(sideInput1.getStreamId());
+    when(mockTableSpec.getSideInputs()).thenReturn(sideInputs);
+    when(mockTableDescriptor.getTableId()).thenReturn("testTable");
+    when(mockTableDescriptor.getTableSpec()).thenReturn(mockTableSpec);
+    when(mockTableDescriptor.getSerde()).thenReturn(defaultSerde);
+    StreamEdge sideInputEdge = new StreamEdge(new 
StreamSpec(sideInput1.getStreamId(), "sideInput1",
+        inputSystemDescriptor.getSystemName()), false, false, mockConfig);
+    // need to put the sideInput related stream configuration to the original 
config
+    // TODO: this is confusing since part of the system and stream related 
configuration is generated outside the JobGraphConfigureGenerator
+    // It would be nice if all system and stream related configuration is 
generated in one place and only intermediate stream
+    // configuration is generated by JobGraphConfigureGenerator
+    Map<String, String> configs = new HashMap<>(mockConfig);
+    configs.putAll(sideInputEdge.generateConfig());
+    mockConfig = spy(new MapConfig(configs));
+
+    // set the application to TaskApplication, which still wire up all 
input/output/intermediate streams
+    TaskApplicationDescriptorImpl taskAppDesc = new 
TaskApplicationDescriptorImpl(getTaskApplication(), mockConfig);
+    // add table to the task application
+    taskAppDesc.addTable(mockTableDescriptor);
+    
taskAppDesc.addInputStream(inputSystemDescriptor.getInputDescriptor("sideInput1",
 defaultSerde));
+    configureJobNode(taskAppDesc);
+
+    // create the JobGraphConfigureGenerator and generate the jobConfig for 
the jobNode
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"testJobGraphJson");
+
+    // Verify the results
+    Config expectedJobConfig = getExpectedJobConfig(mockConfig, 
mockJobNode.getInEdges());
+    validateJobConfig(expectedJobConfig, jobConfig);
+    Map<String, Serde> deserializedSerdes = 
validateAndGetDeserializedSerdes(jobConfig, 2);
+    validateStreamConfigures(jobConfig, deserializedSerdes);
+    validateTableConfigure(jobConfig, deserializedSerdes, mockTableDescriptor);
+  }
+
+  @Test
+  public void testTaskInputsRemovedFromOriginalConfig() {
+    Map<String, String> configs = new HashMap<>(mockConfig);
+    configs.put(TaskConfig.INPUT_STREAMS(), "not.allowed1,not.allowed2");
+    mockConfig = spy(new MapConfig(configs));
+
+    mockStreamAppDesc = new 
StreamApplicationDescriptorImpl(getBroadcastOnlyStreamApplication(defaultSerde),
 mockConfig);
+    configureJobNode(mockStreamAppDesc);
+
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"testJobGraphJson");
+    Config expectedConfig = getExpectedJobConfig(mockConfig, 
mockJobNode.getInEdges());
+    validateJobConfig(expectedConfig, jobConfig);
+  }
+
+  @Test
+  public void testTaskInputsRetainedForLegacyTaskApplication() {
+    Map<String, String> originConfig = new HashMap<>(mockConfig);
+    originConfig.put(TaskConfig.INPUT_STREAMS(), "must.retain1,must.retain2");
+    mockConfig = new MapConfig(originConfig);
+    TaskApplicationDescriptorImpl taskAppDesc = new 
TaskApplicationDescriptorImpl(getLegacyTaskApplication(), mockConfig);
+    configureJobNode(taskAppDesc);
+
+    // create the JobGraphConfigureGenerator and generate the jobConfig for 
the jobNode
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"");
+    // jobConfig should be exactly the same as original config
+    Map<String, String> generatedConfig = new HashMap<>(jobConfig);
+    assertEquals(originConfig, generatedConfig);
+  }
+
+  @Test
+  public void testOverrideConfigs() {
+    Map<String, String> configs = new HashMap<>(mockConfig);
+    String streamCfgToOverride = String.format("streams.%s.samza.system", 
intermediateInputDescriptor.getStreamId());
+    String overrideCfgKey = 
String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()) + 
streamCfgToOverride;
+    configs.put(overrideCfgKey, "customized-system");
+    mockConfig = spy(new MapConfig(configs));
+    mockStreamAppDesc = new 
StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), 
mockConfig);
+    configureJobNode(mockStreamAppDesc);
+
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"testJobGraphJson");
+    Config expectedConfig = getExpectedJobConfig(mockConfig, 
mockJobNode.getInEdges());
+    validateJobConfig(expectedConfig, jobConfig);
+    assertEquals("customized-system", jobConfig.get(streamCfgToOverride));
+  }
+
+  @Test
+  public void testConfigureRewriter() {
+    Map<String, String> configs = new HashMap<>(mockConfig);
+    String streamCfgToOverride = String.format("streams.%s.samza.system", 
intermediateInputDescriptor.getStreamId());
+    String overrideCfgKey = 
String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()) + 
streamCfgToOverride;
+    configs.put(overrideCfgKey, "customized-system");
+    configs.put(String.format(JobConfig.CONFIG_REWRITER_CLASS(), "mock"), 
MockConfigRewriter.class.getName());
+    configs.put(JobConfig.CONFIG_REWRITERS(), "mock");
+    configs.put(String.format("job.config.rewriter.mock.%s", 
streamCfgToOverride), "rewritten-system");
+    mockConfig = spy(new MapConfig(configs));
+    mockStreamAppDesc = new 
StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), 
mockConfig);
+    configureJobNode(mockStreamAppDesc);
+
+    JobNodeConfigurationGenerator configureGenerator = new 
JobNodeConfigurationGenerator();
+    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, 
"testJobGraphJson");
+    Config expectedConfig = getExpectedJobConfig(mockConfig, 
mockJobNode.getInEdges());
+    validateJobConfig(expectedConfig, jobConfig);
+    assertEquals("rewritten-system", jobConfig.get(streamCfgToOverride));
+  }
+
+  private void validateTableConfigure(JobConfig jobConfig, Map<String, Serde> 
deserializedSerdes,
+      TableDescriptor tableDescriptor) {
+    Config tableConfig = jobConfig.subset(String.format("tables.%s.", 
tableDescriptor.getTableId()));
+    assertEquals(MockTableProviderFactory.class.getName(), 
tableConfig.get("provider.factory"));
+    MockTableProvider mockTableProvider =
+        (MockTableProvider) new 
MockTableProviderFactory().getTableProvider(((BaseTableDescriptor) 
tableDescriptor).getTableSpec());
+    
assertEquals(mockTableProvider.configMap.get("mock.table.provider.config"), 
jobConfig.get("mock.table.provider.config"));
+    validateTableSerdeConfigure(tableDescriptor.getTableId(), jobConfig, 
deserializedSerdes);
+  }
+
+  private Config getExpectedJobConfig(Config originConfig, Map<String, 
StreamEdge> inputEdges) {
+    Map<String, String> configMap = new HashMap<>(originConfig);
+    Set<String> inputs = new HashSet<>();
+    Set<String> broadcasts = new HashSet<>();
+    for (StreamEdge inputEdge : inputEdges.values()) {
+      if (inputEdge.isBroadcast()) {
+        broadcasts.add(inputEdge.getName() + "#0");
+      } else {
+        inputs.add(inputEdge.getName());
+      }
+    }
+    if (!inputs.isEmpty()) {
+      configMap.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+    }
+    if (!broadcasts.isEmpty()) {
+      configMap.put(TaskConfigJava.BROADCAST_INPUT_STREAMS, 
Joiner.on(',').join(broadcasts));
+    }
+    return new MapConfig(configMap);
+  }
+
+  private Map<String, Serde> validateAndGetDeserializedSerdes(Config 
jobConfig, int numSerdes) {
+    Config serializers = jobConfig.subset("serializers.registry.", true);
+    // make sure that the serializers deserialize correctly
+    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+    assertEquals(numSerdes, serializers.size());
+    return serializers.entrySet().stream().collect(Collectors.toMap(
+        e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), 
""),
+        e -> 
serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
+    ));
+  }
+
+  private void validateJobConfig(Config expectedConfig, JobConfig jobConfig) {
+    assertEquals(expectedConfig.get(JobConfig.JOB_NAME()), 
jobConfig.getName().get());
+    assertEquals(expectedConfig.get(JobConfig.JOB_ID()), jobConfig.getJobId());
+    assertEquals("testJobGraphJson", 
jobConfig.get(JobNodeConfigurationGenerator.CONFIG_INTERNAL_EXECUTION_PLAN));
+    assertEquals(expectedConfig.get(TaskConfig.INPUT_STREAMS()), 
jobConfig.get(TaskConfig.INPUT_STREAMS()));
+    assertEquals(expectedConfig.get(TaskConfigJava.BROADCAST_INPUT_STREAMS), 
jobConfig.get(TaskConfigJava.BROADCAST_INPUT_STREAMS));
+  }
+
+  private void validateStreamSerdeConfigure(String streamId, Config config, 
Map<String, Serde> deserializedSerdes) {
+    Config streamConfig = config.subset(String.format("streams.%s.samza.", 
streamId));
+    String keySerdeName = streamConfig.get("key.serde");
+    String valueSerdeName = streamConfig.get("msg.serde");
+    assertTrue(String.format("Serialized serdes should contain %s key serde", 
streamId), deserializedSerdes.containsKey(keySerdeName));
+    assertTrue(String.format("Serialized %s key serde should be a 
StringSerde", streamId), 
keySerdeName.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue(String.format("Serialized serdes should contain %s msg serde", 
streamId), deserializedSerdes.containsKey(valueSerdeName));
+    assertTrue(String.format("Serialized %s msg serde should be a 
JsonSerdeV2", streamId), 
valueSerdeName.startsWith(JsonSerdeV2.class.getSimpleName()));
+  }
+
+  private void validateTableSerdeConfigure(String tableId, Config config, 
Map<String, Serde> deserializedSerdes) {
+    Config streamConfig = config.subset(String.format("tables.%s.", tableId));
+    String keySerdeName = streamConfig.get("key.serde");
+    String valueSerdeName = streamConfig.get("value.serde");
+    assertTrue(String.format("Serialized serdes should contain %s key serde", 
tableId), deserializedSerdes.containsKey(keySerdeName));
+    assertTrue(String.format("Serialized %s key serde should be a 
StringSerde", tableId), 
keySerdeName.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue(String.format("Serialized serdes should contain %s value 
serde", tableId), deserializedSerdes.containsKey(valueSerdeName));
+    assertTrue(String.format("Serialized %s msg serde should be a 
JsonSerdeV2", tableId), 
valueSerdeName.startsWith(JsonSerdeV2.class.getSimpleName()));
+  }
+
+  private void validateIntermediateStreamConfigure(String streamId, String 
physicalName, Config config) {
+    Config intStreamConfig = config.subset(String.format("streams.%s.", 
streamId),  true);
+    assertEquals("intermediate-system", intStreamConfig.get("samza.system"));
+    assertEquals(String.valueOf(Integer.MAX_VALUE), 
intStreamConfig.get("samza.priority"));
+    assertEquals("true", 
intStreamConfig.get("samza.delete.committed.messages"));
+    assertEquals(physicalName, intStreamConfig.get("samza.physical.name"));
+    assertEquals("true", intStreamConfig.get("samza.intermediate"));
+    assertEquals("oldest", intStreamConfig.get("samza.offset.default"));
+  }
+
+  private void validateStreamConfigures(Config config, Map<String, Serde> 
deserializedSerdes) {
+
+    if (deserializedSerdes != null) {
+      validateStreamSerdeConfigure(input1Descriptor.getStreamId(), config, 
deserializedSerdes);
+      validateStreamSerdeConfigure(input2Descriptor.getStreamId(), config, 
deserializedSerdes);
+      validateStreamSerdeConfigure(outputDescriptor.getStreamId(), config, 
deserializedSerdes);
+      validateStreamSerdeConfigure(intermediateInputDescriptor.getStreamId(), 
config, deserializedSerdes);
+    }
+
+    // generated stream config for intermediate stream
+    String physicalName = 
intermediateInputDescriptor.getPhysicalName().isPresent() ?
+        intermediateInputDescriptor.getPhysicalName().get() : null;
+    
validateIntermediateStreamConfigure(intermediateInputDescriptor.getStreamId(), 
physicalName, config);
+  }
+
+  private void validateJoinStoreConfigures(MapConfig mapConfig, Map<String, 
Serde> deserializedSerdes) {
+    String leftJoinStoreKeySerde = 
mapConfig.get("stores.jobName-jobId-join-j1-L.key.serde");
+    String leftJoinStoreMsgSerde = 
mapConfig.get("stores.jobName-jobId-join-j1-L.msg.serde");
+    assertTrue("Serialized serdes should contain left join store key serde",
+        deserializedSerdes.containsKey(leftJoinStoreKeySerde));
+    assertTrue("Serialized left join store key serde should be a StringSerde",
+        leftJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain left join store msg serde",
+        deserializedSerdes.containsKey(leftJoinStoreMsgSerde));
+    assertTrue("Serialized left join store msg serde should be a 
TimestampedValueSerde",
+        
leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
+
+    String rightJoinStoreKeySerde = 
mapConfig.get("stores.jobName-jobId-join-j1-R.key.serde");
+    String rightJoinStoreMsgSerde = 
mapConfig.get("stores.jobName-jobId-join-j1-R.msg.serde");
+    assertTrue("Serialized serdes should contain right join store key serde",
+        deserializedSerdes.containsKey(rightJoinStoreKeySerde));
+    assertTrue("Serialized right join store key serde should be a StringSerde",
+        rightJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain right join store msg serde",
+        deserializedSerdes.containsKey(rightJoinStoreMsgSerde));
+    assertTrue("Serialized right join store msg serde should be a 
TimestampedValueSerde",
+        
rightJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
+
+    Config leftJoinStoreConfig = 
mapConfig.subset("stores.jobName-jobId-join-j1-L.", true);
+    validateJoinStoreConfigure(leftJoinStoreConfig, "jobName-jobId-join-j1-L");
+    Config rightJoinStoreConfig = 
mapConfig.subset("stores.jobName-jobId-join-j1-R.", true);
+    validateJoinStoreConfigure(rightJoinStoreConfig, 
"jobName-jobId-join-j1-R");
+  }
+
+  private void validateJoinStoreConfigure(Config joinStoreConfig, String 
changelogName) {
+    
assertEquals("org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory", 
joinStoreConfig.get("factory"));
+    assertEquals(changelogName, joinStoreConfig.get("changelog"));
+    assertEquals("delete", 
joinStoreConfig.get("changelog.kafka.cleanup.policy"));
+    assertEquals("3600000", 
joinStoreConfig.get("changelog.kafka.retention.ms"));
+    assertEquals("3600000", joinStoreConfig.get("rocksdb.ttl.ms"));
+  }
+
+  private static class MockTableProvider implements TableProvider {
+    private final Map<String, String> configMap;
+
+    MockTableProvider(Map<String, String> configMap) {
+      this.configMap = configMap;
+    }
+
+    @Override
+    public void init(SamzaContainerContext containerContext, TaskContext 
taskContext) {
+
+    }
+
+    @Override
+    public Table getTable() {
+      return null;
+    }
+
+    @Override
+    public Map<String, String> generateConfig(Config jobConfig, Map<String, 
String> generatedConfig) {
+      return configMap;
+    }
+
+    @Override
+    public void close() {
+
+    }
+  }
+
+  public static class MockTableProviderFactory implements TableProviderFactory 
{
+
+    @Override
+    public TableProvider getTableProvider(TableSpec tableSpec) {
+      Map<String, String> configMap = new HashMap<>();
+      configMap.put("mock.table.provider.config", "mock.config.value");
+      return new MockTableProvider(configMap);
+    }
+  }
+
+  public static class MockConfigRewriter implements ConfigRewriter {
+
+    @Override
+    public Config rewrite(String name, Config config) {
+      Map<String, String> configMap = new HashMap<>(config);
+      configMap.putAll(config.subset(String.format("job.config.rewriter.%s.", 
name)));
+      return new MapConfig(configMap);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java
index 988fb34..85921f4 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java
@@ -69,7 +69,7 @@ public class TestRemoteJobPlanner {
     ApplicationConfig mockAppConfig = mock(ApplicationConfig.class);
     
when(mockAppConfig.getAppMode()).thenReturn(ApplicationConfig.ApplicationMode.STREAM);
     when(plan.getApplicationConfig()).thenReturn(mockAppConfig);
-    doReturn(plan).when(remotePlanner).getExecutionPlan(any(), any());
+    doReturn(plan).when(remotePlanner).getExecutionPlan(any());
 
     remotePlanner.prepareJobs();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
index a5b15b8..57ae6d8 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
@@ -117,7 +117,6 @@ public class TestOperatorSpecGraph {
     OperatorSpecGraph specGraph = new OperatorSpecGraph(mockAppDesc);
     assertEquals(specGraph.getInputOperators(), inputOpSpecMap);
     assertEquals(specGraph.getOutputStreams(), outputStrmMap);
-    assertTrue(specGraph.getTables().isEmpty());
     assertTrue(!specGraph.hasWindowOrJoins());
     assertEquals(specGraph.getAllOperatorSpecs(), this.allOpSpecs);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
 
b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
index 7704a5b..a34fdc3 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
@@ -53,7 +53,6 @@ public class OperatorSpecTestUtils {
   public static void assertClonedGraph(OperatorSpecGraph originalGraph, 
OperatorSpecGraph clonedGraph) {
     assertClonedInputs(originalGraph.getInputOperators(), 
clonedGraph.getInputOperators());
     assertClonedOutputs(originalGraph.getOutputStreams(), 
clonedGraph.getOutputStreams());
-    assertClonedTables(originalGraph.getTables(), clonedGraph.getTables());
     assertAllOperators(originalGraph.getAllOperatorSpecs(), 
clonedGraph.getAllOperatorSpecs());
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 19ee74f..fd0ddf8 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -25,10 +25,10 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.application.ApplicationDescriptor;
 import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.ApplicationDescriptorUtil;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.TaskApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -37,7 +37,6 @@ import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.execution.LocalJobPlanner;
 import org.apache.samza.task.IdentityStreamTask;
-import org.apache.samza.task.StreamTaskFactory;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -73,8 +72,9 @@ public class TestLocalApplicationRunner {
     final Map<String, String> cfgs = new HashMap<>();
     cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, 
UUIDGenerator.class.getName());
     cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
+    cfgs.put(JobConfig.JOB_ID(), "jobId");
     config = new MapConfig(cfgs);
-    mockApp = (TaskApplication) appDesc -> 
appDesc.setTaskFactory((StreamTaskFactory) () -> new IdentityStreamTask());
+    mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName());
     prepareTest();
 
     StreamProcessor sp = mock(StreamProcessor.class);
@@ -186,7 +186,8 @@ public class TestLocalApplicationRunner {
   }
 
   private void prepareTest() {
-    ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc = 
ApplicationDescriptorUtil.getAppDescriptor(mockApp, config);
+    ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
+        ApplicationDescriptorUtil.getAppDescriptor(mockApp, config);
     localPlanner = spy(new LocalJobPlanner(appDesc));
     runner = spy(new LocalApplicationRunner(appDesc, localPlanner));
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
index ae525fb..702cbfb 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
@@ -124,7 +124,7 @@ public class TestRemoteApplicationRunner {
 
         @Override
         public ApplicationStatus getStatus() {
-          String jobId = c.getJobId().get();
+          String jobId = c.getJobId();
           switch (jobId) {
             case "newJob":
               return ApplicationStatus.New;

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
 
b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
index 05d717a..3f5f11c 100644
--- 
a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
+++ 
b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
@@ -35,7 +35,7 @@ class HdfsSystemFactory extends SystemFactory with Logging {
   def getProducer(systemName: String, config: Config, registry: 
MetricsRegistry) = {
     val jobConfig = new JobConfig(config)
     val jobName = jobConfig.getName.getOrElse(throw new 
ConfigException("Missing job name."))
-    val jobId = jobConfig.getJobId.getOrElse("1")
+    val jobId = jobConfig.getJobId
 
     val clientId = getClientId("samza-producer", jobName, jobId)
     val metrics = new HdfsSystemProducerMetrics(systemName, registry)

Reply via email to