This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 303a02e0f SAMZA-2685: fall back to ClusterBasedJobCoordinator when
job.coordinator.factory is the empty string for cluster-based deployments
(#1594)
303a02e0f is described below
commit 303a02e0fdea278f5ad5917ca76c47deefbc13fe
Author: Cameron Lee <[email protected]>
AuthorDate: Tue Apr 5 16:42:31 2022 -0700
SAMZA-2685: fall back to ClusterBasedJobCoordinator when
job.coordinator.factory is the empty string for cluster-based deployments
(#1594)
API/usage changes: Using the empty string as the value for
job.coordinator.factory when running in a cluster-based system (e.g. YARN)
results in using ClusterBasedJobCoordinator instead of trying to load a class
with the name of an empty string (which would fail).
---
.../java/org/apache/samza/config/JobCoordinatorConfig.java | 6 ++----
.../org/apache/samza/config/TestJobCoordinatorConfig.java | 11 ++++++++---
2 files changed, 10 insertions(+), 7 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
index eed704c3f..b2940c228 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -74,13 +74,11 @@ public class JobCoordinatorConfig extends MapConfig {
}
public String getJobCoordinatorFactoryClassName() {
- return getOptionalJobCoordinatorFactoryClassName()
- .filter(className -> !Strings.isNullOrEmpty(className))
- .orElse(ZkJobCoordinatorFactory.class.getName());
+ return
getOptionalJobCoordinatorFactoryClassName().orElse(ZkJobCoordinatorFactory.class.getName());
}
public Optional<String> getOptionalJobCoordinatorFactoryClassName() {
- return Optional.ofNullable(get(JOB_COORDINATOR_FACTORY));
+ return Optional.ofNullable(get(JOB_COORDINATOR_FACTORY)).filter(className
-> !Strings.isNullOrEmpty(className));
}
public String getJobRestartSignalFactory() {
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
index ffb80f934..88dfb1972 100644
---
a/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
+++
b/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.config;
+import java.util.Optional;
import com.google.common.collect.ImmutableMap;
import org.apache.samza.coordinator.lifecycle.NoOpJobRestartSignalFactory;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
@@ -25,7 +26,6 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
public class TestJobCoordinatorConfig {
@@ -47,9 +47,14 @@ public class TestJobCoordinatorConfig {
public void getOptionalJobCoordinatorFactoryClassName() {
assertFalse(new JobCoordinatorConfig(new
MapConfig()).getOptionalJobCoordinatorFactoryClassName().isPresent());
- JobCoordinatorConfig jobCoordinatorConfig = new JobCoordinatorConfig(new
MapConfig(
+ JobCoordinatorConfig jobCoordinatorConfig =
+ new JobCoordinatorConfig(new
MapConfig(ImmutableMap.of(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "")));
+
assertFalse(jobCoordinatorConfig.getOptionalJobCoordinatorFactoryClassName().isPresent());
+
+ jobCoordinatorConfig = new JobCoordinatorConfig(new MapConfig(
ImmutableMap.of(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
"org.custom.MyJobCoordinatorFactory")));
-
assertTrue(jobCoordinatorConfig.getOptionalJobCoordinatorFactoryClassName().isPresent());
+ assertEquals(Optional.of("org.custom.MyJobCoordinatorFactory"),
+ jobCoordinatorConfig.getOptionalJobCoordinatorFactoryClassName());
}
@Test