This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d8926ca SAMZA-2264 : Offset.default for intermediate streams (#1095)
d8926ca is described below
commit d8926ca90c35c3137a332b1edeb7ced1f0961bbb
Author: rmatharu <[email protected]>
AuthorDate: Tue Jul 9 13:46:17 2019 -0700
SAMZA-2264 : Offset.default for intermediate streams (#1095)
* Offset.default for intermediate streams should default to upcoming for
non-batch jobs
---
.../java/org/apache/samza/execution/StreamEdge.java | 21 ++++++++++++++-------
.../TestJobNodeConfigurationGenerator.java | 1 -
.../org/apache/samza/execution/TestStreamEdge.java | 1 -
3 files changed, 14 insertions(+), 9 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index c318118..80d1bad 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
@@ -120,16 +121,22 @@ public class StreamEdge {
Config generateConfig() {
Map<String, String> streamConfig = new HashMap<>();
StreamSpec spec = getStreamSpec();
- streamConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(),
spec.getId()), spec.getSystemName());
- streamConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(),
spec.getId()), spec.getPhysicalName());
+ String streamId = spec.getId();
+ streamConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(),
streamId), spec.getSystemName());
+ streamConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(),
streamId), spec.getPhysicalName());
if (isIntermediate()) {
- streamConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
- streamConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), spec.getId()), "true");
- streamConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
- streamConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
+ streamConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), streamId), "true");
+ streamConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), streamId), "true");
+
+ // Setting offset.default to oldest only if the job is running in batch mode
+ if (ApplicationConfig.ApplicationMode.BATCH.equals(new ApplicationConfig(config).getAppMode())) {
+ streamConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamId), "oldest");
+ }
+
+ streamConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(),
streamId), String.valueOf(Integer.MAX_VALUE));
}
spec.getConfig().forEach((property, value) -> {
- streamConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(),
spec.getId()) + property, value);
+ streamConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(),
streamId) + property, value);
});
return new MapConfig(streamConfig);
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
index 6f1b9c0..61a0f68 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -329,7 +329,6 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
assertEquals("true",
intStreamConfig.get("samza.delete.committed.messages"));
assertEquals(physicalName, intStreamConfig.get("samza.physical.name"));
assertEquals("true", intStreamConfig.get("samza.intermediate"));
- assertEquals("oldest", intStreamConfig.get("samza.offset.default"));
}
private void validateStreamConfigures(Config config, Map<String, Serde>
deserializedSerdes) {
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
index 408f638..94275dd 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -84,7 +84,6 @@ public class TestStreamEdge {
config = edge.generateConfig();
streamConfig = new StreamConfig(config);
assertEquals(streamConfig.getIsIntermediateStream(spec.getId()), true);
- assertEquals(streamConfig.getDefaultStreamOffset(spec.toSystemStream()).get(), "oldest");
assertEquals(streamConfig.getPriority(spec.toSystemStream()),
Integer.MAX_VALUE);
}
}