This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch 1.5.0
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/1.5.0 by this push:
     new 10c1d46  SAMZA-2537: Fix JobCoordinatorLaunchUtil's execution sequence.
10c1d46 is described below

commit 10c1d46ef51a4267aa8438dc9d8760334844f1b0
Author: Ke Wu <[email protected]>
AuthorDate: Tue Jun 2 18:09:47 2020 -0700

    SAMZA-2537: Fix JobCoordinatorLaunchUtil's execution sequence.
    
    Symptom: A new Samza job fails to deploy.
    Cause: For a new Samza job, the coordinator stream may not have been created 
yet; therefore, JobCoordinatorLaunchUtil fails when trying to fetch the launch 
config from the coordinator stream.
    Changes: Update JobCoordinatorLaunchUtil to create the coordinator stream if 
it does not exist before fetching the launch config from it.
    Tests: None
    API Changes: None
    Upgrade Instructions: None
    Usage Instructions: None 
    
    Author: Ke Wu <[email protected]>
    
    Reviewers: mynameborat <[email protected]>
    
    Closes #1370 from kw2542/SAMZA-2537
    
    (cherry picked from commit 85afbb7c65f097e098fdd1d4601a73c87e990953)
    Signed-off-by: mynameborat <[email protected]>
---
 .../clustermanager/JobCoordinatorLaunchUtil.java   |  2 ++
 .../apache/samza/util/CoordinatorStreamUtil.scala  | 26 ++++++++++++++--------
 .../TestJobCoordinatorLaunchUtil.java              |  8 +++++++
 3 files changed, 27 insertions(+), 9 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index c943c0c..63a1f5c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -59,6 +59,8 @@ public class JobCoordinatorLaunchUtil {
     }
 
     Config fullConfig = jobConfigs.get(0);
+    // Create coordinator stream if does not exist before fetching launch 
config from it.
+    CoordinatorStreamUtil.createCoordinatorStream(fullConfig);
     MetricsRegistryMap metrics = new MetricsRegistryMap();
     MetadataStore
         metadataStore = new 
CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(fullConfig),
 metrics);
diff --git 
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala 
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index bf6aeb3..37d1393 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -49,6 +49,22 @@ object CoordinatorStreamUtil extends Logging {
   }
 
   /**
+   * Creates coordinator stream from config if it does not exist, otherwise 
no-op.
+   *
+   * @param config to create coordinator stream.
+   */
+  def createCoordinatorStream(config: Config): Unit = {
+    val systemAdmins = new SystemAdmins(config)
+
+    info("Creating coordinator stream")
+    val coordinatorSystemStream = 
CoordinatorStreamUtil.getCoordinatorSystemStream(config)
+    val coordinatorSystemAdmin = 
systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
+    coordinatorSystemAdmin.start()
+    CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, 
coordinatorSystemAdmin)
+    coordinatorSystemAdmin.stop()
+  }
+
+  /**
     * Creates a coordinator stream.
     * @param coordinatorSystemStream the {@see SystemStream} that describes 
the stream to create.
     * @param coordinatorSystemAdmin the {@see SystemAdmin} used to create the 
stream.
@@ -157,15 +173,7 @@ object CoordinatorStreamUtil extends Logging {
     debug("config: %s" format config)
     val coordinatorSystemConsumer = new 
CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
     val coordinatorSystemProducer = new 
CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
-    val systemAdmins = new SystemAdmins(config)
-
-    // Create the coordinator stream if it doesn't exist
-    info("Creating coordinator stream")
-    val coordinatorSystemStream = 
CoordinatorStreamUtil.getCoordinatorSystemStream(config)
-    val coordinatorSystemAdmin = 
systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
-    coordinatorSystemAdmin.start()
-    CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, 
coordinatorSystemAdmin)
-    coordinatorSystemAdmin.stop()
+    CoordinatorStreamUtil.createCoordinatorStream(config)
 
     if (resetJobConfig) {
       info("Storing config in coordinator stream.")
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
index 4bf0aaa..af27423 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
@@ -38,12 +38,15 @@ import 
org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.verifyNew;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({
@@ -68,6 +71,7 @@ public class TestJobCoordinatorLaunchUtil {
     ClusterBasedJobCoordinator mockJC = mock(ClusterBasedJobCoordinator.class);
 
     PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+    PowerMockito.doNothing().when(CoordinatorStreamUtil.class, 
"createCoordinatorStream", any());
     PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, 
"buildCoordinatorStreamConfig", any());
     PowerMockito.doReturn(autoSizingConfig).when(CoordinatorStreamUtil.class, 
"readLaunchConfigFromCoordinatorStream", any(), any());
     
PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
@@ -79,5 +83,9 @@ public class TestJobCoordinatorLaunchUtil {
 
     
verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class),
 eq(mockCoordinatorStreamStore), eq(finalConfig));
     verify(mockJC, times(1)).run();
+    verifyStatic(times(1));
+    CoordinatorStreamUtil.createCoordinatorStream(any());
+    verifyStatic(times(1));
+    CoordinatorStreamUtil.writeConfigToCoordinatorStream(any(), anyBoolean());
   }
 }

Reply via email to