This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6300ed3  SAMZA-2250: Support large job models in standalone. (#1079)
6300ed3 is described below

commit 6300ed3ec33e3087ce871c15ba7723325e33d4a5
Author: shanthoosh <[email protected]>
AuthorDate: Mon Jun 24 17:47:00 2019 -0700

    SAMZA-2250: Support large job models in standalone. (#1079)
    
    * Initial version of supporting large job models in standalone.
    
    * Move the chunking logic into zookeeper metadata store implementation.
    
    * Add the checksum to end of the value in ZkMetadataStore.
    
    * Code cleanup.
    
    * Remove unused variable and imports.
    
    * Code cleanup.
    
    * Address review comments.
    
    * Address review comments.
    
    * Address the nitpick comment.
---
 .../org/apache/samza/job/model/JobModelUtil.java   |  46 +++++++++
 .../java/org/apache/samza/zk/ZkJobCoordinator.java |  25 ++++-
 .../apache/samza/zk/ZkJobCoordinatorFactory.java   |   5 +-
 .../java/org/apache/samza/zk/ZkKeyBuilder.java     |   2 +-
 .../java/org/apache/samza/zk/ZkMetadataStore.java  | 108 +++++++++++++++++----
 .../org/apache/samza/zk/TestZkJobCoordinator.java  |  22 +++--
 .../org/apache/samza/zk/TestZkMetadataStore.java   |  25 +++--
 .../processor/TestZkLocalApplicationRunner.java    |  86 ++++++++--------
 8 files changed, 231 insertions(+), 88 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java 
b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index c236266..e230b1a 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -24,15 +24,25 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * Utility class for the {@link JobModel}
  */
 public class JobModelUtil {
 
+  private static final ObjectMapper MAPPER = 
SamzaObjectMapper.getObjectMapper();
+
+  private static final String UTF_8 = "UTF-8";
+
+  private static final String JOB_MODEL_GENERATION_KEY = 
"jobModelGeneration/jobModels";
+
   /**
    * Extracts the map of {@link SystemStreamPartition}s to {@link TaskName} 
from the {@link JobModel}
    *
@@ -57,6 +67,42 @@ public class JobModelUtil {
     return taskToSSPs;
   }
 
+
+  /**
+   * Serializes the JobModel to bytes and writes it to the {@link MetadataStore}.
+   * @param jobModel the job model to store into {@link MetadataStore}.
+   * @param jobModelVersion the job model version.
+   * @param metadataStore the metadata store.
+   */
+  public static void writeJobModel(JobModel jobModel, String jobModelVersion, 
MetadataStore metadataStore) {
+    try {
+      String jobModelSerializedAsString = MAPPER.writeValueAsString(jobModel);
+      byte[] jobModelSerializedAsBytes = 
jobModelSerializedAsString.getBytes(UTF_8);
+      String metadataStoreKey = getJobModelKey(jobModelVersion);
+      metadataStore.put(metadataStoreKey, jobModelSerializedAsBytes);
+    } catch (Exception e) {
+      throw new SamzaException(String.format("Exception occurred when storing 
JobModel: %s with version: %s.", jobModel, jobModelVersion), e);
+    }
+  }
+
+  /**
+   * Reads and returns the {@link JobModel} from {@link MetadataStore}.
+   * @param jobModelVersion the version of the job model to read.
+   * @param metadataStore the metadata store.
+   * @return the job model read from the metadata store.
+   */
+  public static JobModel readJobModel(String jobModelVersion, MetadataStore 
metadataStore) {
+    try {
+      byte[] jobModelAsBytes = 
metadataStore.get(getJobModelKey(jobModelVersion));
+      return MAPPER.readValue(new String(jobModelAsBytes, UTF_8), 
JobModel.class);
+    } catch (Exception e) {
+      throw new SamzaException(String.format("Exception occurred when reading 
JobModel version: %s from metadata store.", jobModelVersion), e);
+    }
+  }
+
+  private static String getJobModelKey(String version) {
+    return String.format("%s/%s", JOB_MODEL_GENERATION_KEY, version);
+  }
+
   public static Set<SystemStream> getSystemStreams(JobModel jobModel) {
     Map<TaskName, Set<SystemStreamPartition>> taskToSSPs = 
getTaskToSystemStreamPartitions(jobModel);
     return taskToSSPs.values().stream().flatMap(taskSSPs -> 
taskSSPs.stream().map(ssp -> 
ssp.getSystemStream())).collect(Collectors.toSet());
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index bcf0f11..167d458 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -53,6 +53,7 @@ import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.runtime.LocationId;
 import org.apache.samza.runtime.LocationIdProvider;
@@ -107,6 +108,7 @@ public class ZkJobCoordinator implements JobCoordinator {
   private final int debounceTimeMs;
   private final Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
   private final LocationId locationId;
+  private final MetadataStore jobModelMetadataStore;
 
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
@@ -122,7 +124,7 @@ public class ZkJobCoordinator implements JobCoordinator {
   @VisibleForTesting
   StreamPartitionCountMonitor streamPartitionCountMonitor = null;
 
-  ZkJobCoordinator(String processorId, Config config, MetricsRegistry 
metricsRegistry, ZkUtils zkUtils) {
+  ZkJobCoordinator(String processorId, Config config, MetricsRegistry 
metricsRegistry, ZkUtils zkUtils, MetadataStore jobModelMetadataStore) {
     this.config = config;
     this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
     this.zkSessionMetrics = new ZkSessionMetrics(metricsRegistry);
@@ -148,6 +150,7 @@ public class ZkJobCoordinator implements JobCoordinator {
             LocationIdProviderFactory.class);
     LocationIdProvider locationIdProvider = 
locationIdProviderFactory.getLocationIdProvider(config);
     this.locationId = locationIdProvider.getLocationId();
+    this.jobModelMetadataStore = jobModelMetadataStore;
   }
 
   @Override
@@ -156,6 +159,7 @@ public class ZkJobCoordinator implements JobCoordinator {
     zkUtils.validateZkVersion();
     zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), 
keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix(), 
keyBuilder.getTaskLocalityPath()});
 
+    this.jobModelMetadataStore.init();
     systemAdmins.start();
     leaderElector.tryBecomeLeader();
     zkUtils.subscribeToJobModelVersionChange(new 
ZkJobModelVersionChangeHandler(zkUtils));
@@ -203,6 +207,7 @@ public class ZkJobCoordinator implements JobCoordinator {
           coordinatorListener.onCoordinatorStop();
         }
 
+        jobModelMetadataStore.close();
         shutdownSuccessful = true;
       } catch (Throwable t) {
         LOG.error("Encountered errors during job coordinator stop.", t);
@@ -275,7 +280,7 @@ public class ZkJobCoordinator implements JobCoordinator {
     LOG.info("pid=" + processorId + "Generated new JobModel with version: " + 
nextJMVersion + " and processors: " + currentProcessorIds);
 
     // Publish the new job model
-    zkUtils.publishJobModel(nextJMVersion, jobModel);
+    publishJobModelToMetadataStore(jobModel, nextJMVersion);
 
     // Start the barrier for the job model update
     barrier.create(nextJMVersion, currentProcessorIds);
@@ -288,6 +293,16 @@ public class ZkJobCoordinator implements JobCoordinator {
     debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () -> 
zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
   }
 
+  @VisibleForTesting
+  void publishJobModelToMetadataStore(JobModel jobModel, String nextJMVersion) 
{
+    JobModelUtil.writeJobModel(jobModel, nextJMVersion, jobModelMetadataStore);
+  }
+
+  @VisibleForTesting
+  JobModel readJobModelFromMetadataStore(String zkJobModelVersion) {
+    return JobModelUtil.readJobModel(zkJobModelVersion, jobModelMetadataStore);
+  }
+
   /**
    * Stores the configuration of the job in the coordinator stream.
    */
@@ -352,7 +367,7 @@ public class ZkJobCoordinator implements JobCoordinator {
     String zkJobModelVersion = zkUtils.getJobModelVersion();
     // If JobModel exists in zookeeper && cached JobModel version is unequal 
to JobModel version stored in zookeeper.
     if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, 
zkJobModelVersion)) {
-      JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion);
+      JobModel jobModel = readJobModelFromMetadataStore(zkJobModelVersion);
       for (ContainerModel containerModel : jobModel.getContainers().values()) {
         containerModel.getTasks().forEach((taskName, taskModel) -> 
changeLogPartitionMap.put(taskName, 
taskModel.getChangelogPartition().getPartitionId()));
       }
@@ -398,7 +413,7 @@ public class ZkJobCoordinator implements JobCoordinator {
     Map<TaskName, String> taskToProcessorId = new HashMap<>();
     Map<TaskName, List<SystemStreamPartition>> taskToSSPs = new HashMap<>();
     if (jobModelVersion != null) {
-      JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
+      JobModel jobModel = readJobModelFromMetadataStore(jobModelVersion);
       for (ContainerModel containerModel : jobModel.getContainers().values()) {
         for (TaskModel taskModel : containerModel.getTasks().values()) {
           taskToProcessorId.put(taskModel.getTaskName(), 
containerModel.getId());
@@ -538,7 +553,7 @@ public class ZkJobCoordinator implements JobCoordinator {
 
           LOG.info("Got a notification for new JobModel version. Path = {} 
Version = {}", dataPath, data);
 
-          newJobModel = zkUtils.getJobModel(jobModelVersion);
+          newJobModel = readJobModelFromMetadataStore(jobModelVersion);
           LOG.info("pid=" + processorId + ": new JobModel is available. 
Version =" + jobModelVersion + "; JobModel = " + newJobModel);
 
           if (!newJobModel.getContainers().containsKey(processorId)) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 4a02a7a..8aacb96 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -35,7 +35,7 @@ public class ZkJobCoordinatorFactory implements 
JobCoordinatorFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(ZkJobCoordinatorFactory.class);
   private static final String JOB_COORDINATOR_ZK_PATH_FORMAT = 
"%s/%s-%s-%s-coordinationData";
   private static final String DEFAULT_JOB_NAME = "defaultJob";
-  private static final String PROTOCOL_VERSION = "1.0";
+  private static final String PROTOCOL_VERSION = "2.0";
 
   /**
    * Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
@@ -49,7 +49,8 @@ public class ZkJobCoordinatorFactory implements 
JobCoordinatorFactory {
     String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
     ZkUtils zkUtils = getZkUtils(config, metricsRegistry, 
jobCoordinatorZkBasePath);
     LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
-    return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils);
+    ZkMetadataStore metadataStore = new 
ZkMetadataStore(zkUtils.getKeyBuilder().getRootPath(), config, metricsRegistry);
+    return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils, 
metadataStore);
   }
 
   private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry, 
String coordinatorZkBasePath) {
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 16efe81..60a1e63 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -62,7 +62,7 @@ public class ZkKeyBuilder {
     }
   }
 
-  String getRootPath() {
+  public String getRootPath() {
     return "/" + pathPrefix;
   }
 
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
index fa41df6..697607e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
@@ -18,11 +18,17 @@
  */
 package org.apache.samza.zk;
 
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Longs;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.zip.CRC32;
 import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -40,6 +46,15 @@ public class ZkMetadataStore implements MetadataStore {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ZkMetadataStore.class);
 
+  private static final int CHECKSUM_SIZE_IN_BYTES = 8;
+
+  /**
+   * By default, the maximum data node size supported by zookeeper server is 1 
MB.
+   * ZkClient prepends any value with serialization bytes before storing to 
zookeeper server.
+   * Maximum segment size constant is initialized after factoring in size of 
serialization bytes.
+   */
+  private static final int VALUE_SEGMENT_SIZE_IN_BYTES = 1020 * 1020;
+
   private final ZkClient zkClient;
   private final ZkConfig zkConfig;
   private final String zkBaseDir;
@@ -64,7 +79,28 @@ public class ZkMetadataStore implements MetadataStore {
    */
   @Override
   public byte[] get(String key) {
-    return zkClient.readData(getZkPathForKey(key), true);
+    byte[] aggregatedZNodeValues = new byte[0];
+    for (int segmentIndex = 0;; ++segmentIndex) {
+      String zkPath = getZkPath(key, segmentIndex);
+      byte[] zNodeValue = zkClient.readData(zkPath, true);
+      if (zNodeValue == null) {
+        break;
+      }
+      aggregatedZNodeValues = Bytes.concat(aggregatedZNodeValues, zNodeValue);
+    }
+    if (aggregatedZNodeValues.length > 0) {
+      byte[] value = ArrayUtils.subarray(aggregatedZNodeValues, 0, 
aggregatedZNodeValues.length - CHECKSUM_SIZE_IN_BYTES);
+      byte[] checkSum = ArrayUtils.subarray(aggregatedZNodeValues, 
aggregatedZNodeValues.length - CHECKSUM_SIZE_IN_BYTES, 
aggregatedZNodeValues.length);
+      byte[] expectedChecksum = getCRCChecksum(value);
+      if (!Arrays.equals(checkSum, expectedChecksum)) {
+        String exceptionMessage = String.format("Expected checksum: %s did not 
match the actual checksum: %s for value: %s",
+            Arrays.toString(expectedChecksum), Arrays.toString(checkSum), 
Arrays.toString(value));
+        LOG.error(exceptionMessage);
+        throw new IllegalStateException(exceptionMessage);
+      }
+      return value;
+    }
+    return null;
   }
 
   /**
@@ -72,9 +108,12 @@ public class ZkMetadataStore implements MetadataStore {
    */
   @Override
   public void put(String key, byte[] value) {
-    String zkPath = getZkPathForKey(key);
-    zkClient.createPersistent(zkPath, true);
-    zkClient.writeData(zkPath, value);
+    List<byte[]> valueSegments = chunkMetadataStoreValue(value);
+    for (int segmentIndex = 0; segmentIndex < valueSegments.size(); 
segmentIndex++) {
+      String zkPath = getZkPath(key, segmentIndex);
+      zkClient.createPersistent(zkPath, true);
+      zkClient.writeData(zkPath, valueSegments.get(segmentIndex));
+    }
   }
 
   /**
@@ -82,7 +121,8 @@ public class ZkMetadataStore implements MetadataStore {
    */
   @Override
   public void delete(String key) {
-    zkClient.delete(getZkPathForKey(key));
+    String zkPath = String.format("%s/%s", zkBaseDir, key);
+    zkClient.deleteRecursive(zkPath);
   }
 
   /**
@@ -91,22 +131,15 @@ public class ZkMetadataStore implements MetadataStore {
    */
   @Override
   public Map<String, byte[]> all() {
-    try {
-      List<String> zkSubDirectories = zkClient.getChildren(zkBaseDir);
-      Map<String, byte[]> result = new HashMap<>();
-      for (String zkSubDir : zkSubDirectories) {
-        String completeZkPath = String.format("%s/%s", zkBaseDir, zkSubDir);
-        byte[] value = zkClient.readData(completeZkPath, true);
-        if (value != null) {
-          result.put(zkSubDir, value);
-        }
+    List<String> zkSubDirectories = zkClient.getChildren(zkBaseDir);
+    Map<String, byte[]> result = new HashMap<>();
+    for (String zkSubDir : zkSubDirectories) {
+      byte[] value = get(zkSubDir);
+      if (value != null) {
+        result.put(zkSubDir, value);
       }
-      return result;
-    } catch (Exception e) {
-      String errorMsg = String.format("Error reading path: %s from 
zookeeper.", zkBaseDir);
-      LOG.error(errorMsg, e);
-      throw new SamzaException(errorMsg, e);
     }
+    return result;
   }
 
   /**
@@ -125,7 +158,40 @@ public class ZkMetadataStore implements MetadataStore {
     zkClient.close();
   }
 
-  private String getZkPathForKey(String key) {
-    return String.format("%s/%s", zkBaseDir, key);
+  private String getZkPath(String key, int segmentIndex) {
+    return String.format("%s/%s/%d", zkBaseDir, key, segmentIndex);
+  }
+
+  /**
+   * Computes and returns the crc32 checksum of the input byte array.
+   * @param value the input byte array.
+   * @return the crc32 checksum of the byte array.
+   */
+  private static byte[] getCRCChecksum(byte[] value) {
+    CRC32 crc32 = new CRC32();
+    crc32.update(value);
+    long checksum = crc32.getValue();
+    return Longs.toByteArray(checksum);
+  }
+
+  /**
+   * Splits the input byte array value into independent byte array segments of 
at most {@code VALUE_SEGMENT_SIZE_IN_BYTES} bytes each.
+   * @param value the input byte array to split.
+   * @return the byte array split into independent byte array chunks.
+   */
+  private static List<byte[]> chunkMetadataStoreValue(byte[] value) {
+    try {
+      byte[] checksum = getCRCChecksum(value);
+      byte[] valueWithChecksum = ArrayUtils.addAll(value, checksum);
+      List<byte[]> valueSegments = new ArrayList<>();
+      int length = valueWithChecksum.length;
+      for (int index = 0; index < length; index += 
VALUE_SEGMENT_SIZE_IN_BYTES) {
+        byte[] valueSegment = ArrayUtils.subarray(valueWithChecksum, index, 
Math.min(index + VALUE_SEGMENT_SIZE_IN_BYTES, length));
+        valueSegments.add(valueSegment);
+      }
+      return valueSegments;
+    } catch (Exception e) {
+      throw new SamzaException(String.format("Exception occurred when 
splitting the value: %s to small chunks.", value), e);
+    }
   }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 8e3639e..94a45d6 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -38,6 +38,7 @@ import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.system.SystemStreamPartition;
@@ -68,6 +69,7 @@ public class TestZkJobCoordinator {
 
   private final Config config;
   private final JobModel jobModel;
+  private final MetadataStore metadataStore;
 
   public TestZkJobCoordinator() {
     Map<String, String> configMap = ImmutableMap.of(
@@ -83,6 +85,7 @@ public class TestZkJobCoordinator {
         new TaskName("t1"), new TaskModel(new TaskName("t1"), ssps, new 
Partition(0)));
     ContainerModel containerModel = new ContainerModel("0", tasksForContainer);
     jobModel = new JobModel(config, ImmutableMap.of("0", containerModel));
+    metadataStore = Mockito.mock(MetadataStore.class);
   }
 
   @Test
@@ -95,9 +98,9 @@ public class TestZkJobCoordinator {
     ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
     when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
     when(zkUtils.getZkClient()).thenReturn(mockZkClient);
-    when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new 
JobModel(new MapConfig(), new HashMap<>()));
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils, metadataStore));
+    doReturn(new JobModel(new MapConfig(), new 
HashMap<>())).when(zkJobCoordinator).readJobModelFromMetadataStore(TEST_JOB_MODEL_VERSION);
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
         jcShutdownLatch.countDown();
@@ -124,7 +127,7 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils, metadataStore));
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new 
MetricsRegistryMap());
     final ZkSessionStateChangedListener zkSessionStateChangedListener = 
zkJobCoordinator.new ZkSessionStateChangedListener();
@@ -150,7 +153,7 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils, metadataStore));
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new 
MetricsRegistryMap());
     final ZkSessionStateChangedListener zkSessionStateChangedListener = 
zkJobCoordinator.new ZkSessionStateChangedListener();
@@ -181,7 +184,7 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils, metadataStore));
     StreamPartitionCountMonitor monitor = 
Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.streamPartitionCountMonitor = monitor;
@@ -204,7 +207,7 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils, metadataStore));
 
     StreamPartitionCountMonitor monitor = 
Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
@@ -231,7 +234,7 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils, metadataStore));
 
     StreamPartitionCountMonitor monitor = 
Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
@@ -254,7 +257,7 @@ public class TestZkJobCoordinator {
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
 
     StartpointManager mockStartpointManager = 
Mockito.mock(StartpointManager.class);
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), 
zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), 
zkUtils, metadataStore));
     
doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager(any(CoordinatorStreamStore.class));
     
doReturn(mock(CoordinatorStreamStore.class)).when(zkJobCoordinator).createCoordinatorStreamStore();
 
@@ -284,7 +287,7 @@ public class TestZkJobCoordinator {
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
 
     StartpointManager mockStartpointManager = 
Mockito.mock(StartpointManager.class);
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), 
zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), 
zkUtils, metadataStore));
     
doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager(any(CoordinatorStreamStore.class));
     
doReturn(mock(CoordinatorStreamStore.class)).when(zkJobCoordinator).createCoordinatorStreamStore();
 
@@ -293,7 +296,6 @@ public class TestZkJobCoordinator {
 
     zkJobCoordinator.doOnProcessorChange();
 
-    verify(zkUtils).publishJobModel(anyString(), eq(jobModel));
     verify(zkUtils).publishJobModelVersion(anyString(), anyString());
     verify(zkJobCoordinator).loadMetadataResources(eq(jobModel));
   }
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
index 4d53222..94f188e 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
@@ -20,6 +20,7 @@ package org.apache.samza.zk;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
+import java.util.Random;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -38,6 +39,10 @@ public class TestZkMetadataStore {
 
   private static final String LOCALHOST = "127.0.0.1";
 
+  private static final Random RANDOM = new Random();
+
+  private static final int VALUE_SIZE_IN_BYTES = 1024  * 1024 * 10; // 10 MB
+
   private static EmbeddedZookeeper zkServer;
 
   private MetadataStore zkMetadataStore;
@@ -68,7 +73,7 @@ public class TestZkMetadataStore {
   @Test
   public void testReadAfterWrite() throws Exception {
     String key = "test-key1";
-    byte[] value = "test-value1".getBytes("UTF-8");
+    byte[] value = getRandomByteArray(VALUE_SIZE_IN_BYTES);
     Assert.assertNull(zkMetadataStore.get(key));
     zkMetadataStore.put(key, value);
     Assert.assertTrue(Arrays.equals(value, zkMetadataStore.get(key)));
@@ -78,7 +83,7 @@ public class TestZkMetadataStore {
   @Test
   public void testReadAfterDelete() throws Exception {
     String key = "test-key1";
-    byte[] value = "test-value1".getBytes("UTF-8");
+    byte[] value = getRandomByteArray(VALUE_SIZE_IN_BYTES);
     Assert.assertNull(zkMetadataStore.get(key));
     zkMetadataStore.put(key, value);
     Assert.assertTrue(Arrays.equals(value, zkMetadataStore.get(key)));
@@ -96,8 +101,8 @@ public class TestZkMetadataStore {
   @Test
   public void testMultipleUpdatesForSameKey() throws Exception {
     String key = "test-key1";
-    byte[] value = "test-value1".getBytes("UTF-8");
-    byte[] value1 = "test-value2".getBytes("UTF-8");
+    byte[] value = getRandomByteArray(VALUE_SIZE_IN_BYTES);
+    byte[] value1 = getRandomByteArray(VALUE_SIZE_IN_BYTES);
     zkMetadataStore.put(key, value);
     zkMetadataStore.put(key, value1);
     Assert.assertTrue(Arrays.equals(value1, zkMetadataStore.get(key)));
@@ -109,13 +114,19 @@ public class TestZkMetadataStore {
     String key = "test-key1";
     String key1 = "test-key2";
     String key2 = "test-key3";
-    byte[] value = "test-value1".getBytes("UTF-8");
-    byte[] value1 = "test-value2".getBytes("UTF-8");
-    byte[] value2 = "test-value3".getBytes("UTF-8");
+    byte[] value = getRandomByteArray(VALUE_SIZE_IN_BYTES);
+    byte[] value1 = getRandomByteArray(VALUE_SIZE_IN_BYTES);
+    byte[] value2 = getRandomByteArray(VALUE_SIZE_IN_BYTES);
     zkMetadataStore.put(key, value);
     zkMetadataStore.put(key1, value1);
     zkMetadataStore.put(key2, value2);
     ImmutableMap<String, byte[]> expected = ImmutableMap.of(key, value, key1, 
value1, key2, value2);
     Assert.assertEquals(expected.size(), zkMetadataStore.all().size());
   }
+
+  private static byte[] getRandomByteArray(int numBytes) {
+    byte[] byteArray = new byte[numBytes];
+    RANDOM.nextBytes(byteArray);
+    return byteArray;
+  }
 }
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index d234590..754939f 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -56,6 +56,7 @@ import 
org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metadatastore.MetadataStore;
@@ -69,6 +70,7 @@ import org.apache.samza.test.StandaloneTestUtils;
 import org.apache.samza.test.harness.IntegrationTestHarness;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.zk.ZkMetadataStore;
 import org.apache.samza.zk.ZkStringSerializer;
 import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.apache.samza.zk.ZkKeyBuilder;
@@ -118,6 +120,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
   private ApplicationConfig applicationConfig3;
   private String testStreamAppName;
   private String testStreamAppId;
+  private MetadataStore zkMetadataStore;
 
   @Rule
   public Timeout testTimeOutInMillis = new Timeout(150000);
@@ -165,6 +168,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
             .collect(Collectors.toList());
 
     assertTrue("Encountered errors during test setup. Failed to create 
topics.", createTopics(newTopics));
+    zkMetadataStore = new 
ZkMetadataStore(zkUtils.getKeyBuilder().getRootPath(), new 
MapConfig(configMap), new NoOpMetricsRegistry());
   }
 
   @Override
@@ -270,7 +274,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     TestStreamApplication.StreamApplicationCallback callback = m -> {
       if (hasSecondProcessorJoined.compareAndSet(false, true)) {
         previousJobModelVersion[0] = zkUtils.getJobModelVersion();
-        previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
+        previousJobModel[0] = 
JobModelUtil.readJobModel(previousJobModelVersion[0], zkMetadataStore);
         executeRun(appRunner2, localTestConfig2);
         try {
           // Wait for appRunner2 to register with zookeeper.
@@ -292,7 +296,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     kafkaEventsConsumedLatch.await();
 
     String currentJobModelVersion = zkUtils.getJobModelVersion();
-    JobModel updatedJobModel = zkUtils.getJobModel(currentJobModelVersion);
+    JobModel updatedJobModel = 
JobModelUtil.readJobModel(currentJobModelVersion, zkMetadataStore);
 
     // Job model before and after the addition of second stream processor 
should be the same.
     assertEquals(previousJobModel[0], updatedJobModel);
@@ -352,7 +356,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     TestStreamApplication.StreamApplicationCallback streamApplicationCallback 
= message -> {
       if (hasSecondProcessorJoined.compareAndSet(false, true)) {
         previousJobModelVersion[0] = zkUtils.getJobModelVersion();
-        previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
+        previousJobModel[0] = 
JobModelUtil.readJobModel(previousJobModelVersion[0], zkMetadataStore);
         executeRun(appRunner2, testAppConfig2);
         try {
           // Wait for appRunner2 to register with zookeeper.
@@ -376,7 +380,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     kafkaEventsConsumedLatch.await();
 
     String currentJobModelVersion = zkUtils.getJobModelVersion();
-    JobModel updatedJobModel = zkUtils.getJobModel(currentJobModelVersion);
+    JobModel updatedJobModel = 
JobModelUtil.readJobModel(currentJobModelVersion, zkMetadataStore);
 
     // JobModelVersion check to verify that leader publishes new jobModel.
     assertTrue(Integer.parseInt(previousJobModelVersion[0]) < 
Integer.parseInt(currentJobModelVersion));
@@ -399,7 +403,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     TaskModel taskModel1 = 
updatedTaskModelMap1.values().stream().findFirst().get();
     TaskModel taskModel2 = 
updatedTaskModelMap2.values().stream().findFirst().get();
     assertEquals(taskModel1.getSystemStreamPartitions(), 
taskModel2.getSystemStreamPartitions());
-    
assertTrue(!taskModel1.getTaskName().getTaskName().equals(taskModel2.getTaskName().getTaskName()));
+    
assertFalse(taskModel1.getTaskName().getTaskName().equals(taskModel2.getTaskName().getTaskName()));
 
     processedMessagesLatch.await();
 
@@ -441,7 +445,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
 
     // Verifications before killing the leader.
     String jobModelVersion = zkUtils.getJobModelVersion();
-    JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
+    JobModel jobModel = JobModelUtil.readJobModel(jobModelVersion, 
zkMetadataStore);
     assertEquals(2, jobModel.getContainers().size());
     assertEquals(Sets.newHashSet("0000000000", "0000000001"), 
jobModel.getContainers().keySet());
     assertEquals("1", jobModelVersion);
@@ -467,7 +471,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     assertEquals(2, processorIdsFromZK.size());
     assertEquals(PROCESSOR_IDS[1], processorIdsFromZK.get(0));
     jobModelVersion = zkUtils.getJobModelVersion();
-    jobModel = zkUtils.getJobModel(jobModelVersion);
+    jobModel = JobModelUtil.readJobModel(jobModelVersion, zkMetadataStore);
     assertEquals(Sets.newHashSet("0000000001", "0000000002"), 
jobModel.getContainers().keySet());
     assertEquals(2, jobModel.getContainers().size());
 
@@ -560,7 +564,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
 
     // Read job model before rolling upgrade.
     String jobModelVersion = zkUtils.getJobModelVersion();
-    JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
+    JobModel jobModel = JobModelUtil.readJobModel(jobModelVersion, 
zkMetadataStore);
 
     appRunner1.kill();
     appRunner1.waitForFinish();
@@ -584,7 +588,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
 
     // Read new job model after rolling upgrade.
     String newJobModelVersion = zkUtils.getJobModelVersion();
-    JobModel newJobModel = zkUtils.getJobModel(newJobModelVersion);
+    JobModel newJobModel = JobModelUtil.readJobModel(newJobModelVersion, 
zkMetadataStore);
 
     assertEquals(Integer.parseInt(jobModelVersion) + 1, 
Integer.parseInt(newJobModelVersion));
     assertEquals(jobModel.getContainers(), newJobModel.getContainers());
@@ -685,8 +689,8 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     processedMessagesLatch1.await();
 
     String jobModelVersion = zkUtils.getJobModelVersion();
-    JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
-    Set<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
+    JobModel jobModel = JobModelUtil.readJobModel(jobModelVersion, 
zkMetadataStore);
+    List<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
 
     // Validate that the input partition count is 5 in the JobModel.
     Assert.assertEquals(5, ssps.size());
@@ -702,7 +706,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     }
 
     String newJobModelVersion = zkUtils.getJobModelVersion();
-    JobModel newJobModel = zkUtils.getJobModel(newJobModelVersion);
+    JobModel newJobModel = JobModelUtil.readJobModel(newJobModelVersion, 
zkMetadataStore);
     ssps = getSystemStreamPartitions(newJobModel);
 
     // Validate that the input partition count is 100 in the new JobModel.
@@ -777,8 +781,8 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
 
     // Read the latest JobModel for validation.
     String jobModelVersion = zkUtils.getJobModelVersion();
-    JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
-    Set<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
+    JobModel jobModel = JobModelUtil.readJobModel(jobModelVersion, 
zkMetadataStore);
+    List<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
 
     // Validate that the input partition count is 32 in the JobModel.
     Assert.assertEquals(32, ssps.size());
@@ -795,7 +799,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     while (true) {
       LOGGER.info("Waiting for new jobModel to be published");
       jobModelVersion = zkUtils.getJobModelVersion();
-      jobModel = zkUtils.getJobModel(jobModelVersion);
+      jobModel = JobModelUtil.readJobModel(jobModelVersion, zkMetadataStore);
       ssps = getSystemStreamPartitions(jobModel);
 
       if (ssps.size() == 64) {
@@ -881,8 +885,8 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
 
     // Read the latest JobModel for validation.
     String jobModelVersion = zkUtils.getJobModelVersion();
-    JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
-    Set<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
+    JobModel jobModel = JobModelUtil.readJobModel(jobModelVersion, 
zkMetadataStore);
+    List<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
 
     // Validate that the input 64 partitions are present in JobModel.
     Assert.assertEquals(64, ssps.size());
@@ -902,7 +906,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     while (true) {
       LOGGER.info("Waiting for new jobModel to be published");
       jobModelVersion = zkUtils.getJobModelVersion();
-      jobModel = zkUtils.getJobModel(jobModelVersion);
+      jobModel = JobModelUtil.readJobModel(jobModelVersion, zkMetadataStore);
       ssps = getSystemStreamPartitions(jobModel);
 
       if (ssps.size() == 128) {
@@ -964,27 +968,6 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
   }
 
   /**
-   * Computes the task to partition assignment of the {@param JobModel}.
-   * @param jobModel the jobModel to compute task to partition assignment for.
-   * @return the computed task to partition assignments of the {@param 
JobModel}.
-   */
-  private static Map<TaskName, Set<SystemStreamPartition>> 
getTaskAssignments(JobModel jobModel) {
-    Map<TaskName, Set<SystemStreamPartition>> taskAssignments = new 
HashMap<>();
-    for (Map.Entry<String, ContainerModel> entry : 
jobModel.getContainers().entrySet()) {
-      Map<TaskName, TaskModel> tasks = entry.getValue().getTasks();
-      for (TaskModel taskModel : tasks.values()) {
-        if (!taskAssignments.containsKey(taskModel.getTaskName())) {
-          taskAssignments.put(taskModel.getTaskName(), new HashSet<>());
-        }
-        if (taskModel.getTaskMode() == TaskMode.Active) {
-          
taskAssignments.get(taskModel.getTaskName()).addAll(taskModel.getSystemStreamPartitions());
-        }
-      }
-    }
-    return taskAssignments;
-  }
-
-  /**
    * Test if two processors coming up at the same time agree on a single runid
    * 1. bring up two processors
   * 2. wait till they start consuming messages
@@ -1247,10 +1230,29 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     appRunner3.waitForFinish();
   }
 
+  /**
+   * Computes the task to partition assignment of the {@param JobModel}.
+   * @param jobModel the jobModel to compute task to partition assignment for.
+   * @return the computed task to partition assignments of the {@param 
JobModel}.
+   */
+  private static Map<TaskName, Set<SystemStreamPartition>> 
getTaskAssignments(JobModel jobModel) {
+    Map<TaskName, Set<SystemStreamPartition>> taskAssignments = new 
HashMap<>();
+    for (Map.Entry<String, ContainerModel> entry : 
jobModel.getContainers().entrySet()) {
+      Map<TaskName, TaskModel> tasks = entry.getValue().getTasks();
+      for (TaskModel taskModel : tasks.values()) {
+        if (!taskAssignments.containsKey(taskModel.getTaskName())) {
+          taskAssignments.put(taskModel.getTaskName(), new HashSet<>());
+        }
+        if (taskModel.getTaskMode() == TaskMode.Active) {
+          
taskAssignments.get(taskModel.getTaskName()).addAll(taskModel.getSystemStreamPartitions());
+        }
+      }
+    }
+    return taskAssignments;
+  }
 
-  private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel 
jobModel) {
-    System.out.println(jobModel);
-    Set<SystemStreamPartition> ssps = new HashSet<>();
+  private static List<SystemStreamPartition> 
getSystemStreamPartitions(JobModel jobModel) {
+    List<SystemStreamPartition> ssps = new ArrayList<>();
     jobModel.getContainers().forEach((containerName, containerModel) -> {
         containerModel.getTasks().forEach((taskName, taskModel) -> 
ssps.addAll(taskModel.getSystemStreamPartitions()));
       });

Reply via email to