This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 5b9e4a8 SAMZA-2158: Remove the redundant coordinator stream reads in
the ApplicationMaster startup sequence. (#987)
5b9e4a8 is described below
commit 5b9e4a8bcd88c97b50d2229b52950c7c18a2235e
Author: shanthoosh <[email protected]>
AuthorDate: Mon Apr 8 14:36:52 2019 -0700
SAMZA-2158: Remove the redundant coordinator stream reads in the
ApplicationMaster startup sequence. (#987)
* Create only one coordinator metadata store connection in samza-yarn
application master.
* Fix check-style errors.
* Minor code cleanup.
* Address review comments.
* More code cleanup.
* Fix the import order.
* Fix compilation errors.
* Close coordinator stream store even on exceptions.
* Switch from using unordered list type to paragraph type to format the
javadocs.
* Address review comments.
* Remove unnecessary <p> tag from javadocs.
---
.../apache/samza/container/LocalityManager.java | 43 ++----
.../grouper/task/TaskAssignmentManager.java | 55 +++-----
.../task/TaskPartitionAssignmentManager.java | 29 ++---
.../CoordinatorStreamMetadataStoreFactory.java | 2 +-
.../metadatastore/CoordinatorStreamStore.java | 144 ++++++++++++++++-----
.../NamespaceAwareCoordinatorStreamStore.java | 112 ++++++++++++++++
.../apache/samza/processor/StreamProcessor.java | 2 +-
.../apache/samza/runtime/ContainerLaunchUtil.java | 129 ++++++++++--------
.../apache/samza/startpoint/StartpointManager.java | 50 +++----
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 4 +-
.../apache/samza/container/SamzaContainer.scala | 27 +---
.../apache/samza/coordinator/JobModelManager.scala | 45 +++++--
.../samza/container/TestLocalityManager.java | 49 +++----
.../grouper/task/TestTaskAssignmentManager.java | 39 ++----
.../task/TestTaskPartitionAssignmentManager.java | 29 ++---
.../CoordinatorStreamStoreTestUtil.java | 64 +++++++++
.../metadatastore/TestCoordinatorStreamStore.java | 62 ++++-----
.../TestNamespaceAwareCoordinatorStreamStore.java | 88 +++++++++++++
.../TestCoordinatorStreamSystemProducer.java | 1 +
.../startpoint/StartpointManagerTestUtil.java} | 24 ++--
.../samza/startpoint/TestStartpointManager.java | 37 +++---
.../samza/checkpoint/TestOffsetManager.scala | 6 +-
.../samza/rest/proxy/task/SamzaTaskProxy.java | 9 +-
.../processor/TestZkLocalApplicationRunner.java | 4 +-
24 files changed, 671 insertions(+), 383 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 24ddc23..05f2e8b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -19,19 +19,13 @@
package org.apache.samza.container;
-import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
-import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,32 +39,17 @@ public class LocalityManager {
private final MetadataStore metadataStore;
/**
- * Builds the LocalityManager based upon {@link Config} and {@link
MetricsRegistry}.
- * Uses the {@link CoordinatorStreamValueSerde} to serialize messages before
- * reading/writing into metadata store.
+ * Builds the LocalityManager based upon the provided {@link MetadataStore}
that is instantiated.
+ * Setting up a metadata store instance is expensive which requires opening
multiple connections
+ * and reading tons of information. Fully instantiated metadata store is
taken as a constructor argument
+ * to reuse it across different utility classes. Uses the {@link
CoordinatorStreamValueSerde} to serialize
+ * messages before reading/writing into metadata store.
*
- * @param config the configuration required for setting up metadata store.
- * @param metricsRegistry the registry for reporting metrics.
+ * @param metadataStore an instance of {@link MetadataStore} to read/write
the container locality.
*/
- public LocalityManager(Config config, MetricsRegistry metricsRegistry) {
- this(config, metricsRegistry, new
CoordinatorStreamValueSerde(SetContainerHostMapping.TYPE));
- }
-
- /**
- * Builds the LocalityManager based upon {@link Config} and {@link
MetricsRegistry}.
- * Uses keySerde, valueSerde to serialize/deserialize (key, value) pairs
before reading/writing
- * into {@link MetadataStore}.
- *
- * Key and value serializer are different for yarn (uses
CoordinatorStreamMessage) and standalone (native ObjectOutputStream for
serialization) modes.
- * @param config the configuration required for setting up metadata store.
- * @param metricsRegistry the registry for reporting metrics.
- * @param valueSerde the value serializer.
- */
- LocalityManager(Config config, MetricsRegistry metricsRegistry,
Serde<String> valueSerde) {
- MetadataStoreFactory metadataStoreFactory = Util.getObj(new
JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
- this.metadataStore =
metadataStoreFactory.getMetadataStore(SetContainerHostMapping.TYPE, config,
metricsRegistry);
- this.metadataStore.init();
- this.valueSerde = valueSerde;
+ public LocalityManager(MetadataStore metadataStore) {
+ this.metadataStore = metadataStore;
+ this.valueSerde = new
CoordinatorStreamValueSerde(SetContainerHostMapping.TYPE);
}
/**
@@ -84,7 +63,9 @@ public class LocalityManager {
metadataStore.all().forEach((containerId, valueBytes) -> {
if (valueBytes != null) {
String locationId = valueSerde.fromBytes(valueBytes);
- allMappings.put(containerId,
ImmutableMap.of(SetContainerHostMapping.HOST_KEY, locationId));
+ Map<String, String> values = new HashMap<>();
+ values.put(SetContainerHostMapping.HOST_KEY, locationId);
+ allMappings.put(containerId, values);
}
});
if (LOG.isDebugEnabled()) {
diff --git
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 0bac04d..16f8a51 100644
---
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -18,22 +18,17 @@
*/
package org.apache.samza.container.grouper.task;
+import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
-import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +40,6 @@ public class TaskAssignmentManager {
private static final Logger LOG =
LoggerFactory.getLogger(TaskAssignmentManager.class);
private final Map<String, String> taskNameToContainerId = new HashMap<>();
- private final Serde<String> keySerde;
private final Serde<String> containerIdSerde;
private final Serde<String> taskModeSerde;
@@ -53,42 +47,23 @@ public class TaskAssignmentManager {
private MetadataStore taskModeMappingMetadataStore;
/**
- * Builds the TaskAssignmentManager based upon {@link Config} and {@link
MetricsRegistry}.
- * Uses {@link CoordinatorStreamValueSerde} to serialize messages before
reading/writing
- * into the metadata store.
+ * Builds the TaskAssignmentManager based upon the provided {@link
MetadataStore} that is instantiated.
+ * Setting up a metadata store instance is expensive which requires opening
multiple connections
+ * and reading tons of information. Fully instantiated metadata store is
taken as a constructor argument
+ * to reuse it across different utility classes. Uses the {@link
CoordinatorStreamValueSerde} to serialize
+ * messages before reading/writing into metadata store.
*
- * @param config the configuration required for setting up metadata store.
- * @param metricsRegistry the registry for reporting metrics.
+ * @param taskContainerMappingMetadataStore an instance of {@link
MetadataStore} used to read/write the task to container assignments.
+ * @param taskModeMappingMetadataStore an instance of {@link MetadataStore}
used to read/write the task to mode assignments.
*/
- public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry)
{
- this(config, metricsRegistry, new
CoordinatorStreamKeySerde(SetTaskContainerMapping.TYPE),
- new CoordinatorStreamValueSerde(SetTaskContainerMapping.TYPE), new
CoordinatorStreamValueSerde(SetTaskModeMapping.TYPE));
- }
-
- /**
- * Builds the LocalityManager based upon {@link Config} and {@link
MetricsRegistry}.
- *
- * Uses keySerde, containerIdSerde to serialize/deserialize (key, value)
pairs before reading/writing
- * into {@link MetadataStore}.
- *
- * Key and value serializer are different for yarn(uses
CoordinatorStreamMessage) and standalone(uses native
- * ObjectOutputStream for serialization) modes.
- * @param config the configuration required for setting up metadata store.
- * @param metricsRegistry the registry for reporting metrics.
- * @param keySerde the key serializer.
- * @param containerIdSerde the value serializer.
- * @param taskModeSerde the task-mode serializer.
- */
- public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry,
Serde<String> keySerde, Serde<String> containerIdSerde, Serde<String>
taskModeSerde) {
- this.keySerde = keySerde;
- this.containerIdSerde = containerIdSerde;
- this.taskModeSerde = taskModeSerde;
+ public TaskAssignmentManager(MetadataStore
taskContainerMappingMetadataStore, MetadataStore taskModeMappingMetadataStore) {
+ Preconditions.checkNotNull(taskContainerMappingMetadataStore, "Metadata
store cannot be null");
+ Preconditions.checkNotNull(taskModeMappingMetadataStore, "Metadata store
cannot be null");
- MetadataStoreFactory metadataStoreFactory = Util.getObj(new
JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
- this.taskModeMappingMetadataStore =
metadataStoreFactory.getMetadataStore(SetTaskModeMapping.TYPE, config,
metricsRegistry);
- this.taskContainerMappingMetadataStore =
metadataStoreFactory.getMetadataStore(SetTaskContainerMapping.TYPE, config,
metricsRegistry);
- this.taskModeMappingMetadataStore.init();
- this.taskContainerMappingMetadataStore.init();
+ this.taskModeMappingMetadataStore = taskModeMappingMetadataStore;
+ this.taskContainerMappingMetadataStore = taskContainerMappingMetadataStore;
+ this.containerIdSerde = new
CoordinatorStreamValueSerde(SetTaskContainerMapping.TYPE);
+ this.taskModeSerde = new
CoordinatorStreamValueSerde(SetTaskModeMapping.TYPE);
}
/**
diff --git
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
index c40d591..7e32f0a 100644
---
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
+++
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
@@ -18,18 +18,14 @@
*/
package org.apache.samza.container.grouper.task;
+import com.google.common.base.Preconditions;
import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.Util;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
@@ -54,20 +50,19 @@ public class TaskPartitionAssignmentManager {
private final MetadataStore metadataStore;
/**
- * Instantiates the task partition assignment manager with the provided
metricsRegistry
- * and config.
- * @param config the configuration required for connecting with the metadata
store.
- * @param metricsRegistry the registry to create and report custom metrics.
+ * Builds the TaskPartitionAssignmentManager based upon the provided {@link
MetadataStore} that is instantiated.
+ * Setting up a metadata store instance is expensive which requires opening
multiple connections
+ * and reading tons of information. Fully instantiated metadata store is
taken as a constructor argument
+ * to reuse it across different utility classes. Uses the {@link
CoordinatorStreamValueSerde} to serialize
+ * messages before reading/writing into metadata store.
+ *
+ * @param metadataStore an instance of {@link MetadataStore} used to
read/write the task to partition assignments.
*/
- public TaskPartitionAssignmentManager(Config config, MetricsRegistry
metricsRegistry) {
- this(config, metricsRegistry, new
CoordinatorStreamValueSerde(SetTaskPartitionMapping.TYPE));
- }
+ public TaskPartitionAssignmentManager(MetadataStore metadataStore) {
+ Preconditions.checkNotNull(metadataStore, "Metdatastore cannot be null.");
- TaskPartitionAssignmentManager(Config config, MetricsRegistry
metricsRegistry, Serde<String> valueSerde) {
- this.valueSerde = valueSerde;
- MetadataStoreFactory metadataStoreFactory = Util.getObj(new
JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
- this.metadataStore =
metadataStoreFactory.getMetadataStore(SetTaskPartitionMapping.TYPE, config,
metricsRegistry);
- this.metadataStore.init();
+ this.metadataStore = metadataStore;
+ this.valueSerde = new
CoordinatorStreamValueSerde(SetTaskPartitionMapping.TYPE);
}
/**
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
index c38dc56..65df26e 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
@@ -31,6 +31,6 @@ public class CoordinatorStreamMetadataStoreFactory implements
MetadataStoreFacto
@Override
public MetadataStore getMetadataStore(String namespace, Config config,
MetricsRegistry metricsRegistry) {
- return new CoordinatorStreamStore(namespace, config, metricsRegistry);
+ return new CoordinatorStreamStore(config, metricsRegistry);
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index 899c87c..87121f1 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -18,15 +18,18 @@
*/
package org.apache.samza.coordinator.metadatastore;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
+import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
-import java.util.Objects;
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
@@ -46,18 +49,24 @@ import org.apache.samza.system.SystemStreamMetadata;
import
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.util.CoordinatorStreamUtil;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * An implementation of the {@link MetadataStore} interface where the metadata
of the Samza job is stored in coordinator stream.
+ * An implementation of the {@link MetadataStore} interface where the metadata
of the samza job is stored in coordinator stream.
*
* This class is thread safe.
+ *
+ * It is recommended to use {@link NamespaceAwareCoordinatorStreamStore}. This
will enable the single CoordinatorStreamStore connection
+ * to be shared by the multiple {@link NamespaceAwareCoordinatorStreamStore}
instances.
*/
public class CoordinatorStreamStore implements MetadataStore {
private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorStreamStore.class);
private static final String SOURCE = "SamzaContainer";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final Config config;
private final SystemStream coordinatorSystemStream;
@@ -65,19 +74,17 @@ public class CoordinatorStreamStore implements
MetadataStore {
private final SystemProducer systemProducer;
private final SystemConsumer systemConsumer;
private final SystemAdmin systemAdmin;
- private final String type;
- private final CoordinatorStreamKeySerde keySerde;
- private final Map<String, byte[]> bootstrappedMessages = new HashMap<>();
+ // Namespaced key to the message byte array.
+ private final Map<String, byte[]> messagesReadFromCoordinatorStream = new
ConcurrentHashMap<>();
+
private final Object bootstrapLock = new Object();
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
private SystemStreamPartitionIterator iterator;
- public CoordinatorStreamStore(String namespace, Config config,
MetricsRegistry metricsRegistry) {
+ public CoordinatorStreamStore(Config config, MetricsRegistry
metricsRegistry) {
this.config = config;
- this.type = namespace;
- this.keySerde = new CoordinatorStreamKeySerde(type);
this.coordinatorSystemStream =
CoordinatorStreamUtil.getCoordinatorSystemStream(config);
this.coordinatorSystemStreamPartition = new
SystemStreamPartition(coordinatorSystemStream, new Partition(0));
SystemFactory systemFactory =
CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
@@ -86,6 +93,16 @@ public class CoordinatorStreamStore implements MetadataStore
{
this.systemAdmin =
systemFactory.getAdmin(this.coordinatorSystemStream.getSystem(), config);
}
+ @VisibleForTesting
+ CoordinatorStreamStore(Config config, SystemProducer systemProducer,
SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
+ this.config = config;
+ this.systemConsumer = systemConsumer;
+ this.systemProducer = systemProducer;
+ this.systemAdmin = systemAdmin;
+ this.coordinatorSystemStream =
CoordinatorStreamUtil.getCoordinatorSystemStream(config);
+ this.coordinatorSystemStreamPartition = new
SystemStreamPartition(coordinatorSystemStream, new Partition(0));
+ }
+
@Override
public void init() {
if (isInitialized.compareAndSet(false, true)) {
@@ -95,41 +112,44 @@ public class CoordinatorStreamStore implements
MetadataStore {
systemProducer.register(SOURCE);
systemProducer.start();
iterator = new SystemStreamPartitionIterator(systemConsumer,
coordinatorSystemStreamPartition);
- bootstrapMessagesFromStream();
+ readMessagesFromCoordinatorStream();
} else {
LOG.info("Store had already been initialized. Skipping.",
coordinatorSystemStreamPartition);
}
}
@Override
- public byte[] get(String key) {
- bootstrapMessagesFromStream();
- return bootstrappedMessages.get(key);
+ public byte[] get(String namespacedKey) {
+ readMessagesFromCoordinatorStream();
+ return messagesReadFromCoordinatorStream.get(namespacedKey);
}
@Override
- public void put(String key, byte[] value) {
- OutgoingMessageEnvelope envelope = new
OutgoingMessageEnvelope(coordinatorSystemStream, 0, keySerde.toBytes(key),
value);
+ public void put(String namespacedKey, byte[] value) {
+ // 1. Store the namespace and key into correct fields of the
CoordinatorStreamKey and convert the key to bytes.
+ CoordinatorMessageKey coordinatorMessageKey =
deserializeCoordinatorMessageKeyFromJson(namespacedKey);
+ CoordinatorStreamKeySerde keySerde = new
CoordinatorStreamKeySerde(coordinatorMessageKey.getNamespace());
+ byte[] keyBytes = keySerde.toBytes(coordinatorMessageKey.getKey());
+
+ // 2. Set the key, message in correct fields of {@link
OutgoingMessageEnvelope} and publish it to the coordinator stream.
+ OutgoingMessageEnvelope envelope = new
OutgoingMessageEnvelope(coordinatorSystemStream, 0, keyBytes, value);
systemProducer.send(SOURCE, envelope);
flush();
}
@Override
- public void delete(String key) {
- // Since kafka doesn't support individual message deletion, store value as
null for a key to delete.
- put(key, null);
+ public void delete(String namespacedKey) {
+ // Since kafka doesn't support individual message deletion, store value as
null for a namespacedKey to delete.
+ put(namespacedKey, null);
}
@Override
public Map<String, byte[]> all() {
- bootstrapMessagesFromStream();
- return Collections.unmodifiableMap(bootstrappedMessages);
+ readMessagesFromCoordinatorStream();
+ return Collections.unmodifiableMap(messagesReadFromCoordinatorStream);
}
- /**
- * Returns all the messages from the earliest offset all the way to the
latest.
- */
- private void bootstrapMessagesFromStream() {
+ private void readMessagesFromCoordinatorStream() {
synchronized (bootstrapLock) {
while (iterator.hasNext()) {
IncomingMessageEnvelope envelope = iterator.next();
@@ -137,12 +157,11 @@ public class CoordinatorStreamStore implements
MetadataStore {
Serde<List<?>> serde = new JsonSerde<>();
Object[] keyArray = serde.fromBytes(keyAsBytes).toArray();
CoordinatorStreamMessage coordinatorStreamMessage = new
CoordinatorStreamMessage(keyArray, new HashMap<>());
- if (Objects.equals(coordinatorStreamMessage.getType(), type)) {
- if (envelope.getMessage() != null) {
- bootstrappedMessages.put(coordinatorStreamMessage.getKey(),
(byte[]) envelope.getMessage());
- } else {
- bootstrappedMessages.remove(coordinatorStreamMessage.getKey());
- }
+ String namespacedKey =
serializeCoordinatorMessageKeyToJson(coordinatorStreamMessage.getType(),
coordinatorStreamMessage.getKey());
+ if (envelope.getMessage() != null) {
+ messagesReadFromCoordinatorStream.put(namespacedKey, (byte[])
envelope.getMessage());
+ } else {
+ messagesReadFromCoordinatorStream.remove(namespacedKey);
}
}
}
@@ -169,6 +188,12 @@ public class CoordinatorStreamStore implements
MetadataStore {
}
}
+ /**
+ * <p>
+ * Fetches the metadata of the topic partition of coordinator stream.
Registers the oldest offset
+ * for the topic partition of coordinator stream with the coordinator
system consumer.
+ * </p>
+ */
private void registerConsumer() {
LOG.debug("Attempting to register system stream partition: {}",
coordinatorSystemStreamPartition);
String streamName = coordinatorSystemStreamPartition.getStream();
@@ -184,4 +209,65 @@ public class CoordinatorStreamStore implements
MetadataStore {
LOG.info("Registering system stream partition: {} with offset: {}.",
coordinatorSystemStreamPartition, startingOffset);
systemConsumer.register(coordinatorSystemStreamPartition, startingOffset);
}
+
+ /**
+ *
+ * Serializes the {@link CoordinatorMessageKey} into a json string.
+ *
+ * @param type the type of the coordinator message.
+ * @param key the key associated with the type
+ * @return the CoordinatorMessageKey serialized to a json string.
+ */
+ public static String serializeCoordinatorMessageKeyToJson(String type,
String key) {
+ try {
+ CoordinatorMessageKey coordinatorMessageKey = new
CoordinatorMessageKey(key, type);
+ return OBJECT_MAPPER.writeValueAsString(coordinatorMessageKey);
+ } catch (IOException e) {
+ throw new SamzaException(String.format("Exception occurred when
serializing metadata for type: %s, key: %s", type, key), e);
+ }
+ }
+
+ /**
+ * Deserializes the @param coordinatorMsgKeyAsString in json format to
{@link CoordinatorMessageKey}.
+ * @param coordinatorMsgKeyAsJson the serialized CoordinatorMessageKey in
json format.
+ * @return the deserialized CoordinatorMessageKey.
+ */
+ public static CoordinatorMessageKey
deserializeCoordinatorMessageKeyFromJson(String coordinatorMsgKeyAsJson) {
+ try {
+ return OBJECT_MAPPER.readValue(coordinatorMsgKeyAsJson,
CoordinatorMessageKey.class);
+ } catch (IOException e) {
+ throw new SamzaException(String.format("Exception occurred when
deserializing the coordinatorMsgKey: %s", coordinatorMsgKeyAsJson), e);
+ }
+ }
+
+ /**
+ * <p>
+ * Represents the key of a message in the coordinator stream.
+ *
+ * Coordinator message key is composite. It has both the type of the message
+ * and the key associated with the type in it.
+ * </p>
+ */
+ public static class CoordinatorMessageKey {
+
+ // Represents the key associated with the type
+ private final String key;
+
+ // Represents the type of the message.
+ private final String namespace;
+
+ CoordinatorMessageKey(@JsonProperty("key") String key,
+ @JsonProperty("namespace") String namespace) {
+ this.key = key;
+ this.namespace = namespace;
+ }
+
+ public String getKey() {
+ return this.key;
+ }
+
+ public String getNamespace() {
+ return this.namespace;
+ }
+ }
}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
new file mode 100644
index 0000000..8a28fae
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.metadatastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.CoordinatorMessageKey;
+import org.apache.samza.metadatastore.MetadataStore;
+
+/**
+ * Provides a namespace aware read/write operations on top of {@link
CoordinatorStreamStore}.
+ */
+public class NamespaceAwareCoordinatorStreamStore implements MetadataStore {
+
+ private final MetadataStore metadataStore;
+ private final String namespace;
+
+ /**
+ * Instantiates the {@link NamespaceAwareCoordinatorStreamStore} based upon
the provided
+ * MetadataStore that is instantiated and the namespace.
+ *
+ * @param metadataStore the instantiated {@link MetadataStore}.
+ * @param namespace the namespace to use for storing the keys in the
metadata store.
+ */
+ public NamespaceAwareCoordinatorStreamStore(MetadataStore metadataStore,
String namespace) {
+ this.metadataStore = metadataStore;
+ this.namespace = namespace;
+ }
+
+ @Override
+ public void init() {
+ // Metadata store lifecycle is managed outside of this class, so not
starting it.
+ }
+
+ @Override
+ public byte[] get(String key) {
+ Map<String, byte[]> bootstrappedMessages =
readMessagesFromCoordinatorStore();
+ return bootstrappedMessages.get(key);
+ }
+
+ @Override
+ public void put(String key, byte[] value) {
+ String coordinatorMessageKeyAsJson = getCoordinatorMessageKey(key);
+ metadataStore.put(coordinatorMessageKeyAsJson, value);
+ }
+
+ @Override
+ public void delete(String key) {
+ String coordinatorMessageKeyAsJson = getCoordinatorMessageKey(key);
+ metadataStore.delete(coordinatorMessageKeyAsJson);
+ }
+
+ @Override
+ public Map<String, byte[]> all() {
+ Map<String, byte[]> bootstrappedMessages =
readMessagesFromCoordinatorStore();
+ return Collections.unmodifiableMap(bootstrappedMessages);
+ }
+
+ @Override
+ public void flush() {
+ metadataStore.flush();
+ }
+
+ @Override
+ public void close() {
+ // Metadata store lifecycle is managed outside of this class, so not
stopping it.
+ }
+
+ @VisibleForTesting
+ String getCoordinatorMessageKey(String key) {
+ return
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, key);
+ }
+
+ /**
+ * Reads and returns all the messages from coordinator stream for a
particular namespace.
+ */
+ private Map<String, byte[]> readMessagesFromCoordinatorStore() {
+ Map<String, byte[]> bootstrappedMessages = new HashMap<>();
+ Map<String, byte[]> coordinatorStreamMessages = metadataStore.all();
+ coordinatorStreamMessages.forEach((coordinatorMessageKeyAsJson, value) -> {
+ CoordinatorMessageKey coordinatorMessageKey =
CoordinatorStreamStore.deserializeCoordinatorMessageKeyFromJson(coordinatorMessageKeyAsJson);
+ if (Objects.equals(namespace, coordinatorMessageKey.getNamespace())) {
+ if (value != null) {
+ bootstrappedMessages.put(coordinatorMessageKey.getKey(), value);
+ } else {
+ bootstrappedMessages.remove(coordinatorMessageKey.getKey());
+ }
+ }
+ });
+
+ return bootstrappedMessages;
+ }
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 2c0c0b7..2a8c862 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -334,7 +334,7 @@ public class StreamProcessor {
this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
- Option.apply(this.externalContextOptional.orElse(null)), null);
+ Option.apply(this.externalContextOptional.orElse(null)), null, null);
}
private JobCoordinator createJobCoordinator() {
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index b483997..564dbf3 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -23,6 +23,7 @@ import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.ContainerHeartbeatClient;
import org.apache.samza.container.ContainerHeartbeatMonitor;
@@ -32,9 +33,13 @@ import org.apache.samza.container.SamzaContainer$;
import org.apache.samza.container.SamzaContainerListener;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.context.JobContextImpl;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.task.TaskFactory;
import org.apache.samza.task.TaskFactoryUtil;
import org.apache.samza.util.ScalaJavaUtil;
@@ -74,62 +79,74 @@ public class ContainerLaunchUtil {
JobModel jobModel,
Config config,
Optional<ExternalContext> externalContextOptional) {
- TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
- LocalityManager localityManager = new LocalityManager(config, new
MetricsRegistryMap());
- SamzaContainer container = SamzaContainer$.MODULE$.apply(
- containerId,
- jobModel,
- ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId,
config)),
- taskFactory,
- JobContextImpl.fromConfigWithDefaults(config),
-
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
- Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
- Option.apply(externalContextOptional.orElse(null)), localityManager);
-
- ProcessorLifecycleListener listener =
appDesc.getProcessorLifecycleListenerFactory()
- .createInstance(new ProcessorContext() { }, config);
-
- container.setContainerListener(
- new SamzaContainerListener() {
- @Override
- public void beforeStart() {
- log.info("Before starting the container.");
- listener.beforeStart();
- }
-
- @Override
- public void afterStart() {
- log.info("Container Started");
- listener.afterStart();
- }
-
- @Override
- public void afterStop() {
- log.info("Container Stopped");
- listener.afterStop();
- }
-
- @Override
- public void afterFailure(Throwable t) {
- log.info("Container Failed");
- containerRunnerException = t;
- listener.afterFailure(t);
- }
- });
-
- ContainerHeartbeatMonitor heartbeatMonitor =
createContainerHeartbeatMonitor(container);
- if (heartbeatMonitor != null) {
- heartbeatMonitor.start();
- }
-
- container.run();
- if (heartbeatMonitor != null) {
- heartbeatMonitor.stop();
- }
-
- if (containerRunnerException != null) {
- log.error("Container stopped with Exception. Exiting process now.",
containerRunnerException);
- System.exit(1);
+ CoordinatorStreamStore coordinatorStreamStore = new
CoordinatorStreamStore(config, new MetricsRegistryMap());
+ coordinatorStreamStore.init();
+
+ try {
+ TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
+ LocalityManager localityManager = new LocalityManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetContainerHostMapping.TYPE));
+ Optional<StartpointManager> startpointManager = Optional.empty();
+ if (new JobConfig(config).getStartpointMetadataStoreFactory() != null) {
+ startpointManager = Optional.of(new StartpointManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
StartpointManager.NAMESPACE)));
+ }
+
+ SamzaContainer container = SamzaContainer$.MODULE$.apply(
+ containerId,
+ jobModel,
+ ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId,
config)),
+ taskFactory,
+ JobContextImpl.fromConfigWithDefaults(config),
+
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
+
Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
+ Option.apply(externalContextOptional.orElse(null)), localityManager,
startpointManager.orElse(null));
+
+ ProcessorLifecycleListener listener =
appDesc.getProcessorLifecycleListenerFactory()
+ .createInstance(new ProcessorContext() { }, config);
+
+ container.setContainerListener(
+ new SamzaContainerListener() {
+ @Override
+ public void beforeStart() {
+ log.info("Before starting the container.");
+ listener.beforeStart();
+ }
+
+ @Override
+ public void afterStart() {
+ log.info("Container Started");
+ listener.afterStart();
+ }
+
+ @Override
+ public void afterStop() {
+ log.info("Container Stopped");
+ listener.afterStop();
+ }
+
+ @Override
+ public void afterFailure(Throwable t) {
+ log.info("Container Failed");
+ containerRunnerException = t;
+ listener.afterFailure(t);
+ }
+ });
+
+ ContainerHeartbeatMonitor heartbeatMonitor =
createContainerHeartbeatMonitor(container);
+ if (heartbeatMonitor != null) {
+ heartbeatMonitor.start();
+ }
+
+ container.run();
+ if (heartbeatMonitor != null) {
+ heartbeatMonitor.stop();
+ }
+
+ if (containerRunnerException != null) {
+ log.error("Container stopped with Exception. Exiting process now.",
containerRunnerException);
+ System.exit(1);
+ }
+ } finally {
+ coordinatorStreamStore.close();
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
index 6005c92..da7f990 100644
---
a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
+++
b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
@@ -29,6 +29,7 @@ import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskName;
+import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
@@ -54,40 +55,45 @@ import org.slf4j.LoggerFactory;
*/
public class StartpointManager {
private static final Logger LOG =
LoggerFactory.getLogger(StartpointManager.class);
- private static final String NAMESPACE = "samza-startpoint-v1";
+ public static final String NAMESPACE = "samza-startpoint-v1";
static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
private final MetadataStore metadataStore;
private final StartpointSerde startpointSerde = new StartpointSerde();
- private boolean started = false;
+ private boolean stopped = false;
/**
- * Constructs a {@link StartpointManager} instance with the provided {@link
MetadataStoreFactory}
- * @param metadataStoreFactory {@link MetadataStoreFactory} used to
construct the underlying store.
- * @param config {@link Config} required for the underlying store.
- * @param metricsRegistry {@link MetricsRegistry} to hook into the
underlying store.
+ * Constructs a {@link StartpointManager} instance by instantiating a new
metadata store connection.
+ * This is primarily used for testing.
*/
- public StartpointManager(MetadataStoreFactory metadataStoreFactory, Config
config, MetricsRegistry metricsRegistry) {
+ @VisibleForTesting
+ StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config,
MetricsRegistry metricsRegistry) {
Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory
cannot be null");
Preconditions.checkNotNull(config, "Config cannot be null");
Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be
null");
this.metadataStore = metadataStoreFactory.getMetadataStore(NAMESPACE,
config, metricsRegistry);
LOG.info("StartpointManager created with metadata store: {}",
metadataStore.getClass().getCanonicalName());
+ this.metadataStore.init();
}
/**
- * Starts the underlying {@link MetadataStore}
+ * Builds the StartpointManager based upon the provided, already
instantiated {@link MetadataStore}.
+ * Setting up a metadata store instance is expensive, since it requires
opening multiple connections
+ * and reading a large amount of metadata. A fully instantiated metadata
store is therefore taken as a
+ * constructor argument so that it can be reused across different utility
classes.
+ *
+ * @param metadataStore an instance of {@link MetadataStore} used to
read/write the start-points.
*/
+ public StartpointManager(MetadataStore metadataStore) {
+ Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null");
+ this.metadataStore = new
NamespaceAwareCoordinatorStreamStore(metadataStore, NAMESPACE);
+ }
+
public void start() {
- if (!started) {
- metadataStore.init();
- started = true;
- } else {
- LOG.warn("StartpointManager already started");
- }
+ // Metadata store lifecycle is managed outside of the StartpointManager,
so not starting it.
}
/**
@@ -106,7 +112,7 @@ public class StartpointManager {
* @param startpoint Reference to a Startpoint object.
*/
public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName,
Startpoint startpoint) {
- Preconditions.checkState(started, "Underlying metadata store not
available");
+ Preconditions.checkState(!stopped, "Underlying metadata store not
available");
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
Preconditions.checkNotNull(startpoint, "Startpoint cannot be null");
@@ -134,7 +140,7 @@ public class StartpointManager {
* @return {@link Startpoint} for the {@link SystemStreamPartition}, or null
if it does not exist or if it is too stale.
*/
public Startpoint readStartpoint(SystemStreamPartition ssp, TaskName
taskName) {
- Preconditions.checkState(started, "Underlying metadata store not
available");
+ Preconditions.checkState(!stopped, "Underlying metadata store not
available");
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
byte[] startpointBytes = metadataStore.get(toStoreKey(ssp, taskName));
@@ -164,7 +170,7 @@ public class StartpointManager {
* @param taskName The {@link TaskName} to delete the {@link Startpoint}
for.
*/
public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {
- Preconditions.checkState(started, "Underlying metadata store not
available");
+ Preconditions.checkState(!stopped, "Underlying metadata store not
available");
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
metadataStore.delete(toStoreKey(ssp, taskName));
@@ -179,7 +185,7 @@ public class StartpointManager {
* @return The list of {@link SystemStreamPartition}s that were fanned out
to SystemStreamPartition+TaskName.
*/
public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel
jobModel) {
- Preconditions.checkState(started, "Underlying metadata store not
available");
+ Preconditions.checkState(!stopped, "Underlying metadata store not
available");
Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
HashSet<SystemStreamPartition> sspsToDelete = new HashSet<>();
@@ -222,12 +228,8 @@ public class StartpointManager {
* Relinquish resources held by the underlying {@link MetadataStore}
*/
public void stop() {
- if (started) {
- metadataStore.close();
- started = false;
- } else {
- LOG.warn("StartpointManager already stopped.");
- }
+ stopped = true;
+ // Metadata store lifecycle is managed outside of the StartpointManager,
so not closing it.
}
@VisibleForTesting
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 9bfc3f8..a0d0e4e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -44,6 +44,7 @@ import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.job.model.ContainerModel;
@@ -307,7 +308,8 @@ public class ZkJobCoordinator implements JobCoordinator {
CoordinatorStreamValueSerde jsonSerde = new
CoordinatorStreamValueSerde(SetConfig.TYPE);
for (Map.Entry<String, String> entry : config.entrySet()) {
byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
- metadataStore.put(entry.getKey(), serializedValue);
+ String configKey =
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE,
entry.getKey());
+ metadataStore.put(configKey, serializedValue);
}
} finally {
if (metadataStore != null) {
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index d2b8f8f..307ee35 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -135,7 +135,8 @@ object SamzaContainer extends Logging {
applicationContainerContextFactoryOption:
Option[ApplicationContainerContextFactory[ApplicationContainerContext]],
applicationTaskContextFactoryOption:
Option[ApplicationTaskContextFactory[ApplicationTaskContext]],
externalContextOption: Option[ExternalContext],
- localityManager: LocalityManager = null) = {
+ localityManager: LocalityManager = null,
+ startpointManager: StartpointManager = null) = {
val config = jobContext.getConfig
val systemConfig = new SystemConfig(config)
val containerModel = jobModel.getContainers.get(containerId)
@@ -428,28 +429,12 @@ object SamzaContainer extends Logging {
.orNull
info("Got checkpoint manager: %s" format checkpointManager)
- val startpointMetadataStoreFactory =
Option(config.getStartpointMetadataStoreFactory)
- .map(Util.getObj(_, classOf[MetadataStoreFactory]))
- .orNull
- val startpointManager = if (startpointMetadataStoreFactory != null) {
- try {
- Option(new StartpointManager(startpointMetadataStoreFactory, config,
samzaContainerMetrics.registry))
- } catch {
- case e: Exception => {
- error("Unable to get an instance of the StartpointManager.
Continuing without one.", e)
- None
- }
- }
- } else {
- None
- }
-
// create a map of consumers with callbacks to pass to the OffsetManager
val checkpointListeners =
consumers.filter(_._2.isInstanceOf[CheckpointListener])
.map { case (system, consumer) => (system,
consumer.asInstanceOf[CheckpointListener])}
info("Got checkpointListeners : %s" format checkpointListeners)
- val offsetManager = OffsetManager(inputStreamMetadata, config,
checkpointManager, startpointManager.getOrElse(null), systemAdmins,
checkpointListeners, offsetManagerMetrics)
+ val offsetManager = OffsetManager(inputStreamMetadata, config,
checkpointManager, startpointManager, systemAdmins, checkpointListeners,
offsetManagerMetrics)
info("Got offset manager: %s" format offsetManager)
val dropDeserializationError = config.getDropDeserializationErrors
@@ -938,11 +923,11 @@ class SamzaContainer(
localityManager.writeContainerToHostMapping(containerId,
hostInet.getHostName)
} catch {
case uhe: UnknownHostException =>
- warn("Received UnknownHostException when persisting locality info
for container %s: " +
- "%s" format (containerId, uhe.getMessage)) //No-op
+ warn("Received UnknownHostException when persisting locality info
for container: " +
+ "%s" format (containerId), uhe) //No-op
case unknownException: Throwable =>
warn("Received an exception when persisting locality info for
container %s: " +
- "%s" format (containerId, unknownException.getMessage))
+ "%s" format (containerId), unknownException)
} finally {
info("Shutting down locality manager.")
localityManager.close()
diff --git
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index fa9d2a7..230a62d 100644
---
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -27,16 +27,31 @@ import org.apache.samza.config._
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config.Config
-import org.apache.samza.container.grouper.stream.{SSPGrouperProxy,
SystemStreamPartitionGrouperFactory}
+import org.apache.samza.container.grouper.stream.SSPGrouperProxy
+import
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
import org.apache.samza.container.grouper.task._
-import org.apache.samza.container.{LocalityManager, TaskName}
-import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
+import org.apache.samza.container.LocalityManager
+import org.apache.samza.container.TaskName
+import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory
+import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
+import org.apache.samza.coordinator.server.HttpServer
+import org.apache.samza.coordinator.server.JobServlet
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
-import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode,
TaskModel}
-import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.TaskMode
+import org.apache.samza.job.model.TaskModel
+import org.apache.samza.metadatastore.MetadataStore
+import org.apache.samza.metadatastore.MetadataStoreFactory
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.runtime.LocationId
import org.apache.samza.system._
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.util.Logging
+import org.apache.samza.util.Util
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -47,6 +62,8 @@ import scala.collection.JavaConverters._
*/
object JobModelManager extends Logging {
+ val SOURCE = "JobModelManager"
+
/**
* a volatile value to store the current instantiated
<code>JobModelManager</code>
*/
@@ -65,9 +82,15 @@ object JobModelManager extends Logging {
* @return the instantiated {@see JobModelManager}.
*/
def apply(config: Config, changelogPartitionMapping: util.Map[TaskName,
Integer], metricsRegistry: MetricsRegistry = new MetricsRegistryMap()):
JobModelManager = {
- val localityManager = new LocalityManager(config, metricsRegistry)
- val taskAssignmentManager = new TaskAssignmentManager(config,
metricsRegistry)
- val taskPartitionAssignmentManager = new
TaskPartitionAssignmentManager(config, metricsRegistry)
+ val coordinatorStreamStoreFactory: MetadataStoreFactory = new
CoordinatorStreamMetadataStoreFactory()
+ val coordinatorStreamStore: MetadataStore =
coordinatorStreamStoreFactory.getMetadataStore(SOURCE, config, metricsRegistry)
+ coordinatorStreamStore.init()
+
+ // Instantiate the respective metadata store utility classes, which all
use the same coordinator metadata store.
+ val localityManager = new LocalityManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetContainerHostMapping.TYPE))
+ val taskAssignmentManager = new TaskAssignmentManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetTaskContainerMapping.TYPE), new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetTaskModeMapping.TYPE))
+ val taskPartitionAssignmentManager = new
TaskPartitionAssignmentManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetTaskPartitionMapping.TYPE))
+
val systemAdmins = new SystemAdmins(config)
try {
systemAdmins.start()
@@ -85,10 +108,8 @@ object JobModelManager extends Logging {
currentJobModelManager = new JobModelManager(jobModelRef.get(), server,
localityManager)
currentJobModelManager
} finally {
- taskPartitionAssignmentManager.close()
- taskAssignmentManager.close()
systemAdmins.stop()
- // Not closing localityManager, since {@code ClusterBasedJobCoordinator}
uses it to read container locality through {@code JobModel}.
+ // Not closing coordinatorStreamStore, since {@code
ClusterBasedJobCoordinator} uses it to read container locality through {@code
JobModel}.
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
index 387491f..d18ad67 100644
---
a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
+++
b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
@@ -24,42 +24,29 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
import
org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer;
import
org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.CoordinatorStreamUtil;
import org.junit.After;
import org.junit.Before;
-import org.junit.runner.RunWith;
import org.junit.Test;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-/**
- * Unit tests for {@link LocalityManager}
- */
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(CoordinatorStreamUtil.class)
public class TestLocalityManager {
- private MockCoordinatorStreamSystemFactory
mockCoordinatorStreamSystemFactory;
- private final Config config = new MapConfig(ImmutableMap.of("job.name",
"test-job", "job.coordinator.system", "test-kafka"));
+ private static final Config CONFIG = new
MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system",
"test-kafka"));
+
+ private CoordinatorStreamStore coordinatorStreamStore;
+ private CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil;
@Before
public void setup() {
- mockCoordinatorStreamSystemFactory = new
MockCoordinatorStreamSystemFactory();
- MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
- PowerMockito.mockStatic(CoordinatorStreamUtil.class);
-
when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(mockCoordinatorStreamSystemFactory);
-
when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new
SystemStream("test-kafka", "test"));
- when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(),
anyObject())).thenReturn("test");
+ coordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(CONFIG);
+ coordinatorStreamStore =
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
}
@After
@@ -67,8 +54,9 @@ public class TestLocalityManager {
MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
}
- @Test public void testLocalityManager() {
- LocalityManager localityManager = new LocalityManager(config, new
MetricsRegistryMap());
+ @Test
+ public void testLocalityManager() {
+ LocalityManager localityManager = new LocalityManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetContainerHostMapping.TYPE));
localityManager.writeContainerToHostMapping("0", "localhost");
Map<String, Map<String, String>> localMap =
localityManager.readContainerLocality();
@@ -87,14 +75,15 @@ public class TestLocalityManager {
localityManager.close();
- MockCoordinatorStreamSystemProducer producer =
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config,
null);
- MockCoordinatorStreamSystemConsumer consumer =
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config,
null);
+ MockCoordinatorStreamSystemProducer producer =
coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemProducer();
+ MockCoordinatorStreamSystemConsumer consumer =
coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemConsumer();
assertTrue(producer.isStopped());
assertTrue(consumer.isStopped());
}
- @Test public void testWriteOnlyLocalityManager() {
- LocalityManager localityManager = new LocalityManager(config, new
MetricsRegistryMap());
+ @Test
+ public void testWriteOnlyLocalityManager() {
+ LocalityManager localityManager = new LocalityManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetContainerHostMapping.TYPE));
localityManager.writeContainerToHostMapping("1", "localhost");
@@ -104,8 +93,8 @@ public class TestLocalityManager {
localityManager.close();
- MockCoordinatorStreamSystemProducer producer =
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config,
null);
- MockCoordinatorStreamSystemConsumer consumer =
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config,
null);
+ MockCoordinatorStreamSystemProducer producer =
coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemProducer();
+ MockCoordinatorStreamSystemConsumer consumer =
coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemConsumer();
assertTrue(producer.isStopped());
assertTrue(consumer.isStopped());
}
diff --git
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 443262e..357e8ae 100644
---
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -19,46 +19,37 @@
package org.apache.samza.container.grouper.task;
+import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.system.*;
-import org.apache.samza.util.CoordinatorStreamUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(CoordinatorStreamUtil.class)
public class TestTaskAssignmentManager {
- private MockCoordinatorStreamSystemFactory
mockCoordinatorStreamSystemFactory;
+ private static final Config CONFIG = new
MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system",
"test-kafka"));
- private final Config config = new MapConfig(ImmutableMap.of("job.name",
"test-job", "job.coordinator.system", "test-kafka"));
+ private CoordinatorStreamStore coordinatorStreamStore;
+ private TaskAssignmentManager taskAssignmentManager;
@Before
public void setup() {
- MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
- mockCoordinatorStreamSystemFactory = new
MockCoordinatorStreamSystemFactory();
- PowerMockito.mockStatic(CoordinatorStreamUtil.class);
-
when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(mockCoordinatorStreamSystemFactory);
-
when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new
SystemStream("test-kafka", "test"));
- when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(),
anyObject())).thenReturn("test");
+ CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(CONFIG);
+ coordinatorStreamStore =
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+ taskAssignmentManager = new TaskAssignmentManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetTaskContainerMapping.TYPE), new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetTaskModeMapping.TYPE));
}
@After
@@ -68,8 +59,6 @@ public class TestTaskAssignmentManager {
@Test
public void testTaskAssignmentManager() {
- TaskAssignmentManager taskAssignmentManager = new
TaskAssignmentManager(config, new MetricsRegistryMap());
-
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1",
"1", "Task2", "2", "Task3", "0", "Task4", "1");
for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
@@ -85,8 +74,6 @@ public class TestTaskAssignmentManager {
@Test
public void testDeleteMappings() {
- TaskAssignmentManager taskAssignmentManager = new
TaskAssignmentManager(config, new MetricsRegistryMap());
-
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1",
"1");
for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
@@ -106,8 +93,6 @@ public class TestTaskAssignmentManager {
@Test
public void testTaskAssignmentManagerEmptyCoordinatorStream() {
- TaskAssignmentManager taskAssignmentManager = new
TaskAssignmentManager(config, new MetricsRegistryMap());
-
Map<String, String> expectedMap = new HashMap<>();
Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
diff --git
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
index a141ee3..1a7f0c9 100644
---
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
+++
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
@@ -24,47 +24,34 @@ import java.util.List;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
-import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.Partition;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.CoordinatorStreamUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(CoordinatorStreamUtil.class)
public class TestTaskPartitionAssignmentManager {
private static final String TEST_SYSTEM = "system";
private static final String TEST_STREAM = "stream";
private static final Partition PARTITION = new Partition(0);
+ private static final Config CONFIG = new
MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system",
"test-kafka"));
- private final Config config = new MapConfig(ImmutableMap.of("job.name",
"test-job", "job.coordinator.system", "test-kafka"));
private final SystemStreamPartition testSystemStreamPartition = new
SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, PARTITION);
- private MockCoordinatorStreamSystemFactory
mockCoordinatorStreamSystemFactory;
private TaskPartitionAssignmentManager taskPartitionAssignmentManager;
@Before
public void setup() {
- mockCoordinatorStreamSystemFactory = new
MockCoordinatorStreamSystemFactory();
- MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
- PowerMockito.mockStatic(CoordinatorStreamUtil.class);
-
when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(mockCoordinatorStreamSystemFactory);
-
when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new
SystemStream("test-kafka", "test"));
- when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(),
anyObject())).thenReturn("test");
- taskPartitionAssignmentManager = new
TaskPartitionAssignmentManager(config, new MetricsRegistryMap());
+ CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(CONFIG);
+ CoordinatorStreamStore coordinatorStreamStore =
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+ taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetTaskPartitionMapping.TYPE));
}
@After
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
new file mode 100644
index 0000000..9ec548d
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.metadatastore;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.util.NoOpMetricsRegistry;
+
+/**
+ * Instantiates and initializes a {@link CoordinatorStreamStore} based upon
the provided configuration.
+ *
+ * <p>
+ * Primarily used for testing the metadata read/write flows in the
ApplicationMaster.
+ * </p>
+ */
+public class CoordinatorStreamStoreTestUtil {
+
+ private final CoordinatorStreamStore coordinatorStreamStore;
+ private final MockCoordinatorStreamSystemFactory systemFactory;
+ private final Config config;
+
+ public CoordinatorStreamStoreTestUtil(Config config) {
+ this.config = config;
+ this.systemFactory = new MockCoordinatorStreamSystemFactory();
+ MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
+ SystemConsumer systemConsumer = systemFactory.getConsumer("test-kafka",
config, new NoOpMetricsRegistry());
+ SystemProducer systemProducer = systemFactory.getProducer("test-kafka",
config, new NoOpMetricsRegistry());
+ SystemAdmin systemAdmin = systemFactory.getAdmin("test-kafka", config);
+ this.coordinatorStreamStore = new CoordinatorStreamStore(config,
systemProducer, systemConsumer, systemAdmin);
+ this.coordinatorStreamStore.init();
+ }
+
+ public CoordinatorStreamStore getCoordinatorStreamStore() {
+ return coordinatorStreamStore;
+ }
+
+ public
MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer
getMockCoordinatorStreamSystemProducer() {
+ return systemFactory.getCoordinatorStreamSystemProducer(config, null);
+ }
+
+ public
MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer
getMockCoordinatorStreamSystemConsumer() {
+ return systemFactory.getCoordinatorStreamSystemConsumer(config, null);
+ }
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
index 2f6f598..d5da781 100644
---
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
@@ -19,99 +19,83 @@
package org.apache.samza.coordinator.metadatastore;
import com.google.common.collect.ImmutableMap;
+import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
-import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.CoordinatorStreamUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
import java.util.*;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(CoordinatorStreamUtil.class)
public class TestCoordinatorStreamStore {
+ private static final String NAMESPACE = "namespace";
+ private static final Config CONFIG = new
MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system",
"test-kafka"));
+
private CoordinatorStreamStore coordinatorStreamStore;
+ private NamespaceAwareCoordinatorStreamStore
namespaceAwareCoordinatorStreamStore;
@Before
- public void setUp() {
- MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
- Map<String, String> configMap = ImmutableMap.of("job.name", "test-job",
- "job.coordinator.system",
"test-kafka");
- MockCoordinatorStreamSystemFactory systemFactory = new
MockCoordinatorStreamSystemFactory();
- PowerMockito.mockStatic(CoordinatorStreamUtil.class);
-
when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(systemFactory);
-
when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new
SystemStream("test-kafka", "test"));
- when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(),
anyObject())).thenReturn("test");
- coordinatorStreamStore = new
CoordinatorStreamStore(SetTaskContainerMapping.TYPE, new MapConfig(configMap),
new MetricsRegistryMap());
- coordinatorStreamStore.init();
+ public void setup() {
+ CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(CONFIG);
+ coordinatorStreamStore =
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+ namespaceAwareCoordinatorStreamStore = new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, NAMESPACE);
}
@Test
public void testReadAfterWrite() {
- String key = "test-key1";
+ String key = getCoordinatorMessageKey("test-key1");
byte[] value = getValue("test-value1");
Assert.assertNull(coordinatorStreamStore.get(key));
coordinatorStreamStore.put(key, value);
Assert.assertEquals(value, coordinatorStreamStore.get(key));
- Assert.assertEquals(1, coordinatorStreamStore.all().size());
+ Assert.assertEquals(1, namespaceAwareCoordinatorStreamStore.all().size());
}
@Test
public void testReadAfterDelete() {
- String key = "test-key1";
+ String key = getCoordinatorMessageKey("test-key1");
byte[] value = getValue("test-value1");
Assert.assertNull(coordinatorStreamStore.get(key));
coordinatorStreamStore.put(key, value);
Assert.assertEquals(value, coordinatorStreamStore.get(key));
coordinatorStreamStore.delete(key);
Assert.assertNull(coordinatorStreamStore.get(key));
- Assert.assertEquals(0, coordinatorStreamStore.all().size());
+ Assert.assertEquals(0, namespaceAwareCoordinatorStreamStore.all().size());
}
@Test
public void testReadOfNonExistentKey() {
Assert.assertNull(coordinatorStreamStore.get("randomKey"));
- Assert.assertEquals(0, coordinatorStreamStore.all().size());
+ Assert.assertEquals(0, namespaceAwareCoordinatorStreamStore.all().size());
}
@Test
public void testMultipleUpdatesForSameKey() {
- String key = "test-key1";
+ String key = getCoordinatorMessageKey("test-key1");
byte[] value = getValue("test-value1");
byte[] value1 = getValue("test-value2");
coordinatorStreamStore.put(key, value);
coordinatorStreamStore.put(key, value1);
Assert.assertEquals(value1, coordinatorStreamStore.get(key));
- Assert.assertEquals(1, coordinatorStreamStore.all().size());
+ Assert.assertEquals(1, namespaceAwareCoordinatorStreamStore.all().size());
}
@Test
public void testAllEntries() {
- String key = "test-key1";
- String key1 = "test-key2";
- String key2 = "test-key3";
+ String key = getCoordinatorMessageKey("test-key1");
+ String key1 = getCoordinatorMessageKey("test-key2");
+ String key2 = getCoordinatorMessageKey("test-key3");
byte[] value = getValue("test-value1");
byte[] value1 = getValue("test-value2");
byte[] value2 = getValue("test-value3");
coordinatorStreamStore.put(key, value);
coordinatorStreamStore.put(key1, value1);
coordinatorStreamStore.put(key2, value2);
- ImmutableMap<String, byte[]> expected = ImmutableMap.of(key, value, key1,
value1, key2, value2);
- Assert.assertEquals(expected, coordinatorStreamStore.all());
+ ImmutableMap<String, byte[]> expected = ImmutableMap.of("test-key1",
value, "test-key2", value1, "test-key3", value2);
+ Assert.assertEquals(expected, namespaceAwareCoordinatorStreamStore.all());
}
private byte[] getValue(String value) {
@@ -119,4 +103,8 @@ public class TestCoordinatorStreamStore {
SetTaskContainerMapping setTaskContainerMapping = new
SetTaskContainerMapping("testSource", "testTask", value);
return messageSerde.toBytes(setTaskContainerMapping.getMessageMap());
}
+
+ private static String getCoordinatorMessageKey(String key) {
+ return
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(NAMESPACE, key);
+ }
}
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
new file mode 100644
index 0000000..06197f6
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.metadatastore;
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestNamespaceAwareCoordinatorStreamStore {
+ private static final String KEY1 = "testKey";
+
+ private CoordinatorStreamStore coordinatorStreamStore;
+ private NamespaceAwareCoordinatorStreamStore
namespaceAwareCoordinatorStreamStore;
+ private String namespace;
+
+ @Before
+ public void setUp() {
+ namespace = RandomStringUtils.randomAlphabetic(5);
+ coordinatorStreamStore = Mockito.mock(CoordinatorStreamStore.class);
+ namespaceAwareCoordinatorStreamStore = new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, namespace);
+ }
+
+ @Test
+ public void testGetShouldDelegateTheInvocationToUnderlyingStore() {
+ String value = RandomStringUtils.randomAlphabetic(5);
+ String namespacedKey =
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, KEY1);
+
Mockito.when(coordinatorStreamStore.all()).thenReturn(ImmutableMap.of(namespacedKey,
value.getBytes(StandardCharsets.UTF_8)));
+
+ Assert.assertArrayEquals(value.getBytes(StandardCharsets.UTF_8),
namespaceAwareCoordinatorStreamStore.get(KEY1));
+ Mockito.verify(coordinatorStreamStore).all();
+ }
+
+ @Test
+ public void testPutShouldDelegateTheInvocationToUnderlyingStore() {
+ String value = RandomStringUtils.randomAlphabetic(5);
+ String namespacedKey =
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, KEY1);
+
+ Mockito.doNothing().when(coordinatorStreamStore).put(namespacedKey,
value.getBytes(StandardCharsets.UTF_8));
+
+ namespaceAwareCoordinatorStreamStore.put(KEY1,
value.getBytes(StandardCharsets.UTF_8));
+
+ Mockito.verify(coordinatorStreamStore).put(namespacedKey,
value.getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testDeleteShouldDelegateTheInvocationToUnderlyingStore() {
+ String namespacedKey =
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, KEY1);
+
+ namespaceAwareCoordinatorStreamStore.delete(KEY1);
+
+ Mockito.verify(coordinatorStreamStore).delete(namespacedKey);
+ }
+
+ @Test
+ public void testAllShouldDelegateToUnderlyingMetadaStore() {
+ String value = RandomStringUtils.randomAlphabetic(5);
+ String key2 = "key2";
+ String key3 = "key3";
+ byte[] valueAsBytes = value.getBytes(StandardCharsets.UTF_8);
+ String namespacedKey1 =
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, KEY1);
+ String namespacedKey2 =
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson("namespace1", key2);
+ String namespacedKey3 =
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson("namespace1", key3);
+
Mockito.when(coordinatorStreamStore.all()).thenReturn(ImmutableMap.of(namespacedKey1,
valueAsBytes, namespacedKey2, new byte[0], namespacedKey3, new byte[0]));
+
+ Assert.assertEquals(ImmutableMap.of(KEY1, valueAsBytes),
namespaceAwareCoordinatorStreamStore.all());
+ Mockito.verify(coordinatorStreamStore).all();
+ }
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
index 9e5e06c..d6ca2ca 100644
---
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -41,6 +41,7 @@ import org.junit.Test;
public class TestCoordinatorStreamSystemProducer {
@Test
public void testCoordinatorStreamSystemProducer() {
+ MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
String source = "source";
SystemStream systemStream = new SystemStream("system", "stream");
MockCoordinatorSystemProducer systemProducer = new
MockCoordinatorSystemProducer(source);
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
b/samza-core/src/test/java/org/apache/samza/startpoint/StartpointManagerTestUtil.java
similarity index 55%
copy from
samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
copy to
samza-core/src/test/java/org/apache/samza/startpoint/StartpointManagerTestUtil.java
index c38dc56..f8eb855 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
+++
b/samza-core/src/test/java/org/apache/samza/startpoint/StartpointManagerTestUtil.java
@@ -16,21 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.samza.coordinator.metadatastore;
+package org.apache.samza.startpoint;
-import org.apache.samza.config.Config;
-import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
+import org.apache.samza.util.NoOpMetricsRegistry;
-/**
- * Builds the {@link CoordinatorStreamStore} based upon the provided {@link
Config}
- * and {@link MetricsRegistry}.
- */
-public class CoordinatorStreamMetadataStoreFactory implements
MetadataStoreFactory {
- @Override
- public MetadataStore getMetadataStore(String namespace, Config config,
MetricsRegistry metricsRegistry) {
- return new CoordinatorStreamStore(namespace, config, metricsRegistry);
+public class StartpointManagerTestUtil {
+
+ private StartpointManagerTestUtil() {
+ }
+
+ public static StartpointManager getStartpointManager() {
+ return new StartpointManager(new InMemoryMetadataStoreFactory(), new
MapConfig(), new NoOpMetricsRegistry());
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
index cb20148..cc86882 100644
---
a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
+++
b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
@@ -28,33 +28,44 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.metadatastore.InMemoryMetadataStore;
-import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-
public class TestStartpointManager {
+ private static final Config CONFIG = new
MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system",
"test-kafka"));
+
+ private CoordinatorStreamStore coordinatorStreamStore;
+ private StartpointManager startpointManager;
+
+ @Before
+ public void setup() {
+ CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(CONFIG);
+ coordinatorStreamStore =
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+ startpointManager = new StartpointManager(coordinatorStreamStore);
+ }
+
@Test
public void testDefaultMetadataStore() {
- MapConfig config = new MapConfig();
- StartpointManager startpointManager = new StartpointManager(new
InMemoryMetadataStoreFactory(), config, new NoOpMetricsRegistry());
+ StartpointManager startpointManager = new
StartpointManager(coordinatorStreamStore);
Assert.assertNotNull(startpointManager);
- Assert.assertEquals(InMemoryMetadataStore.class,
startpointManager.getMetadataStore().getClass());
+ Assert.assertEquals(NamespaceAwareCoordinatorStreamStore.class,
startpointManager.getMetadataStore().getClass());
}
@Test
public void testNoLongerUsableAfterStop() {
- MapConfig config = new MapConfig();
- StartpointManager startpointManager = new StartpointManager(new
InMemoryMetadataStoreFactory(), config, new NoOpMetricsRegistry());
+ StartpointManager startpointManager = new
StartpointManager(coordinatorStreamStore);
startpointManager.start();
SystemStreamPartition ssp =
new SystemStreamPartition("mockSystem", "mockStream", new
Partition(2));
@@ -101,7 +112,6 @@ public class TestStartpointManager {
@Test
public void testBasics() {
- StartpointManager startpointManager = new StartpointManager(new
InMemoryMetadataStoreFactory(), new MapConfig(), new NoOpMetricsRegistry());
startpointManager.start();
SystemStreamPartition ssp =
new SystemStreamPartition("mockSystem", "mockStream", new
Partition(2));
@@ -161,10 +171,8 @@ public class TestStartpointManager {
}
@Test
- public void testStaleStartpoints() throws InterruptedException {
- StartpointManager startpointManager = new StartpointManager(new
InMemoryMetadataStoreFactory(), new MapConfig(), new NoOpMetricsRegistry());
- SystemStreamPartition ssp =
- new SystemStreamPartition("mockSystem", "mockStream", new
Partition(2));
+ public void testStaleStartpoints() {
+ SystemStreamPartition ssp = new SystemStreamPartition("mockSystem",
"mockStream", new Partition(2));
TaskName taskName = new TaskName("MockTask");
startpointManager.start();
@@ -180,7 +188,6 @@ public class TestStartpointManager {
@Test
public void testGroupStartpointsPerTask() {
MapConfig config = new MapConfig();
- StartpointManager startpointManager = new StartpointManager(new
InMemoryMetadataStoreFactory(), new MapConfig(), new NoOpMetricsRegistry());
startpointManager.start();
SystemStreamPartition sspBroadcast =
new SystemStreamPartition("mockSystem1", "mockStream1", new
Partition(2));
diff --git
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 5e45547..0c009a1 100644
---
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -25,11 +25,9 @@ import java.util.function.BiConsumer
import org.apache.samza.{Partition, SamzaException}
import org.apache.samza.config.MapConfig
import org.apache.samza.container.TaskName
-import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory
-import org.apache.samza.startpoint.{StartpointManager, StartpointOldest,
StartpointUpcoming}
+import org.apache.samza.startpoint.{StartpointManagerTestUtil,
StartpointOldest, StartpointUpcoming}
import org.apache.samza.system.SystemStreamMetadata.{OffsetType,
SystemStreamPartitionMetadata}
import org.apache.samza.system._
-import org.apache.samza.util.NoOpMetricsRegistry
import org.junit.Assert._
import org.junit.Test
import org.mockito.Matchers._
@@ -526,7 +524,7 @@ class TestOffsetManager {
}
private def getStartpointManager() = {
- val startpointManager = new StartpointManager(new
InMemoryMetadataStoreFactory, new MapConfig, new NoOpMetricsRegistry)
+ val startpointManager = StartpointManagerTestUtil.getStartpointManager()
startpointManager.start
startpointManager
}
diff --git
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index daf665b..7789aca 100644
---
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
+++
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -35,8 +35,12 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.rest.model.Task;
import org.apache.samza.rest.proxy.installation.InstallationFinder;
@@ -130,9 +134,10 @@ public class SamzaTaskProxy implements TaskProxy {
* @return list of {@link Task} constructed from job model in coordinator
stream.
*/
protected List<Task>
readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
- LocalityManager localityManager = new
LocalityManager(consumer.getConfig(), new MetricsRegistryMap());
+ CoordinatorStreamStore coordinatorStreamStore = new
CoordinatorStreamStore(consumer.getConfig(), new MetricsRegistryMap());
+ LocalityManager localityManager = new
LocalityManager(coordinatorStreamStore);
Map<String, Map<String, String>> containerIdToHostMapping =
localityManager.readContainerLocality();
- TaskAssignmentManager taskAssignmentManager = new
TaskAssignmentManager(consumer.getConfig(), new MetricsRegistryMap());
+ TaskAssignmentManager taskAssignmentManager = new
TaskAssignmentManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetTaskContainerMapping.TYPE), new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetTaskModeMapping.TYPE));
Map<String, String> taskNameToContainerIdMapping =
taskAssignmentManager.readTaskAssignment();
StorageConfig storageConfig = new StorageConfig(consumer.getConfig());
List<String> storeNames =
JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
diff --git
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index e6ce045..ad0b94c 100644
---
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -53,6 +53,7 @@ import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.model.ContainerModel;
@@ -717,8 +718,9 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
Map<String, String> configMap = new HashMap<>();
CoordinatorStreamValueSerde jsonSerde = new
CoordinatorStreamValueSerde("set-config");
metadataStore.all().forEach((key, value) -> {
+ CoordinatorStreamStore.CoordinatorMessageKey coordinatorMessageKey =
CoordinatorStreamStore.deserializeCoordinatorMessageKeyFromJson(key);
String deserializedValue = jsonSerde.fromBytes(value);
- configMap.put(key, deserializedValue);
+ configMap.put(coordinatorMessageKey.getKey(), deserializedValue);
});
return new MapConfig(configMap);
}