This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 10915fb  SAMZA-2518: Update JobCoordinatorLaunchUtil to fetch launch config from metadata store. (#1354)
10915fb is described below

commit 10915fb3cda40ae3733d82f1290bd854af113af6
Author: Ke Wu <[email protected]>
AuthorDate: Mon May 4 17:04:21 2020 -0700

    SAMZA-2518: Update JobCoordinatorLaunchUtil to fetch launch config from metadata store. (#1354)
    
    * SAMZA-2518: Update JobCoordinatorLaunchUtil to fetch launch config from metadata store.
    
    Design:
    
    https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner
    
    Changes:
    1. Add readLaunchConfigFromCoordinatorStream() in CoordinatorStreamUtil
    2. Add an extra step in JobCoordinatorLaunchUtil to invoke readLaunchConfigFromCoordinatorStream()
    
    API Changes:
    None
    
    Upgrade Instructions:
    None
    
    Usage Instructions:
    None
    
    Tests:
    Unit Tests
    
    * Update to address comments
    
    * Simplify implementation
    
    Co-authored-by: Ke Wu <[email protected]>
---
 .../clustermanager/JobCoordinatorLaunchUtil.java   | 12 +++++---
 .../java/org/apache/samza/config/JobConfig.java    |  2 +-
 .../apache/samza/util/CoordinatorStreamUtil.scala  | 32 +++++++++++++++++-----
 .../TestJobCoordinatorLaunchUtil.java              | 12 +++++---
 .../samza/util/TestCoordinatorStreamUtil.scala     | 27 +++++++++++++++++-
 5 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index fc1d34e..6915614 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -26,6 +26,7 @@ import 
org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.metadatastore.MetadataStore;
@@ -57,14 +58,17 @@ public class JobCoordinatorLaunchUtil {
       throw new SamzaException("Only support single remote job is supported.");
     }
 
-    Config finalConfig = jobConfigs.get(0);
+    Config fullConfig = jobConfigs.get(0);
+    MetricsRegistryMap metrics = new MetricsRegistryMap();
+    MetadataStore
+        metadataStore = new 
CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(fullConfig),
 metrics);
+    // Reads extra launch config from metadata store.
+    Config launchConfig = 
CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(fullConfig, 
metadataStore);
+    Config finalConfig = new MapConfig(launchConfig, fullConfig);
 
     // This needs to be consistent with RemoteApplicationRunner#run where 
JobRunner#submit to be called instead of JobRunner#run
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
-    MetricsRegistryMap metrics = new MetricsRegistryMap();
-    MetadataStore
-        metadataStore = new 
CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(finalConfig),
 metrics);
     // MetadataStore will be closed in ClusterBasedJobCoordinator#onShutDown
     // initialization of MetadataStore can be moved to 
ClusterBasedJobCoordinator after we clean up
     // ClusterBasedJobCoordinator#createFromMetadataStore
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 93b4b60..dc03644 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -351,7 +351,7 @@ public class JobConfig extends MapConfig {
    * @param configParam the config param to determine
    * @return true if the config is related to autosizing, false otherwise
    */
-  public boolean isAutosizingConfig(String configParam) {
+  public static boolean isAutosizingConfig(String configParam) {
     return configParam.startsWith(JOB_AUTOSIZING_CONFIG_PREFIX);
   }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 518639f..bf6aeb3 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -24,10 +24,11 @@ import java.util
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config._
-import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
NamespaceAwareCoordinatorStreamStore}
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
 import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, 
CoordinatorStreamSystemProducer, CoordinatorStreamValueSerde}
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
 import org.apache.samza.job.JobRunner
+import org.apache.samza.metadatastore.MetadataStore
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemAdmins, 
SystemFactory, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
@@ -44,7 +45,7 @@ object CoordinatorStreamUtil extends Logging {
     val buildConfigFactory = jobConfig.getCoordinatorStreamFactory
     val coordinatorSystemConfig = 
Class.forName(buildConfigFactory).newInstance().asInstanceOf[CoordinatorStreamConfigFactory].buildCoordinatorStreamConfig(config)
 
-    new MapConfig(coordinatorSystemConfig);
+    new MapConfig(coordinatorSystemConfig)
   }
 
   /**
@@ -111,12 +112,29 @@ object CoordinatorStreamUtil extends Logging {
   }
 
   /**
+   * Reads and returns launch config persisted in coordinator stream. Only 
job.auto sizing configs are currently supported.
+   * @param config full job config
+   * @param metadataStore an instance of the instantiated MetadataStore
+   * @return empty config if auto sizing is disabled, otherwise auto sizing 
related configs.
+   */
+  def readLaunchConfigFromCoordinatorStream(config: Config, metadataStore: 
MetadataStore): Config = {
+    if (!config.getBoolean(JobConfig.JOB_AUTOSIZING_ENABLED, false)) {
+      new MapConfig()
+    } else {
+      val config = readConfigFromCoordinatorStream(metadataStore)
+      val launchConfig = config.asScala.filterKeys(key => 
JobConfig.isAutosizingConfig(key)).asJava
+
+      new MapConfig(launchConfig)
+    }
+  }
+
+  /**
     * Reads and returns the complete configuration stored in the coordinator 
stream.
-    * @param coordinatorStreamStore an instance of the instantiated {@link 
CoordinatorStreamStore}.
+    * @param metadataStore an instance of the instantiated {@link 
CoordinatorStreamStore}.
     * @return the configuration read from the coordinator stream.
     */
