This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 6300ed3 SAMZA-2250: Support large job models in standalone. (#1079)
6300ed3 is described below
commit 6300ed3ec33e3087ce871c15ba7723325e33d4a5
Author: shanthoosh <[email protected]>
AuthorDate: Mon Jun 24 17:47:00 2019 -0700
SAMZA-2250: Support large job models in standalone. (#1079)
* Initial version of supporting large job models in standalone.
* Move the chunking logic into zookeeper metadata store implementation.
* Add the checksum to end of the value in ZkMetadataStore.
* Code cleanup.
* Remove unused variable and imports.
* Code cleanup.
* Address review comments.
* Address review comments.
* Address the nitpick comment.
---
.../org/apache/samza/job/model/JobModelUtil.java | 46 +++++++++
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 25 ++++-
.../apache/samza/zk/ZkJobCoordinatorFactory.java | 5 +-
.../java/org/apache/samza/zk/ZkKeyBuilder.java | 2 +-
.../java/org/apache/samza/zk/ZkMetadataStore.java | 108 +++++++++++++++++----
.../org/apache/samza/zk/TestZkJobCoordinator.java | 22 +++--
.../org/apache/samza/zk/TestZkMetadataStore.java | 25 +++--
.../processor/TestZkLocalApplicationRunner.java | 86 ++++++++--------
8 files changed, 231 insertions(+), 88 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index c236266..e230b1a 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -24,15 +24,25 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
/**
* Utility class for the {@link JobModel}
*/
public class JobModelUtil {
+ private static final ObjectMapper MAPPER =
SamzaObjectMapper.getObjectMapper();
+
+ private static final String UTF_8 = "UTF-8";
+
+ private static final String JOB_MODEL_GENERATION_KEY =
"jobModelGeneration/jobModels";
+
/**
* Extracts the map of {@link SystemStreamPartition}s to {@link TaskName}
from the {@link JobModel}
*
@@ -57,6 +67,42 @@ public class JobModelUtil {
return taskToSSPs;
}
+
+ /**
+ * Serializes the JobModel into a byte array and stores it into {@link MetadataStore}.
+ * @param jobModel the job model to store into {@link MetadataStore}.
+ * @param jobModelVersion the job model version.
+ * @param metadataStore the metadata store.
+ */
+ public static void writeJobModel(JobModel jobModel, String jobModelVersion,
MetadataStore metadataStore) {
+ try {
+ String jobModelSerializedAsString = MAPPER.writeValueAsString(jobModel);
+ byte[] jobModelSerializedAsBytes =
jobModelSerializedAsString.getBytes(UTF_8);
+ String metadataStoreKey = getJobModelKey(jobModelVersion);
+ metadataStore.put(metadataStoreKey, jobModelSerializedAsBytes);
+ } catch (Exception e) {
+ throw new SamzaException(String.format("Exception occurred when storing
JobModel: %s with version: %s.", jobModel, jobModelVersion), e);
+ }
+ }
+
+ /**
+ * Reads and returns the {@link JobModel} from {@link MetadataStore}.
+ * @param metadataStore the metadata store.
+ * @return the job model read from the metadata store.
+ */
+ public static JobModel readJobModel(String jobModelVersion, MetadataStore
metadataStore) {
+ try {
+ byte[] jobModelAsBytes =
metadataStore.get(getJobModelKey(jobModelVersion));
+ return MAPPER.readValue(new String(jobModelAsBytes, UTF_8),
JobModel.class);
+ } catch (Exception e) {
+ throw new SamzaException(String.format("Exception occurred when reading
JobModel version: %s from metadata store.", jobModelVersion), e);
+ }
+ }
+
+ private static String getJobModelKey(String version) {
+ return String.format("%s/%s", JOB_MODEL_GENERATION_KEY, version);
+ }
+
public static Set<SystemStream> getSystemStreams(JobModel jobModel) {
Map<TaskName, Set<SystemStreamPartition>> taskToSSPs =
getTaskToSystemStreamPartitions(jobModel);
return taskToSSPs.values().stream().flatMap(taskSSPs ->
taskSSPs.stream().map(ssp ->
ssp.getSystemStream())).collect(Collectors.toSet());
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index bcf0f11..167d458 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -53,6 +53,7 @@ import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.JobModelUtil;
import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.runtime.LocationId;
import org.apache.samza.runtime.LocationIdProvider;
@@ -107,6 +108,7 @@ public class ZkJobCoordinator implements JobCoordinator {
private final int debounceTimeMs;
private final Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
private final LocationId locationId;
+ private final MetadataStore jobModelMetadataStore;
private JobCoordinatorListener coordinatorListener = null;
private JobModel newJobModel;
@@ -122,7 +124,7 @@ public class ZkJobCoordinator implements JobCoordinator {
@VisibleForTesting
StreamPartitionCountMonitor streamPartitionCountMonitor = null;
- ZkJobCoordinator(String processorId, Config config, MetricsRegistry
metricsRegistry, ZkUtils zkUtils) {
+ ZkJobCoordinator(String processorId, Config config, MetricsRegistry
metricsRegistry, ZkUtils zkUtils, MetadataStore jobModelMetadataStore) {
this.config = config;
this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
this.zkSessionMetrics = new ZkSessionMetrics(metricsRegistry);
@@ -148,6 +150,7 @@ public class ZkJobCoordinator implements JobCoordinator {
LocationIdProviderFactory.class);
LocationIdProvider locationIdProvider =
locationIdProviderFactory.getLocationIdProvider(config);
this.locationId = locationIdProvider.getLocationId();
+ this.jobModelMetadataStore = jobModelMetadataStore;
}
@Override
@@ -156,6 +159,7 @@ public class ZkJobCoordinator implements JobCoordinator {
zkUtils.validateZkVersion();
zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(),
keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix(),
keyBuilder.getTaskLocalityPath()});
+ this.jobModelMetadataStore.init();
systemAdmins.start();
leaderElector.tryBecomeLeader();
zkUtils.subscribeToJobModelVersionChange(new
ZkJobModelVersionChangeHandler(zkUtils));
@@ -203,6 +207,7 @@ public class ZkJobCoordinator implements JobCoordinator {
coordinatorListener.onCoordinatorStop();
}
+ jobModelMetadataStore.close();
shutdownSuccessful = true;
} catch (Throwable t) {
LOG.error("Encountered errors during job coordinator stop.", t);
@@ -275,7 +280,7 @@ public class ZkJobCoordinator implements JobCoordinator {
LOG.info("pid=" + processorId + "Generated new JobModel with version: " +
nextJMVersion + " and processors: " + currentProcessorIds);
// Publish the new job model
- zkUtils.publishJobModel(nextJMVersion, jobModel);
+ publishJobModelToMetadataStore(jobModel, nextJMVersion);
// Start the barrier for the job model update
barrier.create(nextJMVersion, currentProcessorIds);
@@ -288,6 +293,16 @@ public class ZkJobCoordinator implements JobCoordinator {
debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () ->
zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
}
+ @VisibleForTesting
+ void publishJobModelToMetadataStore(JobModel jobModel, String nextJMVersion)
{
+ JobModelUtil.writeJobModel(jobModel, nextJMVersion, jobModelMetadataStore);
+ }
+
+ @VisibleForTesting
+ JobModel readJobModelFromMetadataStore(String zkJobModelVersion) {
+ return JobModelUtil.readJobModel(zkJobModelVersion, jobModelMetadataStore);
+ }
+
/**
* Stores the configuration of the job in the coordinator stream.
*/
@@ -352,7 +367,7 @@ public class ZkJobCoordinator implements JobCoordinator {
String zkJobModelVersion = zkUtils.getJobModelVersion();
// If JobModel exists in zookeeper && cached JobModel version is unequal
to JobModel version stored in zookeeper.
if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion,
zkJobModelVersion)) {
- JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion);
+ JobModel jobModel = readJobModelFromMetadataStore(zkJobModelVersion);
for (ContainerModel containerModel : jobModel.getContainers().values()) {
containerModel.getTasks().forEach((taskName, taskModel) ->
changeLogPartitionMap.put(taskName,
taskModel.getChangelogPartition().getPartitionId()));
}
@@ -398,7 +413,7 @@ public class ZkJobCoordinator implements JobCoordinator {
Map<TaskName, String> taskToProcessorId = new HashMap<>();
Map<TaskName, List<SystemStreamPartition>> taskToSSPs = new HashMap<>();
if (jobModelVersion != null) {
- JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
+ JobModel jobModel = readJobModelFromMetadataStore(jobModelVersion);
for (ContainerModel containerModel : jobModel.getContainers().values()) {
for (TaskModel taskModel : containerModel.getTasks().values()) {
taskToProcessorId.put(taskModel.getTaskName(),
containerModel.getId());
@@ -538,7 +553,7 @@ public class ZkJobCoordinator implements JobCoordinator {
LOG.info("Got a notification for new JobModel version. Path = {}
Version = {}", dataPath, data);
- newJobModel = zkUtils.getJobModel(jobModelVersion);
+ newJobModel = readJobModelFromMetadataStore(jobModelVersion);
LOG.info("pid=" + processorId + ": new JobModel is available.
Version =" + jobModelVersion + "; JobModel = " + newJobModel);
if (!newJobModel.getContainers().containsKey(processorId)) {
diff --git
a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 4a02a7a..8aacb96 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -35,7 +35,7 @@ public class ZkJobCoordinatorFactory implements
JobCoordinatorFactory {
private static final Logger LOG =
LoggerFactory.getLogger(ZkJobCoordinatorFactory.class);
private static final String JOB_COORDINATOR_ZK_PATH_FORMAT =
"%s/%s-%s-%s-coordinationData";
private static final String DEFAULT_JOB_NAME = "defaultJob";
- private static final String PROTOCOL_VERSION = "1.0";
+ private static final String PROTOCOL_VERSION = "2.0";
/**
* Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
@@ -49,7 +49,8 @@ public class ZkJobCoordinatorFactory implements
JobCoordinatorFactory {
String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
ZkUtils zkUtils = getZkUtils(config, metricsRegistry,
jobCoordinatorZkBasePath);
LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
- return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils);
+ ZkMetadataStore metadataStore = new
ZkMetadataStore(zkUtils.getKeyBuilder().getRootPath(), config, metricsRegistry);
+ return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils,
metadataStore);
}
private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry,
String coordinatorZkBasePath) {
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 16efe81..60a1e63 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -62,7 +62,7 @@ public class ZkKeyBuilder {
}
}
- String getRootPath() {
+ public String getRootPath() {
return "/" + pathPrefix;
}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
index fa41df6..697607e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
@@ -18,11 +18,17 @@
*/
package org.apache.samza.zk;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Longs;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.zip.CRC32;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.metrics.MetricsRegistry;
@@ -40,6 +46,15 @@ public class ZkMetadataStore implements MetadataStore {
private static final Logger LOG =
LoggerFactory.getLogger(ZkMetadataStore.class);
+ private static final int CHECKSUM_SIZE_IN_BYTES = 8;
+
+ /**
+ * By default, the maximum data node size supported by zookeeper server is 1
MB.
+ * ZkClient prepends any value with serialization bytes before storing to
zookeeper server.
+ * Maximum segment size constant is initialized after factoring in size of
serialization bytes.
+ */
+ private static final int VALUE_SEGMENT_SIZE_IN_BYTES = 1020 * 1020;
+
private final ZkClient zkClient;
private final ZkConfig zkConfig;
private final String zkBaseDir;
@@ -64,7 +79,28 @@ public class ZkMetadataStore implements MetadataStore {
*/
@Override
public byte[] get(String key) {
- return zkClient.readData(getZkPathForKey(key), true);
+ byte[] aggregatedZNodeValues = new byte[0];
+ for (int segmentIndex = 0;; ++segmentIndex) {
+ String zkPath = getZkPath(key, segmentIndex);
+ byte[] zNodeValue = zkClient.readData(zkPath, true);
+ if (zNodeValue == null) {
+ break;
+ }
+ aggregatedZNodeValues = Bytes.concat(aggregatedZNodeValues, zNodeValue);
+ }
+ if (aggregatedZNodeValues.length > 0) {
+ byte[] value = ArrayUtils.subarray(aggregatedZNodeValues, 0,
aggregatedZNodeValues.length - CHECKSUM_SIZE_IN_BYTES);
+ byte[] checkSum = ArrayUtils.subarray(aggregatedZNodeValues,
aggregatedZNodeValues.length - CHECKSUM_SIZE_IN_BYTES,
aggregatedZNodeValues.length);
+ byte[] expectedChecksum = getCRCChecksum(value);
+ if (!Arrays.equals(checkSum, expectedChecksum)) {
+ String exceptionMessage = String.format("Expected checksum: %s did not
match the actual checksum: %s for value: %s",
+ Arrays.toString(expectedChecksum), Arrays.toString(value),
Arrays.toString(checkSum));
+ LOG.error(exceptionMessage);
+ throw new IllegalStateException(exceptionMessage);
+ }
+ return value;
+ }
+ return null;
}
/**
@@ -72,9 +108,12 @@ public class ZkMetadataStore implements MetadataStore {
*/
@Override
public void put(String key, byte[] value) {
- String zkPath = getZkPathForKey(key);
- zkClient.createPersistent(zkPath, true);
- zkClient.writeData(zkPath, value);
+ List<byte[]> valueSegments = chunkMetadataStoreValue(value);
+ for (int segmentIndex = 0; segmentIndex < valueSegments.size();
segmentIndex++) {
+ String zkPath = getZkPath(key, segmentIndex);
+ zkClient.createPersistent(zkPath, true);
+ zkClient.writeData(zkPath, valueSegments.get(segmentIndex));
+ }
}
/**
@@ -82,7 +121,8 @@ public class ZkMetadataStore implements MetadataStore {
*/
@Override
public void delete(String key) {
- zkClient.delete(getZkPathForKey(key));
+ String zkPath = String.format("%s/%s", zkBaseDir, key);
+ zkClient.deleteRecursive(zkPath);
}
/**
@@ -91,22 +131,15 @@ public class ZkMetadataStore implements MetadataStore {
*/
@Override
public Map<String, byte[]> all() {
- try {
- List<String> zkSubDirectories = zkClient.getChildren(zkBaseDir);
- Map<String, byte[]> result = new HashMap<>();
- for (String zkSubDir : zkSubDirectories) {
- String completeZkPath = String.format("%s/%s", zkBaseDir, zkSubDir);
- byte[] value = zkClient.readData(completeZkPath, true);
- if (value != null) {
- result.put(zkSubDir, value);
- }
+ List<String> zkSubDirectories = zkClient.getChildren(zkBaseDir);
+ Map<String, byte[]> result = new HashMap<>();
+ for (String zkSubDir : zkSubDirectories) {
+ byte[] value = get(zkSubDir);
+ if (value != null) {
+ result.put(zkSubDir, value);
}
- return result;
- } catch (Exception e) {
- String errorMsg = String.format("Error reading path: %s from
zookeeper.", zkBaseDir);
- LOG.error(errorMsg, e);
- throw new SamzaException(errorMsg, e);
}
+ return result;
}
/**
@@ -125,7 +158,40 @@ public class ZkMetadataStore implements MetadataStore {
zkClient.close();
}
- private String getZkPathForKey(String key) {
- return String.format("%s/%s", zkBaseDir, key);
+ private String getZkPath(String key, int segmentIndex) {
+ return String.format("%s/%s/%d", zkBaseDir, key, segmentIndex);
+ }
+
+ /**
+ * Computes and returns the crc32 checksum of the input byte array.
+ * @param value the input byte array.
+ * @return the crc32 checksum of the byte array.
+ */
+ private static byte[] getCRCChecksum(byte[] value) {
+ CRC32 crc32 = new CRC32();
+ crc32.update(value);
+ long checksum = crc32.getValue();
+ return Longs.toByteArray(checksum);
+ }
+
+ /**
+ * Splits the input byte array value into independent byte array segments of
1 MB size.
+ * @param value the input byte array to split.
+ * @return the byte array split into independent byte array chunks.
+ */
+ private static List<byte[]> chunkMetadataStoreValue(byte[] value) {
+ try {
+ byte[] checksum = getCRCChecksum(value);
+ byte[] valueWithChecksum = ArrayUtils.addAll(value, checksum);
+ List<byte[]> valueSegments = new ArrayList<>();
+ int length = valueWithChecksum.length;
+ for (int index = 0; index < length; index +=
VALUE_SEGMENT_SIZE_IN_BYTES) {
+ byte[] valueSegment = ArrayUtils.subarray(valueWithChecksum, index,
Math.min(index + VALUE_SEGMENT_SIZE_IN_BYTES, length));
+ valueSegments.add(valueSegment);
+ }
+ return valueSegments;
+ } catch (Exception e) {
+ throw new SamzaException(String.format("Exception occurred when
splitting the value: %s to small chunks.", value), e);
+ }
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 8e3639e..94a45d6 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -38,6 +38,7 @@ import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.system.SystemStreamPartition;
@@ -68,6 +69,7 @@ public class TestZkJobCoordinator {
private final Config config;
private final JobModel jobModel;
+ private final MetadataStore metadataStore;
public TestZkJobCoordinator() {
Map<String, String> configMap = ImmutableMap.of(
@@ -83,6 +85,7 @@ public class TestZkJobCoordinator {
new TaskName("t1"), new TaskModel(new TaskName("t1"), ssps, new
Partition(0)));
ContainerModel containerModel = new ContainerModel("0", tasksForContainer);
jobModel = new JobModel(config, ImmutableMap.of("0", containerModel));
+ metadataStore = Mockito.mock(MetadataStore.class);
}
@Test
@@ -95,9 +98,9 @@ public class TestZkJobCoordinator {
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
- when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new
JobModel(new MapConfig(), new HashMap<>()));
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils, metadataStore));
+ doReturn(new JobModel(new MapConfig(), new
HashMap<>())).when(zkJobCoordinator).readJobModelFromMetadataStore(TEST_JOB_MODEL_VERSION);
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
jcShutdownLatch.countDown();
@@ -124,7 +127,7 @@ public class TestZkJobCoordinator {
ScheduleAfterDebounceTime mockDebounceTimer =
Mockito.mock(ScheduleAfterDebounceTime.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils, metadataStore));
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new
MetricsRegistryMap());
final ZkSessionStateChangedListener zkSessionStateChangedListener =
zkJobCoordinator.new ZkSessionStateChangedListener();
@@ -150,7 +153,7 @@ public class TestZkJobCoordinator {
ScheduleAfterDebounceTime mockDebounceTimer =
Mockito.mock(ScheduleAfterDebounceTime.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils, metadataStore));
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new
MetricsRegistryMap());
final ZkSessionStateChangedListener zkSessionStateChangedListener =
zkJobCoordinator.new ZkSessionStateChangedListener();
@@ -181,7 +184,7 @@ public class TestZkJobCoordinator {
ScheduleAfterDebounceTime mockDebounceTimer =
Mockito.mock(ScheduleAfterDebounceTime.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils, metadataStore));
StreamPartitionCountMonitor monitor =
Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.streamPartitionCountMonitor = monitor;
@@ -204,7 +207,7 @@ public class TestZkJobCoordinator {
ScheduleAfterDebounceTime mockDebounceTimer =
Mockito.mock(ScheduleAfterDebounceTime.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils, metadataStore));
StreamPartitionCountMonitor monitor =
Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
@@ -231,7 +234,7 @@ public class TestZkJobCoordinator {
ScheduleAfterDebounceTime mockDebounceTimer =
Mockito.mock(ScheduleAfterDebounceTime.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new
NoOpMetricsRegistry(), zkUtils, metadataStore));
StreamPartitionCountMonitor monitor =
Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
@@ -254,7 +257,7 @@ public class TestZkJobCoordinator {
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
StartpointManager mockStartpointManager =
Mockito.mock(StartpointManager.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(),
zkUtils));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(),
zkUtils, metadataStore));
doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager(any(CoordinatorStreamStore.class));
doReturn(mock(CoordinatorStreamStore.class)).when(zkJobCoordinator).createCoordinatorStreamStore();
@@ -284,7 +287,7 @@ public class TestZkJobCoordinator {
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
StartpointManager mockStartpointManager =
Mockito.mock(StartpointManager.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(),
zkUtils));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(),
zkUtils, metadataStore));
doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager(any(CoordinatorStreamStore.class));
doReturn(mock(CoordinatorStreamStore.class)).when(zkJobCoordinator).createCoordinatorStreamStore();
@@ -293,7 +296,6 @@ public class TestZkJobCoordinator {
zkJobCoordinator.doOnProcessorChange();
- verify(zkUtils).publishJobModel(anyString(), eq(jobModel));
verify(zkUtils).publishJobModelVersion(anyString(), anyString());
verify(zkJobCoordinator).loadMetadataResources(eq(jobModel));
}
diff --git
a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
index 4d53222..94f188e 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
@@ -20,6 +20,7 @@ package org.apache.samza.zk;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
+import java.util.Random;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
@@ -38,6 +39,10 @@ public class TestZkMetadataStore {
private static final String LOCALHOST = "127.0.0.1";
+ private static final Random RANDOM = new Random();
+
+ private static final int VALUE_SIZE_IN_BYTES = 1024 * 1024 * 10; // 10 MB
+
private static EmbeddedZookeeper zkServer;
private MetadataStore zkMetadataStore;
@@ -68,7 +73,7 @@ public class TestZkMetadataStore {
@Test
public void testReadAfterWrite() throws Exception {
String key = "test-key1";
- byte[] value = "test-value1".getBytes("UTF-8");
+ byte[] value = getRandomByteArray(VALUE_SIZE_IN_BYTES);
Assert.assertNull(zkMetadataStore.get(key));
zkMetadataStore.put(key, value);
Assert.assertTrue(Arrays.equals(value, zkMetadataStore.get(key)));
@@ -78,7 +83,7 @@ public class TestZkMetadataStore {
@Test
public void testReadAfterDelete() throws Exception {
String key = "test-key1";
- byte[] value = "test-value1".getBytes("UTF-8");
+ byte[] value = getRandomByteArray(VALUE_SIZE_IN_BYTES);
Assert.assertNull(zkMetadataStore.get(key));
zkMetadataStore.put(key, value);
Assert.assertTrue(Arrays.equals(value, zkMetadataStore.get(key)));
@@ -96,8 +101,8 @@ public class TestZkMetadataStore {
@Test
public void testMultipleUpdatesForSameKey() throws Exception {
String key = "test-key1";
- byte[] value = "test-value1".getBytes("UTF-8");
- byte[] value1 = "test-value2".getBytes("UTF-8");
+ byte[] value = getRandomByteArray(VALUE_SIZE_IN_BYTES);
+ byte[] value1 = getRandomByteArray(VALUE_SIZE_IN_BYTES);
zkMetadataStore.put(key, value);
zkMetadataStore.put(key, value1);
Assert.assertTrue(Arrays.equals(value1, zkMetadataStore.get(key)));
@@ -109,13 +114,19 @@ public class TestZkMetadataStore {
String key = "test-key1";
String key1 = "test-key2";
String key2 = "test-key3";
- byte[] value = "test-value1".getBytes("UTF-8");
- byte[] value1 = "test-value2".getBytes("UTF-8");
- byte[] value2 = "test-value3".getBytes("UTF-8");
+ byte[] value = getRandomByteArray(VALUE_SIZE_IN_BYTES);
+ byte[] value1 = getRandomByteArray(VALUE_SIZE_IN_BYTES);
+ byte[] value2 = getRandomByteArray(VALUE_SIZE_IN_BYTES);
zkMetadataStore.put(key, value);
zkMetadataStore.put(key1, value1);
zkMetadataStore.put(key2, value2);
ImmutableMap<String, byte[]> expected = ImmutableMap.of(key, value, key1,
value1, key2, value2);
Assert.assertEquals(expected.size(), zkMetadataStore.all().size());
}
+
+ private static byte[] getRandomByteArray(int numBytes) {
+ byte[] byteArray = new byte[numBytes];
+ RANDOM.nextBytes(byteArray);
+ return byteArray;
+ }
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index d234590..754939f 100644
---
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -56,6 +56,7 @@ import
org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metadatastore.MetadataStore;
@@ -69,6 +70,7 @@ import org.apache.samza.test.StandaloneTestUtils;
import org.apache.samza.test.harness.IntegrationTestHarness;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.zk.ZkMetadataStore;
import org.apache.samza.zk.ZkStringSerializer;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.apache.samza.zk.ZkKeyBuilder;
@@ -118,6 +120,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
private ApplicationConfig applicationConfig3;
private String testStreamAppName;
private String testStreamAppId;
+ private MetadataStore zkMetadataStore;
@Rule
public Timeout testTimeOutInMillis = new Timeout(150000);
@@ -165,6 +168,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
.collect(Collectors.toList());
assertTrue("Encountered errors during test setup. Failed to create
topics.", createTopics(newTopics));
+ zkMetadataStore = new
ZkMetadataStore(zkUtils.getKeyBuilder().getRootPath(), new
MapConfig(configMap), new NoOpMetricsRegistry());
}
@Override
@@ -270,7 +274,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
TestStreamApplication.StreamApplicationCallback callback = m -> {
if (hasSecondProcessorJoined.compareAndSet(false, true)) {
previousJobModelVersion[0] = zkUtils.getJobModelVersion();
- previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
+ previousJobModel[0] =
JobModelUtil.readJobModel(previousJobModelVersion[0], zkMetadataStore);
executeRun(appRunner2, localTestConfig2);
try {
// Wait for appRunner2 to register with zookeeper.
@@ -292,7 +296,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
kafkaEventsConsumedLatch.await();
String currentJobModelVersion = zkUtils.getJobModelVersion();
- JobModel updatedJobModel = zkUtils.getJobModel(currentJobModelVersion);
+ JobModel updatedJobModel =
JobModelUtil.readJobModel(currentJobModelVersion, zkMetadataStore);
// Job model before and after the addition of second stream processor
should be the same.
assertEquals(previousJobModel[0], updatedJobModel);
@@ -352,7 +356,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
TestStreamApplication.StreamApplicationCallback streamApplicationCallback
= message -> {
if (hasSecondProcessorJoined.compareAndSet(false, true)) {
previousJobModelVersion[0] = zkUtils.getJobModelVersion();
- previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
+ previousJobModel[0] =
JobModelUtil.readJobModel(previousJobModelVersion[0], zkMetadataStore);
executeRun(appRunner2, testAppConfig2);
try {
// Wait for appRunner2 to register with zookeeper.
@@ -376,7 +380,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
kafkaEventsConsumedLatch.await();
String currentJobModelVersion = zkUtils.getJobModelVersion();
- JobModel updatedJobModel = zkUtils.getJobModel(currentJobModelVersion);
+ JobModel updatedJobModel =
JobModelUtil.readJobModel(currentJobModelVersion, zkMetadataStore);
// JobModelVersion check to verify that leader publishes new jobModel.
assertTrue(Integer.parseInt(previousJobModelVersion[0]) <
Integer.parseInt(currentJobModelVersion));
@@ -399,7 +403,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
TaskModel taskModel1 =
updatedTaskModelMap1.values().stream().findFirst().get();
TaskModel taskModel2 =
updatedTaskModelMap2.values().stream().findFirst().get();
assertEquals(taskModel1.getSystemStreamPartitions(),
taskModel2.getSystemStreamPartitions());
-
assertTrue(!taskModel1.getTaskName().getTaskName().equals(taskModel2.getTaskName().getTaskName()));
+
assertFalse(taskModel1.getTaskName().getTaskName().equals(taskModel2.getTaskName().getTaskName()));
processedMessagesLatch.await();
@@ -441,7 +445,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
// Verifications before killing the leader.
String jobModelVersion = zkUtils.getJobModelVersion();
- JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
+ JobModel jobModel = JobModelUtil.readJobModel(jobModelVersion,
zkMetadataStore);
assertEquals(2, jobModel.getContainers().size());
assertEquals(Sets.newHashSet("0000000000", "0000000001"),
jobModel.getContainers().keySet());
assertEquals("1", jobModelVersion);
@@ -467,7 +471,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
assertEquals(2, processorIdsFromZK.size());
assertEquals(PROCESSOR_IDS[1], processorIdsFromZK.get(0));
jobModelVersion = zkUtils.getJobModelVersion();
- jobModel = zkUtils.getJobModel(jobModelVersion);
+ jobModel = JobModelUtil.readJobModel(jobModelVersion, zkMetadataStore);
assertEquals(Sets.newHashSet("0000000001", "0000000002"),
jobModel.getContainers().keySet());
assertEquals(2, jobModel.getContainers().size());
@@ -560,7 +564,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
// Read job model before rolling upgrade.
String jobModelVersion = zkUtils.getJobModelVersion();
- JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
+ JobModel jobModel = JobModelUtil.readJobModel(jobModelVersion,
zkMetadataStore);
appRunner1.kill();
appRunner1.waitForFinish();
@@ -584,7 +588,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
// Read new job model after rolling upgrade.
String newJobModelVersion = zkUtils.getJobModelVersion();
- JobModel newJobModel = zkUtils.getJobModel(newJobModelVersion);
+ JobModel newJobModel = JobModelUtil.readJobModel(newJobModelVersion,
zkMetadataStore);
assertEquals(Integer.parseInt(jobModelVersion) + 1,
Integer.parseInt(newJobModelVersion));
assertEquals(jobModel.getContainers(), newJobModel.getContainers());
@@ -685,8 +689,8 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
processedMessagesLatch1.await();
String jobModelVersion = zkUtils.getJobModelVersion();
- JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
- Set<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
+ JobModel jobModel = JobModelUtil.readJobModel(jobModelVersion,
zkMetadataStore);
+ List<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
// Validate that the input partition count is 5 in the JobModel.
Assert.assertEquals(5, ssps.size());
@@ -702,7 +706,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
}
String newJobModelVersion = zkUtils.getJobModelVersion();
- JobModel newJobModel = zkUtils.getJobModel(newJobModelVersion);
+ JobModel newJobModel = JobModelUtil.readJobModel(newJobModelVersion,
zkMetadataStore);
ssps = getSystemStreamPartitions(newJobModel);
// Validate that the input partition count is 100 in the new JobModel.
@@ -777,8 +781,8 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
// Read the latest JobModel for validation.
String jobModelVersion = zkUtils.getJobModelVersion();
- JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
- Set<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
+ JobModel jobModel = JobModelUtil.readJobModel(jobModelVersion,
zkMetadataStore);
+ List<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
// Validate that the input partition count is 32 in the JobModel.
Assert.assertEquals(32, ssps.size());
@@ -795,7 +799,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
while (true) {
LOGGER.info("Waiting for new jobModel to be published");
jobModelVersion = zkUtils.getJobModelVersion();
- jobModel = zkUtils.getJobModel(jobModelVersion);
+ jobModel = JobModelUtil.readJobModel(jobModelVersion, zkMetadataStore);
ssps = getSystemStreamPartitions(jobModel);
if (ssps.size() == 64) {
@@ -881,8 +885,8 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
// Read the latest JobModel for validation.
String jobModelVersion = zkUtils.getJobModelVersion();
- JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
- Set<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
+ JobModel jobModel = JobModelUtil.readJobModel(jobModelVersion,
zkMetadataStore);
+ List<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
// Validate that the input 64 partitions are present in JobModel.
Assert.assertEquals(64, ssps.size());
@@ -902,7 +906,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
while (true) {
LOGGER.info("Waiting for new jobModel to be published");
jobModelVersion = zkUtils.getJobModelVersion();
- jobModel = zkUtils.getJobModel(jobModelVersion);
+ jobModel = JobModelUtil.readJobModel(jobModelVersion, zkMetadataStore);
ssps = getSystemStreamPartitions(jobModel);
if (ssps.size() == 128) {
@@ -964,27 +968,6 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
}
/**
- * Computes the task to partition assignment of the {@param JobModel}.
- * @param jobModel the jobModel to compute task to partition assignment for.
- * @return the computed task to partition assignments of the {@param
JobModel}.
- */
- private static Map<TaskName, Set<SystemStreamPartition>>
getTaskAssignments(JobModel jobModel) {
- Map<TaskName, Set<SystemStreamPartition>> taskAssignments = new
HashMap<>();
- for (Map.Entry<String, ContainerModel> entry :
jobModel.getContainers().entrySet()) {
- Map<TaskName, TaskModel> tasks = entry.getValue().getTasks();
- for (TaskModel taskModel : tasks.values()) {
- if (!taskAssignments.containsKey(taskModel.getTaskName())) {
- taskAssignments.put(taskModel.getTaskName(), new HashSet<>());
- }
- if (taskModel.getTaskMode() == TaskMode.Active) {
-
taskAssignments.get(taskModel.getTaskName()).addAll(taskModel.getSystemStreamPartitions());
- }
- }
- }
- return taskAssignments;
- }
-
- /**
* Test if two processors coming up at the same time agree on a single runid
* 1. bring up two processors
* 2. wait till they start consuming messages
@@ -1247,10 +1230,29 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
appRunner3.waitForFinish();
}
+ /**
+ * Computes the task to partition assignment of the {@param JobModel}.
+ * @param jobModel the jobModel to compute task to partition assignment for.
+ * @return the computed task to partition assignments of the {@param
JobModel}.
+ */
+ private static Map<TaskName, Set<SystemStreamPartition>>
getTaskAssignments(JobModel jobModel) {
+ Map<TaskName, Set<SystemStreamPartition>> taskAssignments = new
HashMap<>();
+ for (Map.Entry<String, ContainerModel> entry :
jobModel.getContainers().entrySet()) {
+ Map<TaskName, TaskModel> tasks = entry.getValue().getTasks();
+ for (TaskModel taskModel : tasks.values()) {
+ if (!taskAssignments.containsKey(taskModel.getTaskName())) {
+ taskAssignments.put(taskModel.getTaskName(), new HashSet<>());
+ }
+ if (taskModel.getTaskMode() == TaskMode.Active) {
+
taskAssignments.get(taskModel.getTaskName()).addAll(taskModel.getSystemStreamPartitions());
+ }
+ }
+ }
+ return taskAssignments;
+ }
- private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel
jobModel) {
- System.out.println(jobModel);
- Set<SystemStreamPartition> ssps = new HashSet<>();
+ private static List<SystemStreamPartition>
getSystemStreamPartitions(JobModel jobModel) {
+ List<SystemStreamPartition> ssps = new ArrayList<>();
jobModel.getContainers().forEach((containerName, containerModel) -> {
containerModel.getTasks().forEach((taskName, taskModel) ->
ssps.addAll(taskModel.getSystemStreamPartitions()));
});