This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d8926ca  SAMZA-2264 : Offset.default for intermediate streams (#1095)
d8926ca is described below

commit d8926ca90c35c3137a332b1edeb7ced1f0961bbb
Author: rmatharu <[email protected]>
AuthorDate: Tue Jul 9 13:46:17 2019 -0700

    SAMZA-2264 : Offset.default for intermediate streams (#1095)
    
    * Offset.default for intermediate streams should default to upcoming for 
non-batch jobs
---
 .../java/org/apache/samza/execution/StreamEdge.java | 21 ++++++++++++++-------
 .../TestJobNodeConfigurationGenerator.java          |  1 -
 .../org/apache/samza/execution/TestStreamEdge.java  |  1 -
 3 files changed, 14 insertions(+), 9 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java 
b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index c318118..80d1bad 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
@@ -120,16 +121,22 @@ public class StreamEdge {
   Config generateConfig() {
     Map<String, String> streamConfig = new HashMap<>();
     StreamSpec spec = getStreamSpec();
-    streamConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
spec.getId()), spec.getSystemName());
-    streamConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), 
spec.getId()), spec.getPhysicalName());
+    String streamId = spec.getId();
+    streamConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
streamId), spec.getSystemName());
+    streamConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), 
streamId), spec.getPhysicalName());
     if (isIntermediate()) {
-      
streamConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), 
spec.getId()), "true");
-      
streamConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(),
 spec.getId()), "true");
-      
streamConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(),
 spec.getId()), "oldest");
-      streamConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), 
spec.getId()), String.valueOf(Integer.MAX_VALUE));
+      
streamConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), 
streamId), "true");
+      
streamConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(),
 streamId), "true");
+
+      // Setting offset.default to oldest only if the job is running in batch 
mode
+      if (ApplicationConfig.ApplicationMode.BATCH.equals(new 
ApplicationConfig(config).getAppMode())) {
+        
streamConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(),
 streamId), "oldest");
+      }
+
+      streamConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), 
streamId), String.valueOf(Integer.MAX_VALUE));
     }
     spec.getConfig().forEach((property, value) -> {
-        streamConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(), 
spec.getId()) + property, value);
+        streamConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(), 
streamId) + property, value);
       });
 
     return new MapConfig(streamConfig);
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
index 6f1b9c0..61a0f68 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -329,7 +329,6 @@ public class TestJobNodeConfigurationGenerator extends 
ExecutionPlannerTestBase
     assertEquals("true", 
intStreamConfig.get("samza.delete.committed.messages"));
     assertEquals(physicalName, intStreamConfig.get("samza.physical.name"));
     assertEquals("true", intStreamConfig.get("samza.intermediate"));
-    assertEquals("oldest", intStreamConfig.get("samza.offset.default"));
   }
 
   private void validateStreamConfigures(Config config, Map<String, Serde> 
deserializedSerdes) {
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
index 408f638..94275dd 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -84,7 +84,6 @@ public class TestStreamEdge {
     config = edge.generateConfig();
     streamConfig = new StreamConfig(config);
     assertEquals(streamConfig.getIsIntermediateStream(spec.getId()), true);
-    
assertEquals(streamConfig.getDefaultStreamOffset(spec.toSystemStream()).get(), 
"oldest");
     assertEquals(streamConfig.getPriority(spec.toSystemStream()), 
Integer.MAX_VALUE);
   }
 }

Reply via email to