-  def readConfigFromCoordinatorStream(coordinatorStreamStore: 
CoordinatorStreamStore): Config = {
-    val namespaceAwareCoordinatorStreamStore: 
NamespaceAwareCoordinatorStreamStore = new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE)
+  def readConfigFromCoordinatorStream(metadataStore: MetadataStore): Config = {
+    val namespaceAwareCoordinatorStreamStore: 
NamespaceAwareCoordinatorStreamStore = new 
NamespaceAwareCoordinatorStreamStore(metadataStore, SetConfig.TYPE)
     val configFromCoordinatorStream: util.Map[String, Array[Byte]] = 
namespaceAwareCoordinatorStreamStore.all
     val configMap: util.Map[String, String] = new util.HashMap[String, String]
     for ((key: String, valueAsBytes: Array[Byte]) <- 
configFromCoordinatorStream.asScala) {
@@ -136,7 +154,7 @@ object CoordinatorStreamUtil extends Logging {
   }
 
   def writeConfigToCoordinatorStream(config: Config, resetJobConfig: Boolean = 
true) {
-    debug("config: %s" format (config))
+    debug("config: %s" format config)
     val coordinatorSystemConsumer = new 
CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
     val coordinatorSystemProducer = new 
CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
     val systemAdmins = new SystemAdmins(config)
@@ -168,7 +186,7 @@ object CoordinatorStreamUtil extends Logging {
       val jobConfig = new JobConfig(config)
       if (jobConfig.getAutosizingEnabled) {
         // If autosizing is enabled, we retain auto-sizing related configs
-        keysToRemove = keysToRemove.filter(configKey => 
!jobConfig.isAutosizingConfig(configKey))
+        keysToRemove = keysToRemove.filter(configKey => 
!JobConfig.isAutosizingConfig(configKey))
       }
 
       info("Deleting old configs that are no longer defined: 
%s".format(keysToRemove))
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
index 827e312..4bf0aaa 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.application.MockStreamApplication;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
@@ -57,8 +58,10 @@ public class TestJobCoordinatorLaunchUtil {
     config.put(JobConfig.CONFIG_LOADER_FACTORY, 
PropertiesConfigLoaderFactory.class.getName());
     config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + 
"path",
         getClass().getResource("/test.properties").getPath());
-    JobConfig originalConfig = new JobConfig(ConfigUtil.loadConfig(new 
MapConfig(config)));
-    JobConfig fullJobConfig = new JobConfig(new MapConfig(originalConfig, 
Collections.singletonMap("isAfterPlanning", "true")));
+    Config originalConfig = new JobConfig(ConfigUtil.loadConfig(new 
MapConfig(config)));
+    Config fullConfig = new MapConfig(originalConfig, 
Collections.singletonMap("isAfterPlanning", "true"));
+    Config autoSizingConfig = new 
MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT, 
"10"));
+    Config finalConfig = new MapConfig(autoSizingConfig, fullConfig);
 
     RemoteJobPlanner mockJobPlanner = mock(RemoteJobPlanner.class);
     CoordinatorStreamStore mockCoordinatorStreamStore = 
mock(CoordinatorStreamStore.class);
@@ -66,14 +69,15 @@ public class TestJobCoordinatorLaunchUtil {
 
     PowerMockito.mockStatic(CoordinatorStreamUtil.class);
     PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, 
"buildCoordinatorStreamConfig", any());
+    PowerMockito.doReturn(autoSizingConfig).when(CoordinatorStreamUtil.class, 
"readLaunchConfigFromCoordinatorStream", any(), any());
     
PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
     
PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
     
PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mockJC);
-    
when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullJobConfig));
+    
when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(new 
JobConfig(fullConfig)));
 
     JobCoordinatorLaunchUtil.run(new MockStreamApplication(), originalConfig);
 
-    
verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class),
 eq(mockCoordinatorStreamStore), eq(fullJobConfig));
+    
verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class),
 eq(mockCoordinatorStreamStore), eq(finalConfig));
     verify(mockJC, times(1)).run();
   }
 }
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
index f8a9f40..f520c6d 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
@@ -19,6 +19,7 @@
 package org.apache.samza.util
 
 import java.util
+import java.util.Collections
 
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde
@@ -27,7 +28,8 @@ import org.apache.samza.system.{StreamSpec, SystemAdmin, 
SystemStream}
 import org.junit.{Assert, Test}
 import org.mockito.Matchers.any
 import org.mockito.Mockito
-import org.apache.samza.config.MapConfig
+import org.apache.samza.config.{JobConfig, MapConfig}
+import org.apache.samza.metadatastore.MetadataStore
 
 class TestCoordinatorStreamUtil {
 
@@ -99,4 +101,27 @@ class TestCoordinatorStreamUtil {
 
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(configMap)
   }
+
+  @Test
+  def testReadLaunchConfigFromCoordinatorStream() {
+    // Empty config when auto sizing is disabled.
+    Assert.assertEquals(new MapConfig(),  
CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(new MapConfig(), 
null))
+
+    val valueSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE)
+    val config = new 
MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_ENABLED, "true"))
+    val expected = new 
MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT, 
"20"))
+    val mockMetadataStore = Mockito.mock(classOf[MetadataStore])
+    val configMap = new util.HashMap[String, Array[Byte]]() {
+      
put(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE,
+        JobConfig.JOB_ID),
+        valueSerde.toBytes("321"))
+      
put(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE,
+        JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT),
+        valueSerde.toBytes("20"))
+    }
+    Mockito.when(mockMetadataStore.all()).thenReturn(configMap)
+
+    // Verify the launch config is expected
+    Assert.assertEquals(expected, 
CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(config, 
mockMetadataStore))
+  }
 }

Reply via email to