This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 450e105 SAMZA-2161: Move ChangelogPartitionManager and
CoordinatorStream ConfigReader to MetadataStore. (#990)
450e105 is described below
commit 450e1050389a538f5152cbb7ee9a58668e7667d8
Author: shanthoosh <[email protected]>
AuthorDate: Fri Apr 12 15:38:12 2019 -0700
SAMZA-2161: Move ChangelogPartitionManager and CoordinatorStream
ConfigReader to MetadataStore. (#990)
---
.../clustermanager/ClusterBasedJobCoordinator.java | 73 +++++++-------
.../metadatastore/CoordinatorStreamStore.java | 1 +
.../stream/CoordinatorStreamValueSerde.java | 2 +-
.../stream/messages/SetChangelogMapping.java | 7 +-
.../samza/storage/ChangelogStreamManager.java | 74 ++++++++------
.../org/apache/samza/storage/StorageRecovery.java | 49 +++++-----
.../apache/samza/checkpoint/CheckpointTool.scala | 107 ++++++++++++---------
.../apache/samza/coordinator/JobModelManager.scala | 19 ++--
.../apache/samza/job/local/ProcessJobFactory.scala | 99 ++++++++++---------
.../apache/samza/job/local/ThreadJobFactory.scala | 22 +++--
.../apache/samza/util/CoordinatorStreamUtil.scala | 21 ++++
.../TestClusterBasedJobCoordinator.java | 7 +-
.../CoordinatorStreamStoreTestUtil.java | 10 +-
.../stream/MockCoordinatorStreamSystemFactory.java | 1 +
.../MockCoordinatorStreamWrappedConsumer.java | 14 +--
.../samza/checkpoint/TestCheckpointTool.scala | 16 +--
.../samza/coordinator/TestJobCoordinator.scala | 40 +++++---
.../samza/validation/YarnJobValidationTool.java | 46 +++++----
.../job/yarn/TestSamzaYarnAppMasterService.scala | 45 +++++----
19 files changed, 365 insertions(+), 288 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index d64665c..75f6a4a 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -41,7 +41,9 @@ import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.PartitionChangeException;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.coordinator.StreamRegexMonitor;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
@@ -52,6 +54,7 @@ import org.apache.samza.storage.ChangelogStreamManager;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +87,7 @@ import scala.collection.JavaConverters;
*/
public class ClusterBasedJobCoordinator {
- private static final Logger log =
LoggerFactory.getLogger(ClusterBasedJobCoordinator.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ClusterBasedJobCoordinator.class);
private final Config config;
private final ClusterManagerConfig clusterManagerConfig;
@@ -112,11 +115,6 @@ public class ClusterBasedJobCoordinator {
*/
private final ChangelogStreamManager changelogStreamManager;
- /**
- * Single instance of the coordinator stream to use.
- */
- private final CoordinatorStreamManager coordinatorStreamManager;
-
/*
* The interval for polling the Task Manager for shutdown.
*/
@@ -151,6 +149,9 @@ public class ClusterBasedJobCoordinator {
* Metrics to track stats around container failures, needed containers etc.
*/
private final MetricsRegistryMap metrics;
+ private final CoordinatorStreamStore coordinatorStreamStore;
+
+ private final SystemAdmins systemAdmins;
/**
* Internal variable for the instance of {@link JmxServer}
@@ -162,8 +163,6 @@ public class ClusterBasedJobCoordinator {
*/
volatile private Exception coordinatorException = null;
- private SystemAdmins systemAdmins = null;
-
/**
* Creates a new ClusterBasedJobCoordinator instance from a config. Invoke
run() to actually
* run the jobcoordinator.
@@ -172,22 +171,17 @@ public class ClusterBasedJobCoordinator {
* {@link
org.apache.samza.job.model.JobModel} from.
*/
public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
-
metrics = new MetricsRegistryMap();
- coordinatorStreamManager = new
CoordinatorStreamManager(coordinatorSystemConfig, metrics);
- // register ClusterBasedJobCoordinator with the CoordinatorStreamManager.
- coordinatorStreamManager.register(getClass().getSimpleName());
- // start the coordinator stream's underlying consumer and producer.
- coordinatorStreamManager.start();
- // bootstrap current configuration.
- coordinatorStreamManager.bootstrap();
+ coordinatorStreamStore = new
CoordinatorStreamStore(coordinatorSystemConfig, metrics);
+ coordinatorStreamStore.init();
+ config =
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
// build a JobModelManager and ChangelogStreamManager and perform
partition assignments.
- changelogStreamManager = new
ChangelogStreamManager(coordinatorStreamManager);
- jobModelManager =
JobModelManager.apply(coordinatorStreamManager.getConfig(),
changelogStreamManager.readPartitionMapping(), metrics);
+ changelogStreamManager = new ChangelogStreamManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE));
+ jobModelManager = JobModelManager.apply(config,
changelogStreamManager.readPartitionMapping(),
+ coordinatorStreamStore, metrics);
- config = jobModelManager.jobModel().getConfig();
hasDurableStores = new StorageConfig(config).hasDurableStores();
state = new SamzaApplicationState(jobModelManager);
// The systemAdmins should be started before partitionMonitor can be used.
And it should be stopped when this coordinator is stopped.
@@ -209,7 +203,7 @@ public class ClusterBasedJobCoordinator {
*/
public void run() {
if (!isStarted.compareAndSet(false, true)) {
- log.warn("Attempting to start an already started job coordinator. ");
+ LOG.warn("Attempting to start an already started job coordinator. ");
return;
}
// set up JmxServer (if jmx is enabled)
@@ -223,7 +217,7 @@ public class ClusterBasedJobCoordinator {
try {
//initialize JobCoordinator state
- log.info("Starting cluster based job coordinator");
+ LOG.info("Starting cluster based job coordinator");
//create necessary checkpoint and changelog streams, if not created
JobModel jobModel = jobModelManager.jobModel();
@@ -258,12 +252,12 @@ public class ClusterBasedJobCoordinator {
Thread.sleep(jobCoordinatorSleepInterval);
} catch (InterruptedException e) {
isInterrupted = true;
- log.error("Interrupted in job coordinator loop", e);
+ LOG.error("Interrupted in job coordinator loop", e);
Thread.currentThread().interrupt();
}
}
} catch (Throwable e) {
- log.error("Exception thrown in the JobCoordinator loop ", e);
+ LOG.error("Exception thrown in the JobCoordinator loop", e);
throw new SamzaException(e);
} finally {
onShutDown();
@@ -287,18 +281,18 @@ public class ClusterBasedJobCoordinator {
inputStreamRegexMonitor.ifPresent(StreamRegexMonitor::stop);
systemAdmins.stop();
containerProcessManager.stop();
- coordinatorStreamManager.stop();
+ coordinatorStreamStore.close();
} catch (Throwable e) {
- log.error("Exception while stopping cluster based job coordinator", e);
+ LOG.error("Exception while stopping cluster based job coordinator", e);
}
- log.info("Stopped cluster based job coordinator");
+ LOG.info("Stopped cluster based job coordinator");
if (jmxServer != null) {
try {
jmxServer.stop();
- log.info("Stopped JMX server");
+ LOG.info("Stopped Jmx Server");
} catch (Throwable e) {
- log.error("Exception while stopping JMX server", e);
+ LOG.error("Exception while stopping jmx server", e);
}
}
}
@@ -314,7 +308,7 @@ public class ClusterBasedJobCoordinator {
new JobConfig(config).getMonitorPartitionChangeFrequency(),
streamsChanged -> {
// Fail the jobs with durable state store. Otherwise, application
state.status remains UNDEFINED s.t. YARN job will be restarted
if (hasDurableStores) {
- log.error("Input topic partition count changed in a job with durable
state. Failing the job. " +
+ LOG.error("Input topic partition count changed in a job with durable
state. Failing the job. " +
"Changed topics: {}", streamsChanged.toString());
state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
}
@@ -326,7 +320,7 @@ public class ClusterBasedJobCoordinator {
// if input regex monitor is not enabled return empty
if (new JobConfig(config).getMonitorRegexEnabled()) {
- log.info("StreamRegexMonitor is disabled.");
+ LOG.info("StreamRegexMonitor is disabled.");
return Optional.empty();
}
@@ -341,7 +335,7 @@ public class ClusterBasedJobCoordinator {
// if no rewriter is defined, there is nothing to monitor
if (!rewritersList.isDefined()) {
- log.info("No config rewriters are defined. No StreamRegexMonitor
created.");
+ LOG.warn("No config rewriters are defined. No StreamRegexMonitor
created.");
return Optional.empty();
}
@@ -352,7 +346,7 @@ public class ClusterBasedJobCoordinator {
// if there are no regexes to monitor
if (inputRegexesToMonitor.isEmpty()) {
- log.info("No input regexes are defined. No StreamRegexMonitor created.");
+ LOG.info("No input regexes are defined. No StreamRegexMonitor created.");
return Optional.empty();
}
@@ -362,8 +356,8 @@ public class ClusterBasedJobCoordinator {
public void onInputStreamsChanged(Set<SystemStream> initialInputSet,
Set<SystemStream> newInputStreams,
Map<String, Pattern> regexesMonitored) {
if (hasDurableStores) {
- log.error("New input system-streams discovered. Failing the job
since it is stateful. " +
- "New input streams: {}, Existing input streams: {}",
newInputStreams, inputStreamsToMonitor);
+ LOG.error("New input system-streams discovered. Failing the job.
New input streams: {}" +
+ " Existing input streams: {}", newInputStreams,
inputStreamsToMonitor);
state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
}
coordinatorException = new InputStreamsDiscoveredException("New
input streams discovered: " + newInputStreams);
@@ -389,19 +383,20 @@ public class ClusterBasedJobCoordinator {
* @param args args
*/
public static void main(String[] args) {
- Config coordinatorSystemConfig = null;
+ Config coordinatorSystemConfig;
final String coordinatorSystemEnv =
System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
try {
//Read and parse the coordinator system config.
- log.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+ LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
coordinatorSystemConfig =
new
MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv,
Config.class));
+ LOG.info("Using the coordinator system config: {}.",
coordinatorSystemConfig);
} catch (IOException e) {
- log.error("Error reading coordinator stream config.", e);
+ LOG.error("Exception while reading coordinator stream config", e);
throw new SamzaException(e);
}
ClusterBasedJobCoordinator jc = new
ClusterBasedJobCoordinator(coordinatorSystemConfig);
jc.run();
- log.info("Finished running ClusterBasedJobCoordinator");
+ LOG.info("Finished running ClusterBasedJobCoordinator");
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index 87121f1..4752cdc 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -185,6 +185,7 @@ public class CoordinatorStreamStore implements
MetadataStore {
systemProducer.flush(SOURCE);
} catch (Exception e) {
LOG.error("Exception occurred when flushing the metadata store:", e);
+ throw new SamzaException("Exception occurred when flushing the metadata
store:", e);
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
index b0d5815..9b862bd 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
@@ -59,7 +59,7 @@ public class CoordinatorStreamValueSerde implements
Serde<String> {
return setTaskContainerMapping.getTaskAssignment();
} else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
SetChangelogMapping changelogMapping = new SetChangelogMapping(message);
- return String.valueOf(changelogMapping.getPartition());
+ return (changelogMapping.getPartition() != null) ?
String.valueOf(changelogMapping.getPartition()) : null;
} else if (type.equalsIgnoreCase(SetConfig.TYPE)) {
SetConfig setConfig = new SetConfig(message);
return setConfig.getConfigValue();
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
index 279ae06..912fe24 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
@@ -19,6 +19,8 @@
package org.apache.samza.coordinator.stream.messages;
+import org.apache.commons.lang3.StringUtils;
+
/**
* The {@link SetChangelogMapping} message is used to store the changelog
partition information for a particular task.
* The structure looks like:
@@ -58,7 +60,8 @@ public class SetChangelogMapping extends
CoordinatorStreamMessage {
return getKey();
}
- public int getPartition() {
- return Integer.parseInt(getMessageValue(CHANGELOG_VALUE_KEY));
+ public Integer getPartition() {
+ String changelogPartition = getMessageValue(CHANGELOG_VALUE_KEY);
+ return StringUtils.isNotBlank(changelogPartition) ?
Integer.parseInt(changelogPartition) : null;
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
index 71635aa..4635856 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -19,6 +19,7 @@
package org.apache.samza.storage;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@@ -28,9 +29,9 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JavaStorageConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStream;
@@ -38,58 +39,73 @@ import org.apache.samza.util.StreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
- * The Changelog manager creates the changelog stream. If a coordinator stream
manager is provided,
- * it can be used to read, write and update the changelog stream
partition-to-task mapping.
+ * Responsible for creating the changelog stream. Used for reading, writing
+ * and updating the task to changelog stream partition association in metadata
store.
*/
public class ChangelogStreamManager {
private static final Logger LOG =
LoggerFactory.getLogger(ChangelogStreamManager.class);
- // This is legacy for changelog. Need to investigate what happens if you use
a different source name
- private static final String SOURCE = "JobModelManager";
- private final CoordinatorStreamManager coordinatorStreamManager;
+ private final MetadataStore metadataStore;
+ private final CoordinatorStreamValueSerde valueSerde;
/**
- * Construct changelog manager with a bootstrapped coordinator stream.
+ * Builds the ChangelogStreamManager based upon the provided {@link
MetadataStore} that is instantiated.
+ * Setting up a metadata store instance is expensive, since it requires opening
multiple connections
+ * and reading a large amount of information. A fully instantiated metadata store is
taken as a constructor argument
+ * so that it can be reused across different utility classes. Uses the {@link
CoordinatorStreamValueSerde} to serialize
+ * and deserialize the values written to and read from the metadata store.
*
- * @param coordinatorStreamManager Coordinator stream manager.
+ * @param metadataStore an instance of {@link MetadataStore} to read/write
the task to changelog partition mapping.
*/
- public ChangelogStreamManager(CoordinatorStreamManager
coordinatorStreamManager) {
- this.coordinatorStreamManager = coordinatorStreamManager;
+ public ChangelogStreamManager(MetadataStore metadataStore) {
+ this.metadataStore = metadataStore;
+ this.valueSerde = new
CoordinatorStreamValueSerde(SetChangelogMapping.TYPE);
}
/**
- * Read the taskName to partition mapping that is being maintained by this
ChangelogManager
+ * Reads the taskName to changelog partition assignments from the {@link
MetadataStore}.
+ *
* @return TaskName to changelog partition mapping, or an empty map if
there were no messages.
*/
public Map<TaskName, Integer> readPartitionMapping() {
LOG.debug("Reading changelog partition information");
- final HashMap<TaskName, Integer> changelogMapping = new HashMap<>();
- for (CoordinatorStreamMessage coordinatorStreamMessage :
coordinatorStreamManager.getBootstrappedStream(SetChangelogMapping.TYPE)) {
- SetChangelogMapping changelogMapEntry = new
SetChangelogMapping(coordinatorStreamMessage);
- changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()),
changelogMapEntry.getPartition());
- LOG.debug("TaskName: {} is mapped to {}",
changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
- }
+ final Map<TaskName, Integer> changelogMapping = new HashMap<>();
+ metadataStore.all().forEach((taskName, partitionIdAsBytes) -> {
+ String partitionId = valueSerde.fromBytes(partitionIdAsBytes);
+ LOG.debug("TaskName: {} is mapped to {}", taskName, partitionId);
+ if (StringUtils.isNotBlank(partitionId)) {
+ changelogMapping.put(new TaskName(taskName),
Integer.valueOf(partitionId));
+ }
+ });
return changelogMapping;
}
/**
- * Write the taskName to partition mapping.
- * @param changelogEntries The entries that needs to be written to the
coordinator stream, the map takes the taskName
- * and it's corresponding changelog partition.
+ * Writes the taskName to changelog partition assignments to the {@link
MetadataStore}.
+ * @param changelogEntries a map of the taskName to the changelog partition
to be written to
+ * metadata store.
*/
public void writePartitionMapping(Map<TaskName, Integer> changelogEntries) {
LOG.debug("Updating changelog information with: ");
for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
- LOG.debug("TaskName: {} to Partition: {}", entry.getKey().getTaskName(),
entry.getValue());
- coordinatorStreamManager.send(new SetChangelogMapping(SOURCE,
entry.getKey().getTaskName(), entry.getValue()));
+ Preconditions.checkNotNull(entry.getKey());
+ String taskName = entry.getKey().getTaskName();
+ if (entry.getValue() != null) {
+ String changeLogPartitionId = String.valueOf(entry.getValue());
+ LOG.debug("TaskName: {} to Partition: {}", taskName, entry.getValue());
+ metadataStore.put(taskName, valueSerde.toBytes(changeLogPartitionId));
+ } else {
+ LOG.debug("Deleting the TaskName: {}", taskName);
+ metadataStore.delete(taskName);
+ }
}
}
/**
- * Merge previous and new taskName to partition mapping and write it.
+ * Merges the previous and the new taskName to changelog partition mapping.
+ * Writes the merged taskName to partition mapping to {@link MetadataStore}.
* @param prevChangelogEntries The previous map of taskName to changelog
partition.
* @param newChangelogEntries The new map of taskName to changelog partition.
*/
@@ -101,10 +117,10 @@ public class ChangelogStreamManager {
}
/**
- * Utility method to create and validate changelog streams. The method is
static because it does not require an
- * instance of the {@link CoordinatorStreamManager}
- * @param config Config with changelog info
- * @param maxChangeLogStreamPartitions Maximum changelog stream partitions
to create
+ * Creates and validates the changelog streams of a samza job.
+ *
+ * @param config the configuration with changelog info.
+ * @param maxChangeLogStreamPartitions the maximum number of changelog
stream partitions to create.
*/
public static void createChangelogStreams(Config config, int
maxChangeLogStreamPartitions) {
// Get changelog store config
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 79491d3..4a9ab26 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -33,7 +33,7 @@ import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.ContainerContextImpl;
import org.apache.samza.context.JobContextImpl;
import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
@@ -46,6 +46,7 @@ import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Clock;
import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.ScalaJavaUtil;
import org.apache.samza.util.StreamUtil;
import org.apache.samza.util.SystemClock;
@@ -61,17 +62,17 @@ import scala.Option;
* from the job's config file.
*/
public class StorageRecovery extends CommandLine {
+ private static final Logger LOG =
LoggerFactory.getLogger(StorageRecovery.class);
+
+ private final Config jobConfig;
+ private final File storeBaseDir;
+ private final SystemAdmins systemAdmins;
+ private final Map<String, SystemStream> changeLogSystemStreams = new
HashMap<>();
+ private final Map<String, StorageEngineFactory<Object, Object>>
storageEngineFactories = new HashMap<>();
+ private final Map<String, ContainerStorageManager> containerStorageManagers
= new HashMap<>();
- private Config jobConfig;
private int maxPartitionNumber = 0;
- private File storeBaseDir = null;
- private HashMap<String, SystemStream> changeLogSystemStreams = new
HashMap<>();
- private HashMap<String, StorageEngineFactory<Object, Object>>
storageEngineFactories = new HashMap<>();
private Map<String, ContainerModel> containers = new HashMap<>();
- private Map<String, ContainerStorageManager> containerStorageManagers = new
HashMap<>();
-
- private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
- private SystemAdmins systemAdmins = null;
/**
* Construct the StorageRecovery
@@ -92,7 +93,7 @@ public class StorageRecovery extends CommandLine {
* tasks.
*/
private void setup() {
- log.info("setting up the recovery...");
+ LOG.info("setting up the recovery...");
getContainerModels();
getChangeLogSystemStreamsAndStorageFactories();
@@ -106,7 +107,7 @@ public class StorageRecovery extends CommandLine {
public void run() {
setup();
- log.info("start recovering...");
+ LOG.info("start recovering...");
systemAdmins.start();
this.containerStorageManagers.forEach((containerName,
containerStorageManager) -> {
@@ -117,7 +118,7 @@ public class StorageRecovery extends CommandLine {
});
systemAdmins.stop();
- log.info("successfully recovered in " + storeBaseDir.toString());
+ LOG.info("successfully recovered in " + storeBaseDir.toString());
}
/**
@@ -125,14 +126,18 @@ public class StorageRecovery extends CommandLine {
*/
private void getContainerModels() {
MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
- CoordinatorStreamManager coordinatorStreamManager = new
CoordinatorStreamManager(jobConfig, metricsRegistryMap);
- coordinatorStreamManager.register(getClass().getSimpleName());
- coordinatorStreamManager.start();
- coordinatorStreamManager.bootstrap();
- ChangelogStreamManager changelogStreamManager = new
ChangelogStreamManager(coordinatorStreamManager);
- JobModel jobModel =
JobModelManager.apply(coordinatorStreamManager.getConfig(),
changelogStreamManager.readPartitionMapping(), metricsRegistryMap).jobModel();
- containers = jobModel.getContainers();
- coordinatorStreamManager.stop();
+ CoordinatorStreamStore coordinatorStreamStore = new
CoordinatorStreamStore(jobConfig, metricsRegistryMap);
+ coordinatorStreamStore.init();
+ try {
+ Config configFromCoordinatorStream =
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+ ChangelogStreamManager changelogStreamManager = new
ChangelogStreamManager(coordinatorStreamStore);
+ JobModelManager jobModelManager =
JobModelManager.apply(configFromCoordinatorStream,
changelogStreamManager.readPartitionMapping(),
+
coordinatorStreamStore, metricsRegistryMap);
+ JobModel jobModel = jobModelManager.jobModel();
+ containers = jobModel.getContainers();
+ } finally {
+ coordinatorStreamStore.close();
+ }
}
/**
@@ -143,12 +148,12 @@ public class StorageRecovery extends CommandLine {
JavaStorageConfig config = new JavaStorageConfig(jobConfig);
List<String> storeNames = config.getStoreNames();
- log.info("Got store names: " + storeNames.toString());
+ LOG.info("Got store names: " + storeNames.toString());
for (String storeName : storeNames) {
String streamName = config.getChangelogStream(storeName);
- log.info("stream name for " + storeName + " is " + streamName);
+ LOG.info("stream name for " + storeName + " is " + streamName);
if (streamName != null) {
changeLogSystemStreams.put(storeName,
StreamUtil.getSystemStreamFromNames(streamName));
diff --git
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 25f7a17..2284ce4 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -29,16 +29,17 @@ import
org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config._
import org.apache.samza.container.TaskName
-import org.apache.samza.job.JobRunner.{info, warn, _}
+import org.apache.samza.job.JobRunner.info
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{CommandLine, Logging, Util}
+import org.apache.samza.util.{CommandLine, CoordinatorStreamUtil, Logging,
Util}
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import scala.collection.JavaConverters._
import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
import org.apache.samza.execution.JobPlanner
import org.apache.samza.storage.ChangelogStreamManager
@@ -126,8 +127,9 @@ object CheckpointTool {
}
def apply(config: Config, offsets: TaskNameToCheckpointMap): CheckpointTool
= {
- val coordinatorStreamManager = new CoordinatorStreamManager(config, new
MetricsRegistryMap())
- new CheckpointTool(offsets, coordinatorStreamManager)
+ val metadataStore: CoordinatorStreamStore = new
CoordinatorStreamStore(config, new MetricsRegistryMap())
+ metadataStore.init()
+ new CheckpointTool(offsets, metadataStore, config)
}
def rewriteConfig(config: JobConfig): Config = {
@@ -158,61 +160,70 @@ object CheckpointTool {
}
}
-class CheckpointTool(newOffsets: TaskNameToCheckpointMap,
coordinatorStreamManager: CoordinatorStreamManager) extends Logging {
+class CheckpointTool(newOffsets: TaskNameToCheckpointMap,
coordinatorStreamStore: CoordinatorStreamStore, userDefinedConfig: Config)
extends Logging {
def run() {
- // Read the configuration stored in the coordinator stream.
- coordinatorStreamManager.register(getClass.getSimpleName)
- coordinatorStreamManager.start()
- coordinatorStreamManager.bootstrap()
- val configFromCoordinatorStream: Config =
coordinatorStreamManager.getConfig
+ val configFromCoordinatorStream: Config =
getConfigFromCoordinatorStream(coordinatorStreamStore)
+
+ println("Configuration read from the coordinator stream")
+ println(configFromCoordinatorStream)
+
+ val combinedConfigMap: util.Map[String, String] = new util.HashMap[String,
String]()
+ combinedConfigMap.putAll(configFromCoordinatorStream)
+ combinedConfigMap.putAll(userDefinedConfig)
+ val combinedConfig: Config = new MapConfig(combinedConfigMap)
// Instantiate the checkpoint manager with coordinator stream
configuration.
- val checkpointManager: CheckpointManager =
configFromCoordinatorStream.getCheckpointManagerFactory() match {
+ val checkpointManager: CheckpointManager =
combinedConfig.getCheckpointManagerFactory() match {
case Some(className) =>
Util.getObj(className, classOf[CheckpointManagerFactory])
- .getCheckpointManager(configFromCoordinatorStream, new
MetricsRegistryMap)
+ .getCheckpointManager(combinedConfig, new MetricsRegistryMap)
case _ =>
throw new SamzaException("Configuration: task.checkpoint.factory is
not defined.")
}
-
- // Find all the TaskNames that would be generated for this job config
- val changelogManager = new ChangelogStreamManager(coordinatorStreamManager)
- val jobModelManager = JobModelManager(configFromCoordinatorStream,
changelogManager.readPartitionMapping())
- val taskNames = jobModelManager
- .jobModel
- .getContainers
- .values
- .asScala
- .flatMap(_.getTasks.asScala.keys)
- .toSet
-
- taskNames.foreach(checkpointManager.register)
- checkpointManager.start()
-
- val lastCheckpoints = taskNames.map(taskName => {
- taskName -> Option(checkpointManager.readLastCheckpoint(taskName))
- .getOrElse(new Checkpoint(new
java.util.HashMap[SystemStreamPartition, String]()))
- .getOffsets
- .asScala
- .toMap
- }).toMap
-
- lastCheckpoints.foreach(lcp => logCheckpoint(lcp._1, lcp._2, "Current
checkpoint for task: "+ lcp._1))
-
- if (newOffsets != null) {
- newOffsets.foreach {
- case (taskName: TaskName, offsets: Map[SystemStreamPartition, String])
=> {
- logCheckpoint(taskName, offsets, "New offset to be written for task:
" + taskName)
- val checkpoint = new Checkpoint(offsets.asJava)
- checkpointManager.writeCheckpoint(taskName, checkpoint)
- info(s"Updated the checkpoint of the task: $taskName to: $offsets")
+ try {
+ // Find all the TaskNames that would be generated for this job config
+ val changelogManager = new ChangelogStreamManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE))
+ val jobModelManager = JobModelManager(combinedConfig,
changelogManager.readPartitionMapping(), coordinatorStreamStore, new
MetricsRegistryMap())
+ val taskNames = jobModelManager
+ .jobModel
+ .getContainers
+ .values
+ .asScala
+ .flatMap(_.getTasks.asScala.keys)
+ .toSet
+
+ taskNames.foreach(checkpointManager.register)
+ checkpointManager.start()
+
+ val lastCheckpoints = taskNames.map(taskName => {
+ taskName -> Option(checkpointManager.readLastCheckpoint(taskName))
+ .getOrElse(new Checkpoint(new
java.util.HashMap[SystemStreamPartition, String]()))
+ .getOffsets
+ .asScala
+ .toMap
+ }).toMap
+
+ lastCheckpoints.foreach(lcp => logCheckpoint(lcp._1, lcp._2, "Current
checkpoint for task: "+ lcp._1))
+
+ if (newOffsets != null) {
+ newOffsets.foreach {
+ case (taskName: TaskName, offsets: Map[SystemStreamPartition,
String]) => {
+ logCheckpoint(taskName, offsets, "New offset to be written for
task: " + taskName)
+ val checkpoint = new Checkpoint(offsets.asJava)
+ checkpointManager.writeCheckpoint(taskName, checkpoint)
+ info(s"Updated the checkpoint of the task: $taskName to: $offsets")
+ }
}
}
- }
+ } finally {
+ checkpointManager.stop()
+ coordinatorStreamStore.close()
+ }
+ }
- checkpointManager.stop()
- coordinatorStreamManager.stop()
+ def getConfigFromCoordinatorStream(coordinatorStreamStore:
CoordinatorStreamStore): Config = {
+ return
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
}
def logCheckpoint(tn: TaskName, checkpoint: Map[SystemStreamPartition,
String], prefix: String) {
diff --git
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 230a62d..4ada2f5 100644
---
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -30,22 +30,20 @@ import org.apache.samza.config.Config
import org.apache.samza.container.grouper.stream.SSPGrouperProxy
import
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
import org.apache.samza.container.grouper.task._
+import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
import org.apache.samza.container.LocalityManager
import org.apache.samza.container.TaskName
-import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory
-import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.coordinator.server.HttpServer
import org.apache.samza.coordinator.server.JobServlet
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
-import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping
-import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping
-import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.TaskMode
import org.apache.samza.job.model.TaskModel
-import org.apache.samza.metadatastore.MetadataStore
-import org.apache.samza.metadatastore.MetadataStoreFactory
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.runtime.LocationId
@@ -81,10 +79,9 @@ object JobModelManager extends Logging {
* @param metricsRegistry the registry for reporting metrics.
* @return the instantiated {@see JobModelManager}.
*/
- def apply(config: Config, changelogPartitionMapping: util.Map[TaskName,
Integer], metricsRegistry: MetricsRegistry = new MetricsRegistryMap()):
JobModelManager = {
- val coordinatorStreamStoreFactory: MetadataStoreFactory = new
CoordinatorStreamMetadataStoreFactory()
- val coordinatorStreamStore: MetadataStore =
coordinatorStreamStoreFactory.getMetadataStore(SOURCE, config, metricsRegistry)
- coordinatorStreamStore.init()
+ def apply(config: Config, changelogPartitionMapping: util.Map[TaskName,
Integer],
+ coordinatorStreamStore: CoordinatorStreamStore,
+ metricsRegistry: MetricsRegistry = new MetricsRegistryMap()):
JobModelManager = {
// Instantiate the respective metadata store util classes which uses the
same coordinator metadata store.
val localityManager = new LocalityManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetContainerHostMapping.TYPE))
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index d16c294..40884d6 100644
---
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -20,16 +20,19 @@
package org.apache.samza.job.local
import java.util
+
import org.apache.samza.SamzaException
import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
import org.apache.samza.config.TaskConfig._
import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob,
StreamJobFactory}
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.util.{CoordinatorStreamUtil, Logging, Util}
+
import scala.collection.JavaConversions._
/**
@@ -44,59 +47,63 @@ class ProcessJobFactory extends StreamJobFactory with
Logging {
}
val metricsRegistry = new MetricsRegistryMap()
- val coordinatorStreamManager = new CoordinatorStreamManager(config,
metricsRegistry)
- coordinatorStreamManager.register(getClass.getSimpleName)
- coordinatorStreamManager.start
- coordinatorStreamManager.bootstrap
- val changelogStreamManager = new
ChangelogStreamManager(coordinatorStreamManager)
-
- val coordinator = JobModelManager(coordinatorStreamManager.getConfig,
changelogStreamManager.readPartitionMapping(), metricsRegistry)
- val jobModel = coordinator.jobModel
-
- val taskPartitionMappings: util.Map[TaskName, Integer] = new
util.HashMap[TaskName, Integer]
- for (containerModel <- jobModel.getContainers.values) {
- for (taskModel <- containerModel.getTasks.values) {
- taskPartitionMappings.put(taskModel.getTaskName,
taskModel.getChangelogPartition.getPartitionId)
+ val coordinatorStreamStore: CoordinatorStreamStore = new
CoordinatorStreamStore(config, new MetricsRegistryMap())
+ coordinatorStreamStore.init()
+
+ try {
+ val configFromCoordinatorStream: Config =
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
+
+ val changelogStreamManager = new ChangelogStreamManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE))
+
+ val coordinator = JobModelManager(configFromCoordinatorStream,
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore,
metricsRegistry)
+ val jobModel = coordinator.jobModel
+
+ val taskPartitionMappings: util.Map[TaskName, Integer] = new
util.HashMap[TaskName, Integer]
+ for (containerModel <- jobModel.getContainers.values) {
+ for (taskModel <- containerModel.getTasks.values) {
+ taskPartitionMappings.put(taskModel.getTaskName,
taskModel.getChangelogPartition.getPartitionId)
+ }
}
- }
- changelogStreamManager.writePartitionMapping(taskPartitionMappings)
- coordinatorStreamManager.stop()
+ changelogStreamManager.writePartitionMapping(taskPartitionMappings)
- //create necessary checkpoint and changelog streams
- val checkpointManager = new
TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
- if (checkpointManager != null) {
- checkpointManager.createResources()
- }
- ChangelogStreamManager.createChangelogStreams(jobModel.getConfig,
jobModel.maxChangeLogStreamPartitions)
+ //create necessary checkpoint and changelog streams
+ val checkpointManager = new
TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
+ if (checkpointManager != null) {
+ checkpointManager.createResources()
+ }
+ ChangelogStreamManager.createChangelogStreams(jobModel.getConfig,
jobModel.maxChangeLogStreamPartitions)
- val containerModel = coordinator.jobModel.getContainers.get(0)
+ val containerModel = coordinator.jobModel.getContainers.get(0)
- val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is
configured
- info("Process job. using fwkPath = " + fwkPath)
+ val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is
configured
+ info("Process job. using fwkPath = " + fwkPath)
- val commandBuilder = {
- config.getCommandClass match {
- case Some(cmdBuilderClassName) => {
- // A command class was specified, so we need to use a process job to
- // execute the command in its own process.
- Util.getObj(cmdBuilderClassName, classOf[CommandBuilder])
- }
- case _ => {
- info("Defaulting to ShellCommandBuilder")
- new ShellCommandBuilder
+ val commandBuilder = {
+ config.getCommandClass match {
+ case Some(cmdBuilderClassName) => {
+ // A command class was specified, so we need to use a process job
to
+ // execute the command in its own process.
+ Util.getObj(cmdBuilderClassName, classOf[CommandBuilder])
+ }
+ case _ => {
+ info("Defaulting to ShellCommandBuilder")
+ new ShellCommandBuilder
+ }
}
}
- }
- // JobCoordinator is stopped by ProcessJob when it exits
- coordinator.start
+ // JobCoordinator is stopped by ProcessJob when it exits
+ coordinator.start
- commandBuilder
- .setConfig(config)
- .setId("0")
- .setUrl(coordinator.server.getUrl)
- .setCommandPath(fwkPath)
+ commandBuilder
+ .setConfig(config)
+ .setId("0")
+ .setUrl(coordinator.server.getUrl)
+ .setCommandPath(fwkPath)
- new ProcessJob(commandBuilder, coordinator)
+ new ProcessJob(commandBuilder, coordinator)
+ } finally {
+ coordinatorStreamStore.close()
+ }
}
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index f505f22..d4b88d3 100644
---
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -21,19 +21,21 @@ package org.apache.samza.job.local
import org.apache.samza.application.ApplicationUtil
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
+import org.apache.samza.clustermanager.ClusterBasedJobCoordinator
import org.apache.samza.config.JobConfig._
import org.apache.samza.config.ShellCommandConfig._
import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
import org.apache.samza.container.{SamzaContainer, SamzaContainerListener,
TaskName}
import org.apache.samza.context.{ExternalContext, JobContextImpl}
import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
import org.apache.samza.job.{StreamJob, StreamJobFactory}
import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap,
MetricsReporter}
import org.apache.samza.runtime.ProcessorContext
import org.apache.samza.storage.ChangelogStreamManager
import org.apache.samza.task.{TaskFactory, TaskFactoryUtil}
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{CoordinatorStreamUtil, Logging}
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -46,14 +48,14 @@ class ThreadJobFactory extends StreamJobFactory with
Logging {
info("Creating a ThreadJob, which is only meant for debugging.")
val metricsRegistry = new MetricsRegistryMap()
- val coordinatorStreamManager = new CoordinatorStreamManager(config,
metricsRegistry)
- coordinatorStreamManager.register(getClass.getSimpleName)
- coordinatorStreamManager.start
- coordinatorStreamManager.bootstrap
- val changelogStreamManager = new
ChangelogStreamManager(coordinatorStreamManager)
+ val coordinatorStreamStore: CoordinatorStreamStore = new
CoordinatorStreamStore(config, new MetricsRegistryMap())
+ coordinatorStreamStore.init()
- val coordinator = JobModelManager(coordinatorStreamManager.getConfig,
changelogStreamManager.readPartitionMapping(), metricsRegistry)
+ val configFromCoordinatorStream: Config =
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
+ val changelogStreamManager = new ChangelogStreamManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE))
+
+ val coordinator = JobModelManager(configFromCoordinatorStream,
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore,
metricsRegistry)
val jobModel = coordinator.jobModel
val taskPartitionMappings: mutable.Map[TaskName, Integer] =
mutable.Map[TaskName, Integer]()
@@ -76,7 +78,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
val containerId = "0"
var jmxServer: JmxServer = null
if (new JobConfig(config).getJMXEnabled) {
- jmxServer = new JmxServer();
+ jmxServer = new JmxServer()
}
val appDesc =
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config),
config)
@@ -130,10 +132,10 @@ class ThreadJobFactory extends StreamJobFactory with
Logging {
threadJob
} finally {
coordinator.stop
- coordinatorStreamManager.stop()
if (jmxServer != null) {
jmxServer.stop
}
+ coordinatorStreamStore.close()
}
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 0d767c6..2fca36c 100644
---
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -21,10 +21,14 @@
package org.apache.samza.util
+import java.util
import org.apache.samza.SamzaException
import org.apache.samza.config._
import org.apache.samza.system.{SystemFactory, SystemStream}
import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde
+import org.apache.samza.coordinator.stream.messages.SetConfig
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import scala.collection.immutable.Map
@@ -91,4 +95,21 @@ object CoordinatorStreamUtil {
(config.getName.getOrElse(throw new ConfigException("Missing required
config: job.name")),
config.getJobId)
}
+
+ /**
+ * Reads and returns the complete configuration stored in the coordinator
stream.
+ * @param coordinatorStreamStore an instance of the instantiated {@link
CoordinatorStreamStore}.
+ * @return the configuration read from the coordinator stream.
+ */
+ def readConfigFromCoordinatorStream(coordinatorStreamStore:
CoordinatorStreamStore): Config = {
+ val namespaceAwareCoordinatorStreamStore:
NamespaceAwareCoordinatorStreamStore = new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE)
+ val configFromCoordinatorStream: util.Map[String, Array[Byte]] =
namespaceAwareCoordinatorStreamStore.all
+ val configMap: util.Map[String, String] = new util.HashMap[String, String]
+ for ((key: String, valueAsBytes: Array[Byte]) <-
configFromCoordinatorStream.asScala) {
+ val valueSerde: CoordinatorStreamValueSerde = new
CoordinatorStreamValueSerde(SetConfig.TYPE)
+ val valueAsString: String = valueSerde.fromBytes(valueAsBytes)
+ configMap.put(key, valueAsString)
+ }
+ new MapConfig(configMap)
+ }
}
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index e2776b2..cb1e099 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
@@ -55,7 +56,7 @@ public class TestClusterBasedJobCoordinator {
Map<String, String> configMap;
@Before
- public void setUp() throws NoSuchFieldException, NoSuchMethodException {
+ public void setUp() {
configMap = new HashMap<>();
configMap.put("job.name", "test-job");
configMap.put("job.coordinator.system", "kafka");
@@ -81,6 +82,8 @@ public class TestClusterBasedJobCoordinator {
@Test
public void testPartitionCountMonitorWithDurableStates() {
configMap.put("stores.mystore.changelog", "mychangelog");
+ configMap.put(JobConfig.JOB_CONTAINER_COUNT(), "1");
+
when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(new
MapConfig(configMap));
Config config = new MapConfig(configMap);
// mimic job runner code to write the config to coordinator stream
@@ -99,6 +102,8 @@ public class TestClusterBasedJobCoordinator {
@Test
public void testPartitionCountMonitorWithoutDurableStates() {
+ configMap.put(JobConfig.JOB_CONTAINER_COUNT(), "1");
+
when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(new
MapConfig(configMap));
Config config = new MapConfig(configMap);
// mimic job runner code to write the config to coordinator stream
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
index 9ec548d..d6f5e73 100644
---
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
@@ -40,12 +40,16 @@ public class CoordinatorStreamStoreTestUtil {
private final Config config;
public CoordinatorStreamStoreTestUtil(Config config) {
+ this(config, "test-kafka");
+ }
+
+ public CoordinatorStreamStoreTestUtil(Config config, String systemName) {
this.config = config;
this.systemFactory = new MockCoordinatorStreamSystemFactory();
MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
- SystemConsumer systemConsumer = systemFactory.getConsumer("test-kafka",
config, new NoOpMetricsRegistry());
- SystemProducer systemProducer = systemFactory.getProducer("test-kafka",
config, new NoOpMetricsRegistry());
- SystemAdmin systemAdmin = systemFactory.getAdmin("test-kafka", config);
+ SystemConsumer systemConsumer = systemFactory.getConsumer(systemName,
config, new NoOpMetricsRegistry());
+ SystemProducer systemProducer = systemFactory.getProducer(systemName,
config, new NoOpMetricsRegistry());
+ SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
this.coordinatorStreamStore = new CoordinatorStreamStore(config,
systemProducer, systemConsumer, systemAdmin);
this.coordinatorStreamStore.init();
}
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 7b7d41f..6035fc2 100644
---
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -95,6 +95,7 @@ public class MockCoordinatorStreamSystemFactory implements
SystemFactory {
String streamName =
CoordinatorStreamUtil.getCoordinatorStreamName(jobName, jobId);
SystemStreamPartition systemStreamPartition = new
SystemStreamPartition(systemName, streamName, new Partition(0));
mockConsumer = new
MockCoordinatorStreamWrappedConsumer(systemStreamPartition, config);
+ mockConsumer.register(systemStreamPartition, "0");
return mockConsumer;
}
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
index b50aa3c..136fe5e 100644
---
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -65,10 +65,6 @@ public class MockCoordinatorStreamWrappedConsumer extends
BlockingEnvelopeMap {
convertConfigToCoordinatorMessage(config);
}
- public void addMoreMessages(Config config) {
- convertConfigToCoordinatorMessage(config);
- }
-
public void addMessageEnvelope(IncomingMessageEnvelope envelope) throws
IOException, InterruptedException {
put(systemStreamPartition, envelope);
setIsAtHead(systemStreamPartition, true);
@@ -77,8 +73,8 @@ public class MockCoordinatorStreamWrappedConsumer extends
BlockingEnvelopeMap {
private void convertConfigToCoordinatorMessage(Config config) {
try {
for (Map.Entry<String, String> configPair : config.entrySet()) {
- byte[] keyBytes = null;
- byte[] messgeBytes = null;
+ byte[] keyBytes;
+ byte[] messgeBytes;
if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
String[] changelogInfo = configPair.getKey().split(":");
String changeLogPartition = configPair.getValue();
@@ -111,11 +107,5 @@ public class MockCoordinatorStreamWrappedConsumer extends
BlockingEnvelopeMap {
return super.poll(systemStreamPartitions, timeout);
}
- public CountDownLatch blockPool() {
- blockpollFlag = true;
- return blockConsumerPoll;
- }
-
-
public void stop() {}
}
diff --git
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 93c7096..971e55f 100644
---
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -28,6 +28,7 @@ import org.apache.samza.container.TaskName
import
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
import org.apache.samza.checkpoint.TestCheckpointTool.MockSystemFactory
import org.apache.samza.config._
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
CoordinatorStreamStoreTestUtil}
import org.apache.samza.metrics.MetricsRegistry
import
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system.SystemAdmin
@@ -44,9 +45,9 @@ import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mockito.MockitoSugar
import scala.collection.JavaConverters._
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
import org.apache.samza.execution.JobPlanner
+import org.mockito.Mockito
object TestCheckpointTool {
var checkpointManager: CheckpointManager = _
@@ -145,7 +146,6 @@ class TestCheckpointTool extends AssertionsForJUnit with
MockitoSugar {
def
testShouldInstantiateCheckpointManagerWithConfigurationFromCoordinatorStream():
Unit = {
val offsetMap: TaskNameToCheckpointMap = Map(tn0 -> Map(new
SystemStreamPartition("test", "foo", p0) -> "42"),
tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
- val coordinatorStreamManager = mock[CoordinatorStreamManager]
val userDefinedConfig: MapConfig = new MapConfig(Map(
ApplicationConfig.APP_NAME -> "test",
JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
@@ -156,16 +156,20 @@ class TestCheckpointTool extends AssertionsForJUnit with
MockitoSugar {
TaskConfig.GROUPER_FACTORY ->
"org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
).asJava)
val generatedConfigs: MapConfig =
JobPlanner.generateSingleJobConfig(userDefinedConfig)
- when(coordinatorStreamManager.getConfig).thenReturn(generatedConfigs)
+ val coordinatorStreamStoreTestUtil: CoordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(config)
- val checkpointTool: CheckpointTool = new CheckpointTool(offsetMap,
coordinatorStreamManager)
+
+ val coordinatorStreamStore: CoordinatorStreamStore =
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore
+ val checkpointTool: CheckpointTool = Mockito.spy(new
CheckpointTool(offsetMap, coordinatorStreamStore, generatedConfigs))
+
Mockito.when(checkpointTool.getConfigFromCoordinatorStream(coordinatorStreamStore)).thenReturn(userDefinedConfig)
checkpointTool.run()
verify(TestCheckpointTool.checkpointManager)
.writeCheckpoint(tn0, new Checkpoint(Map(new
SystemStreamPartition("test", "foo", p0) -> "42").asJava))
verify(TestCheckpointTool.checkpointManager)
.writeCheckpoint(tn1, new Checkpoint(Map(new
SystemStreamPartition("test", "foo", p1) -> "43").asJava))
- verify(coordinatorStreamManager).getConfig
- assert(TestCheckpointTool.coordinatorConfig == generatedConfigs)
+
+ // Two configurations job.id, job.name are populated in the coordinator
config by SamzaRuntime and it is not present in generated config.
+
assert(generatedConfigs.entrySet().containsAll(TestCheckpointTool.coordinatorConfig.entrySet()))
}
}
diff --git
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 5b43840..3c4e4fe 100644
---
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -25,7 +25,9 @@ import org.apache.samza.Partition
import
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
import org.apache.samza.config._
import org.apache.samza.container.{SamzaContainer, TaskName}
-import org.apache.samza.coordinator.stream.{CoordinatorStreamManager,
MockCoordinatorStreamSystemFactory, MockCoordinatorStreamWrappedConsumer}
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
CoordinatorStreamStoreTestUtil, NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
+import
org.apache.samza.coordinator.stream.{MockCoordinatorStreamSystemFactory,
MockCoordinatorStreamWrappedConsumer}
import org.apache.samza.job.MockJobFactory
import org.apache.samza.job.local.{ProcessJobFactory, ThreadJobFactory}
import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
@@ -106,16 +108,20 @@ class TestJobCoordinator extends FlatSpec with
PrivateMethodTester {
// Verify that the atomicReference is initialized
assertNotNull(JobModelManager.jobModelRef.get())
- assertEquals(expectedJobModel, JobModelManager.jobModelRef.get())
+ val expectedContainerModels = new util.TreeMap[String,
ContainerModel](expectedJobModel.getContainers)
+ val actualContainerModels = new util.TreeMap[String,
ContainerModel](JobModelManager.jobModelRef.get().getContainers)
+ assertEquals(expectedContainerModels, actualContainerModels)
coordinator.start
- assertEquals(new MapConfig(config.asJava), coordinator.jobModel.getConfig)
- assertEquals(expectedJobModel, coordinator.jobModel)
+ val expectedConfig: Config = coordinator.jobModel.getConfig
+ val actualConfig: Config = new MapConfig(config.asJava)
+ assertTrue(expectedConfig.entrySet().containsAll(actualConfig.entrySet()))
+ assertEquals(expectedJobModel.getContainers,
coordinator.jobModel.getContainers)
val response = HttpUtil.read(coordinator.server.getUrl)
// Verify that the JobServlet is serving the correct jobModel
val jobModelFromCoordinatorUrl =
SamzaObjectMapper.getObjectMapper.readValue(response, classOf[JobModel])
- assertEquals(expectedJobModel, jobModelFromCoordinatorUrl)
+ assertEquals(expectedJobModel.getContainers,
jobModelFromCoordinatorUrl.getContainers)
coordinator.stop
}
@@ -270,22 +276,24 @@ class TestJobCoordinator extends FlatSpec with
PrivateMethodTester {
}
def getTestJobModelManager(config: MapConfig) = {
- val coordinatorStreamManager = new CoordinatorStreamManager(config, new
MetricsRegistryMap)
- coordinatorStreamManager.register("TestJobCoordinator")
- coordinatorStreamManager.start
- coordinatorStreamManager.bootstrap
- val changelogPartitionManager = new
ChangelogStreamManager(coordinatorStreamManager)
- val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig,
changelogPartitionManager.readPartitionMapping())
- coordinatorStreamManager.stop()
- jobModelManager
+ val coordinatorStreamTestUtil: CoordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(config, "coordinator")
+ val coordinatorStreamStore: CoordinatorStreamStore =
coordinatorStreamTestUtil.getCoordinatorStreamStore
+ val namespaceAwareCoordinatorStore: NamespaceAwareCoordinatorStreamStore =
new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE)
+ try {
+ val changelogPartitionManager = new
ChangelogStreamManager(namespaceAwareCoordinatorStore)
+ val jobModelManager = JobModelManager(config,
changelogPartitionManager.readPartitionMapping(), coordinatorStreamStore, new
MetricsRegistryMap())
+ jobModelManager
+ } finally {
+ coordinatorStreamStore.close()
+ }
}
@Before
def setUp() {
// setup the test stream metadata
- MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test",
"stream1", new Partition(0)), new util.ArrayList[IncomingMessageEnvelope]());
- MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test",
"stream1", new Partition(1)), new util.ArrayList[IncomingMessageEnvelope]());
- MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test",
"stream1", new Partition(2)), new util.ArrayList[IncomingMessageEnvelope]());
+ MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test",
"stream1", new Partition(0)), new util.ArrayList[IncomingMessageEnvelope]())
+ MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test",
"stream1", new Partition(1)), new util.ArrayList[IncomingMessageEnvelope]())
+ MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test",
"stream1", new Partition(2)), new util.ArrayList[IncomingMessageEnvelope]())
}
@After
diff --git
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index a32ea81..3f45e67 100644
---
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -33,17 +33,22 @@ import
org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.ClusterBasedJobCoordinator;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.yarn.ClientHelper;
import org.apache.samza.metrics.JmxMetricsAccessor;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsValidator;
import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.Util;
import org.apache.samza.util.hadoop.HttpFileSystem;
import org.apache.samza.util.CommandLine;
@@ -154,26 +159,29 @@ public class YarnJobValidationTool {
public void validateJmxMetrics() throws Exception {
MetricsRegistry metricsRegistry = new MetricsRegistryMap();
- CoordinatorStreamManager coordinatorStreamManager = new
CoordinatorStreamManager(config, metricsRegistry);
- coordinatorStreamManager.register(getClass().getSimpleName());
- coordinatorStreamManager.start();
- coordinatorStreamManager.bootstrap();
- ChangelogStreamManager changelogStreamManager = new
ChangelogStreamManager(coordinatorStreamManager);
- JobModelManager jobModelManager =
JobModelManager.apply(coordinatorStreamManager.getConfig(),
changelogStreamManager.readPartitionMapping(), metricsRegistry);
- validator.init(config);
- Map<String, String> jmxUrls =
jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
- for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
- String containerId = entry.getKey();
- String jmxUrl = entry.getValue();
- log.info("validate container " + containerId + " metrics with JMX: " +
jmxUrl);
- JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
- jmxMetrics.connect();
- validator.validate(jmxMetrics);
- jmxMetrics.close();
- log.info("validate container " + containerId + " successfully");
+ CoordinatorStreamStore coordinatorStreamStore = new
CoordinatorStreamStore(config, metricsRegistry);
+ coordinatorStreamStore.init();
+ try{
+ Config configFromCoordinatorStream =
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+ ChangelogStreamManager changelogStreamManager = new
ChangelogStreamManager(coordinatorStreamStore);
+ JobModelManager jobModelManager =
JobModelManager.apply(configFromCoordinatorStream,
changelogStreamManager.readPartitionMapping(),
+
coordinatorStreamStore, metricsRegistry);
+ validator.init(config);
+ Map<String, String> jmxUrls =
jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
+ for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
+ String containerId = entry.getKey();
+ String jmxUrl = entry.getValue();
+ log.info("validate container " + containerId + " metrics with JMX: " +
jmxUrl);
+ JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
+ jmxMetrics.connect();
+ validator.validate(jmxMetrics);
+ jmxMetrics.close();
+ log.info("validate container " + containerId + " successfully");
+ }
+ validator.complete();
+ } finally {
+ coordinatorStreamStore.close();
}
- validator.complete();
- coordinatorStreamManager.stop();
}
public static void main(String [] args) throws Exception {
diff --git
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
index 1ad4522..e1dde51 100644
---
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
+++
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -23,11 +23,12 @@ import java.io.{BufferedReader, InputStreamReader}
import java.net.URL
import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.samza.clustermanager.SamzaApplicationState
+import org.apache.samza.clustermanager.{ClusterBasedJobCoordinator,
SamzaApplicationState}
import org.apache.samza.config.{Config, MapConfig}
-import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.stream.{CoordinatorStreamManager,
MockCoordinatorStreamSystemFactory}
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
CoordinatorStreamStoreTestUtil, NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
import org.apache.samza.metrics._
import org.apache.samza.storage.ChangelogStreamManager
import org.junit.Assert._
@@ -39,14 +40,14 @@ class TestSamzaYarnAppMasterService {
@Test
def testAppMasterDashboardShouldStart {
+ MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
val config = getDummyConfig
val jobModelManager = getTestJobModelManager(config)
val samzaState = new SamzaApplicationState(jobModelManager)
val registry = new MetricsRegistryMap()
- val state = new YarnAppState(-1,
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
"testHost", 1, 1);
+ val state = new YarnAppState(-1,
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
"testHost", 1, 1)
val service = new SamzaYarnAppMasterService(config, samzaState, state,
registry, null)
- val taskName = new TaskName("test")
// start the dashboard
service.onInit
@@ -56,15 +57,15 @@ class TestSamzaYarnAppMasterService {
// check to see if it's running
val url = new URL(state.rpcUrl.toString + "am")
- val is = url.openConnection().getInputStream();
- val reader = new BufferedReader(new InputStreamReader(is));
- var line: String = null;
+ val is = url.openConnection().getInputStream()
+ val reader = new BufferedReader(new InputStreamReader(is))
+ var line: String = null
do {
line = reader.readLine()
} while (line != null)
- reader.close();
+ reader.close()
}
/**
@@ -77,7 +78,7 @@ class TestSamzaYarnAppMasterService {
val config = getDummyConfig
val jobModelManager = getTestJobModelManager(config)
val samzaState = new SamzaApplicationState(jobModelManager)
- val state = new YarnAppState(-1,
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
"testHost", 1, 1);
+ val state = new YarnAppState(-1,
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"),
"testHost", 1, 1)
val registry = new MetricsRegistryMap()
val service = new SamzaYarnAppMasterService(config, samzaState, state,
registry, null)
@@ -101,14 +102,16 @@ class TestSamzaYarnAppMasterService {
}
private def getTestJobModelManager(config: Config) = {
- val coordinatorStreamManager = new CoordinatorStreamManager(config, new
MetricsRegistryMap)
- coordinatorStreamManager.register("TestJobCoordinator")
- coordinatorStreamManager.start
- coordinatorStreamManager.bootstrap
- val changelogPartitionManager = new
ChangelogStreamManager(coordinatorStreamManager)
- val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig,
changelogPartitionManager.readPartitionMapping())
- coordinatorStreamManager.stop()
- jobModelManager
+ val coordinatorStreamTestUtil: CoordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(config)
+ val coordinatorStreamStore: CoordinatorStreamStore =
coordinatorStreamTestUtil.getCoordinatorStreamStore
+ val namespaceAwareCoordinatorStore: NamespaceAwareCoordinatorStreamStore =
new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE)
+ try {
+ val changelogPartitionManager = new
ChangelogStreamManager(namespaceAwareCoordinatorStore)
+ val jobModelManager = JobModelManager(getDummyConfig,
changelogPartitionManager.readPartitionMapping(), coordinatorStreamStore, new
MetricsRegistryMap())
+ jobModelManager
+ } finally {
+ coordinatorStreamStore.close()
+ }
}
private def getDummyConfig: Config = new MapConfig(Map[String, String](
@@ -124,8 +127,4 @@ class TestSamzaYarnAppMasterService {
"yarn.container.retry.window.ms" -> "1999999999",
"job.coordinator.system" -> "coordinator",
"systems.coordinator.samza.factory" ->
classOf[MockCoordinatorStreamSystemFactory].getCanonicalName).asJava)
-}
-
-
-
-
+}
\ No newline at end of file