This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e31503  disable transactional state restore in cases where standby or async commit are enabled (#1340)
5e31503 is described below

commit 5e3150312a100011f84bda464ab338bc8301a2fb
Author: bkonold <[email protected]>
AuthorDate: Tue Apr 7 17:12:42 2020 -0700

    disable transactional state restore in cases where standby or async commit are enabled (#1340)
---
 .../java/org/apache/samza/config/TaskConfig.java   | 10 ++++++++-
 .../org/apache/samza/config/TestTaskConfig.java    | 25 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index 468d9c9..f5f09b2 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -320,7 +320,15 @@ public class TaskConfig extends MapConfig {
   }
 
   public boolean getTransactionalStateRestoreEnabled() {
-    return getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED, DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED);
+    JobConfig jobConfig = new JobConfig(this);
+
+    boolean standByEnabled = jobConfig.getStandbyTasksEnabled();
+    boolean asyncCommitEnabled = getAsyncCommit();
+
+    // TODO remove check of standby enabled when SAMZA-2353 is completed
+    // TODO remove check of async commit when SAMZA-2505 is completed
+    // transactional state restore must remain disabled until it is supported in the above use cases
+    return !standByEnabled && !asyncCommitEnabled && getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED, DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED);
   }
 
   public boolean getTransactionalStateRetainExistingState() {
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
index f55b5a9..f6df6f7 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.*;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -333,6 +334,30 @@ public class TestTaskConfig {
     assertEquals(TaskConfig.DEFAULT_TASK_SHUTDOWN_MS, new TaskConfig(new MapConfig()).getShutdownMs());
   }
 
+  @Test
+  public void testGetTransactionalStateRestoreEnabled() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
+
+    // standby and async commit both off; transactional state restore returned as enabled
+    assertTrue(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());
+
+    // standby off and async commit on; transactional state restore returned as disabled
+    configMap.put(TaskConfig.ASYNC_COMMIT, "true");
+    configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "1");
+    assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());
+
+    // standby on and async commit off; transactional state restore returned as disabled
+    configMap.put(TaskConfig.ASYNC_COMMIT, "false");
+    configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "2");
+    assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());
+
+    // standby on and async commit on; transactional state restore returned as disabled
+    configMap.put(TaskConfig.ASYNC_COMMIT, "true");
+    configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "2");
+    assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());
+  }
+
   /**
    * Used for testing classloading a {@link CheckpointManagerFactory}.
    */

Reply via email to