This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 5e31503 disable transactional state restore in cases where standby or
async commit are enabled (#1340)
5e31503 is described below
commit 5e3150312a100011f84bda464ab338bc8301a2fb
Author: bkonold <[email protected]>
AuthorDate: Tue Apr 7 17:12:42 2020 -0700
disable transactional state restore in cases where standby or async commit
are enabled (#1340)
---
.../java/org/apache/samza/config/TaskConfig.java | 10 ++++++++-
.../org/apache/samza/config/TestTaskConfig.java | 25 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index 468d9c9..f5f09b2 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -320,7 +320,15 @@ public class TaskConfig extends MapConfig {
}
public boolean getTransactionalStateRestoreEnabled() {
- return getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED,
DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED);
+ JobConfig jobConfig = new JobConfig(this);
+
+ boolean standByEnabled = jobConfig.getStandbyTasksEnabled();
+ boolean asyncCommitEnabled = getAsyncCommit();
+
+ // TODO remove check of standby enabled when SAMZA-2353 is completed
+ // TODO remove check of async commit when SAMZA-2505 is completed
+ // transactional state restore must remain disabled until it is supported
in the above use cases
+ return !standByEnabled && !asyncCommitEnabled &&
getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED,
DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED);
}
public boolean getTransactionalStateRetainExistingState() {
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
index f55b5a9..f6df6f7 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -333,6 +334,30 @@ public class TestTaskConfig {
assertEquals(TaskConfig.DEFAULT_TASK_SHUTDOWN_MS, new TaskConfig(new
MapConfig()).getShutdownMs());
}
+ @Test
+ public void testGetTransactionalStateRestoreEnabled() {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
+
+ // standby and async commit both off; transactional state restore returned
as enabled
+ assertTrue(new TaskConfig(new
MapConfig(configMap)).getTransactionalStateRestoreEnabled());
+
+ // standby off and async commit on; transactional state restore returned
as disabled
+ configMap.put(TaskConfig.ASYNC_COMMIT, "true");
+ configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "1");
+ assertFalse(new TaskConfig(new
MapConfig(configMap)).getTransactionalStateRestoreEnabled());
+
+ // standby on and async commit off; transactional state restore returned
as disabled
+ configMap.put(TaskConfig.ASYNC_COMMIT, "false");
+ configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "2");
+ assertFalse(new TaskConfig(new
MapConfig(configMap)).getTransactionalStateRestoreEnabled());
+
+ // standby on and async commit on; transactional state restore returned as
disabled
+ configMap.put(TaskConfig.ASYNC_COMMIT, "true");
+ configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "2");
+ assertFalse(new TaskConfig(new
MapConfig(configMap)).getTransactionalStateRestoreEnabled());
+ }
+
/**
* Used for testing classloading a {@link CheckpointManagerFactory}.
*/