This is an automated email from the ASF dual-hosted git repository.
lhaiesp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 160bbb2 SAMZA-2527:MetadataStore is not initialized before reading in
JobCoordinatorLaunchUtil (#1365)
160bbb2 is described below
commit 160bbb22a9812a78054723279e87a8b618418775
Author: Ke Wu <[email protected]>
AuthorDate: Tue May 19 15:28:42 2020 -0700
SAMZA-2527:MetadataStore is not initialized before reading in
JobCoordinatorLaunchUtil (#1365)
Symptom: Job failed to launch.
Cause: Metadata store is not initialized before invoking
readLaunchConfigFromCoordinatorStream()
Changes: Initialize the metadata store before invoking
readLaunchConfigFromCoordinatorStream()
Tests: None
API Changes: None
Upgrade Instructions: None
Usage Instructions: None
Co-authored-by: Ke Wu <[email protected]>
---
.../org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index 6915614..c943c0c 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -62,6 +62,10 @@ public class JobCoordinatorLaunchUtil {
MetricsRegistryMap metrics = new MetricsRegistryMap();
MetadataStore
metadataStore = new
CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(fullConfig),
metrics);
+ // MetadataStore will be closed in ClusterBasedJobCoordinator#onShutDown
+ // initialization of MetadataStore can be moved to
ClusterBasedJobCoordinator after we clean up
+ // ClusterBasedJobCoordinator#createFromMetadataStore
+ metadataStore.init();
// Reads extra launch config from metadata store.
Config launchConfig =
CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(fullConfig,
metadataStore);
Config finalConfig = new MapConfig(launchConfig, fullConfig);
@@ -69,10 +73,6 @@ public class JobCoordinatorLaunchUtil {
// This needs to be consistent with RemoteApplicationRunner#run where
JobRunner#submit to be called instead of JobRunner#run
CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
DiagnosticsUtil.createDiagnosticsStream(finalConfig);
- // MetadataStore will be closed in ClusterBasedJobCoordinator#onShutDown
- // initialization of MetadataStore can be moved to
ClusterBasedJobCoordinator after we clean up
- // ClusterBasedJobCoordinator#createFromMetadataStore
- metadataStore.init();
ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
metrics,