This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 85afbb7 SAMZA-2537: Fix JobCoordinatorLaunchUtil's execution sequence.
85afbb7 is described below
commit 85afbb7c65f097e098fdd1d4601a73c87e990953
Author: Ke Wu <[email protected]>
AuthorDate: Tue Jun 2 18:09:47 2020 -0700
SAMZA-2537: Fix JobCoordinatorLaunchUtil's execution sequence.
Symptom: A new Samza job fails to deploy.
Cause: For a new Samza job, the coordinator stream may not have been created
yet; therefore, JobCoordinatorLaunchUtil fails when trying to fetch the launch
config from the coordinator stream.
Changes: Update JobCoordinatorLaunchUtil to create the coordinator stream if
it does not exist before fetching the launch config from it.
Tests: None
API Changes: None
Upgrade Instructions: None
Usage Instructions: None
Author: Ke Wu <[email protected]>
Reviewers: mynameborat <[email protected]>
Closes #1370 from kw2542/SAMZA-2537
---
.../clustermanager/JobCoordinatorLaunchUtil.java | 2 ++
.../apache/samza/util/CoordinatorStreamUtil.scala | 26 ++++++++++++++--------
.../TestJobCoordinatorLaunchUtil.java | 8 +++++++
3 files changed, 27 insertions(+), 9 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index c943c0c..63a1f5c 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -59,6 +59,8 @@ public class JobCoordinatorLaunchUtil {
}
Config fullConfig = jobConfigs.get(0);
+ // Create coordinator stream if does not exist before fetching launch
config from it.
+ CoordinatorStreamUtil.createCoordinatorStream(fullConfig);
MetricsRegistryMap metrics = new MetricsRegistryMap();
MetadataStore
metadataStore = new
CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(fullConfig),
metrics);
diff --git
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index bf6aeb3..37d1393 100644
---
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -49,6 +49,22 @@ object CoordinatorStreamUtil extends Logging {
}
/**
+ * Creates coordinator stream from config if it does not exist, otherwise
no-op.
+ *
+ * @param config to create coordinator stream.
+ */
+ def createCoordinatorStream(config: Config): Unit = {
+ val systemAdmins = new SystemAdmins(config)
+
+ info("Creating coordinator stream")
+ val coordinatorSystemStream =
CoordinatorStreamUtil.getCoordinatorSystemStream(config)
+ val coordinatorSystemAdmin =
systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
+ coordinatorSystemAdmin.start()
+ CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream,
coordinatorSystemAdmin)
+ coordinatorSystemAdmin.stop()
+ }
+
+ /**
* Creates a coordinator stream.
* @param coordinatorSystemStream the {@see SystemStream} that describes
the stream to create.
* @param coordinatorSystemAdmin the {@see SystemAdmin} used to create the
stream.
@@ -157,15 +173,7 @@ object CoordinatorStreamUtil extends Logging {
debug("config: %s" format config)
val coordinatorSystemConsumer = new
CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
val coordinatorSystemProducer = new
CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
- val systemAdmins = new SystemAdmins(config)
-
- // Create the coordinator stream if it doesn't exist
- info("Creating coordinator stream")
- val coordinatorSystemStream =
CoordinatorStreamUtil.getCoordinatorSystemStream(config)
- val coordinatorSystemAdmin =
systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
- coordinatorSystemAdmin.start()
- CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream,
coordinatorSystemAdmin)
- coordinatorSystemAdmin.stop()
+ CoordinatorStreamUtil.createCoordinatorStream(config)
if (resetJobConfig) {
info("Storing config in coordinator stream.")
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
index 4bf0aaa..af27423 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
@@ -38,12 +38,15 @@ import
org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.verifyNew;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+
@RunWith(PowerMockRunner.class)
@PrepareForTest({
@@ -68,6 +71,7 @@ public class TestJobCoordinatorLaunchUtil {
ClusterBasedJobCoordinator mockJC = mock(ClusterBasedJobCoordinator.class);
PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+ PowerMockito.doNothing().when(CoordinatorStreamUtil.class,
"createCoordinatorStream", any());
PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class,
"buildCoordinatorStreamConfig", any());
PowerMockito.doReturn(autoSizingConfig).when(CoordinatorStreamUtil.class,
"readLaunchConfigFromCoordinatorStream", any(), any());
PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
@@ -79,5 +83,9 @@ public class TestJobCoordinatorLaunchUtil {
verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class),
eq(mockCoordinatorStreamStore), eq(finalConfig));
verify(mockJC, times(1)).run();
+ verifyStatic(times(1));
+ CoordinatorStreamUtil.createCoordinatorStream(any());
+ verifyStatic(times(1));
+ CoordinatorStreamUtil.writeConfigToCoordinatorStream(any(), anyBoolean());
}
}