This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 303a02e0f SAMZA-2685: fall back to ClusterBasedJobCoordinator when 
job.coordinator.factory is the empty string for cluster-based deployments 
(#1594)
303a02e0f is described below

commit 303a02e0fdea278f5ad5917ca76c47deefbc13fe
Author: Cameron Lee <[email protected]>
AuthorDate: Tue Apr 5 16:42:31 2022 -0700

    SAMZA-2685: fall back to ClusterBasedJobCoordinator when 
job.coordinator.factory is the empty string for cluster-based deployments 
(#1594)
    
    API/usage changes: Using the empty string as the value for 
job.coordinator.factory when running in a cluster-based system (e.g. YARN) 
results in using ClusterBasedJobCoordinator instead of trying to load a class 
with the name of an empty string (which would fail).
---
 .../java/org/apache/samza/config/JobCoordinatorConfig.java    |  6 ++----
 .../org/apache/samza/config/TestJobCoordinatorConfig.java     | 11 ++++++++---
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
index eed704c3f..b2940c228 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -74,13 +74,11 @@ public class JobCoordinatorConfig extends MapConfig {
   }
 
   public String getJobCoordinatorFactoryClassName() {
-    return getOptionalJobCoordinatorFactoryClassName()
-        .filter(className -> !Strings.isNullOrEmpty(className))
-        .orElse(ZkJobCoordinatorFactory.class.getName());
+    return 
getOptionalJobCoordinatorFactoryClassName().orElse(ZkJobCoordinatorFactory.class.getName());
   }
 
   public Optional<String> getOptionalJobCoordinatorFactoryClassName() {
-    return Optional.ofNullable(get(JOB_COORDINATOR_FACTORY));
+    return Optional.ofNullable(get(JOB_COORDINATOR_FACTORY)).filter(className 
-> !Strings.isNullOrEmpty(className));
   }
 
   public String getJobRestartSignalFactory() {
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
 
b/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
index ffb80f934..88dfb1972 100644
--- 
a/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
+++ 
b/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.config;
 
+import java.util.Optional;
 import com.google.common.collect.ImmutableMap;
 import org.apache.samza.coordinator.lifecycle.NoOpJobRestartSignalFactory;
 import org.apache.samza.zk.ZkJobCoordinatorFactory;
@@ -25,7 +26,6 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 
 public class TestJobCoordinatorConfig {
@@ -47,9 +47,14 @@ public class TestJobCoordinatorConfig {
   public void getOptionalJobCoordinatorFactoryClassName() {
     assertFalse(new JobCoordinatorConfig(new 
MapConfig()).getOptionalJobCoordinatorFactoryClassName().isPresent());
 
-    JobCoordinatorConfig jobCoordinatorConfig = new JobCoordinatorConfig(new 
MapConfig(
+    JobCoordinatorConfig jobCoordinatorConfig =
+        new JobCoordinatorConfig(new 
MapConfig(ImmutableMap.of(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "")));
+    
assertFalse(jobCoordinatorConfig.getOptionalJobCoordinatorFactoryClassName().isPresent());
+
+    jobCoordinatorConfig = new JobCoordinatorConfig(new MapConfig(
         ImmutableMap.of(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.custom.MyJobCoordinatorFactory")));
-    
assertTrue(jobCoordinatorConfig.getOptionalJobCoordinatorFactoryClassName().isPresent());
+    assertEquals(Optional.of("org.custom.MyJobCoordinatorFactory"),
+        jobCoordinatorConfig.getOptionalJobCoordinatorFactoryClassName());
   }
 
   @Test

Reply via email to