This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0080433 Setting transactional state restore to default enabled (#1346)
0080433 is described below
commit 0080433d0a97a4de9173732dec12812a23b350c9
Author: bkonold <[email protected]>
AuthorDate: Wed May 6 18:43:31 2020 -0700
Setting transactional state restore to default enabled (#1346)
---
samza-core/src/main/java/org/apache/samza/config/TaskConfig.java | 2 +-
.../src/test/java/org/apache/samza/system/MockSystemFactory.java | 4 ++--
.../src/test/scala/org/apache/samza/config/TestKafkaConfig.scala | 2 +-
3 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index f5f09b2..461b647 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -111,7 +111,7 @@ public class TaskConfig extends MapConfig {
public static final String TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = "task.transactional.state.checkpoint.enabled";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
public static final String TRANSACTIONAL_STATE_RESTORE_ENABLED = "task.transactional.state.restore.enabled";
- private static final boolean DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED = false;
+ private static final boolean DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED = true;
public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = "task.transactional.state.retain.existing.state";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;
diff --git a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
index e3030ab..9c8dc58 100644
--- a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
@@ -154,14 +154,14 @@ public class MockSystemFactory implements SystemFactory {
@Override
public Integer offsetComparator(String offset1, String offset2) {
- return null;
+ return offset1.compareTo(offset2);
}
@Override
public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
return getSystemStreamMetadata(streamNames);
}
-
+
@Override
public boolean createStream(StreamSpec streamSpec) {
return true;
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 64b476b..f62e8a3 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -169,7 +169,7 @@ class TestKafkaConfig {
assertEquals("otherstream", storeToChangelog.getOrDefault("test3", ""))
assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"))
assertNull(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"))
- assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
+ assertNotNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory")
val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores()