Repository: samza Updated Branches: refs/heads/master a1e03af0d -> 20b427cc8
SAMZA-1214: Bug - system-scoped stream default configs may not be honored * Re-introduced deprecated system-stream configs into config table * Fixed position of task.consumer.batch.size in config table * Moved system-scoped defaults from StreamConfig to SystemConfig Author: Jacob Maes <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #150 from jmakes/samza-1214 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/20b427cc Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/20b427cc Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/20b427cc Branch: refs/heads/master Commit: 20b427cc88c5f8d2a6b2e0283ce18b4b5f01cc17 Parents: a1e03af Author: Jacob Maes <[email protected]> Authored: Mon May 1 13:52:13 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Mon May 1 13:52:13 2017 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 93 ++++++++++++++++++-- .../apache/samza/config/JavaStorageConfig.java | 2 + .../apache/samza/config/JavaSystemConfig.java | 3 + .../org/apache/samza/config/StreamConfig.scala | 7 +- .../org/apache/samza/config/SystemConfig.scala | 16 +++- .../apache/samza/config/TestStreamConfig.java | 3 +- 6 files changed, 105 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 0fc30c5..afa42f5 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -689,6 +689,17 @@ </tr> <tr> + <td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td> + <td class="default">1</td> + <td class="description"> + If set to a positive integer, the task will try to consume + <a href="../container/streams.html#batching">batches</a> with the given number of messages + from each input stream, rather than consuming round-robin from all the input streams on + each individual message. Setting this property can improve performance in some cases. + </td> + </tr> + + <tr> <th colspan="3" class="section" id="systems">Systems</th> </tr> @@ -785,13 +796,77 @@ </tr> <tr> - <td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td> - <td class="default">1</td> + <td class="property" id="systems-samza-key-serde-legacy">systems.<span class="system">system-name</span>.<br>samza.key.serde</td> + <td class="default" rowspan="2"></td> <td class="description"> - If set to a positive integer, the task will try to consume - <a href="../container/streams.html#batching">batches</a> with the given number of messages - from each input stream, rather than consuming round-robin from all the input streams on - each individual message. Setting this property can improve performance in some cases. + This is deprecated in favor of <a href="#systems-samza-key-serde" class="property"> + systems.<span class="system">system-name</span>.default.stream.samza.key.serde</a>. + </td> + </tr> + <tr> + <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.key.serde</td> + <td class="description"> + This is deprecated in favor of <a href="#streams-samza-key-serde" class="property"> + streams.<span class="stream">stream-id</span>.samza.key.serde</a>. + </td> + </tr> + + <tr> + <td class="property" id="systems-samza-msg-serde-legacy">systems.<span class="system">system-name</span>.<br>samza.msg.serde</td> + <td class="default" rowspan="2"></td> + <td class="description"> + This is deprecated in favor of <a href="#systems-samza-msg-serde" class="property"> + systems.<span class="system">system-name</span>.default.stream.samza.msg.serde</a>. + </td> + </tr> + <tr> + <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.msg.serde</td> + <td class="description"> + This is deprecated in favor of <a href="#streams-samza-msg-serde" class="property"> + streams.<span class="stream">stream-id</span>.samza.msg.serde</a>. + </td> + </tr> + + <tr> + <td class="property" id="systems-samza-offset-default-legacy">systems.<span class="system">system-name</span>.<br>samza.offset.default</td> + <td class="default" rowspan="2">upcoming</td> + <td class="description"> + This is deprecated in favor of <a href="#systems-samza-offset-default" class="property"> + systems.<span class="system">system-name</span>.default.stream.samza.offset.default</a>. + </td> + </tr> + <tr> + <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.offset.default</td> + <td class="description"> + This is deprecated in favor of <a href="#streams-samza-offset-default" class="property"> + streams.<span class="stream">stream-id</span>.samza.offset.default</a>. + </td> + </tr> + + <tr> + <td class="property" id="systems-streams-samza-reset-offset-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.reset.offset</td> + <td>false</td> + <td> + This is deprecated in favor of <a href="#streams-samza-reset-offset" class="property"> + streams.<span class="stream">stream-id</span>.samza.reset.offset</a>. + </td> + </tr> + + <tr> + <td class="property" id="systems-streams-samza-priority-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.priority</td> + <td>-1</td> + <td> + This is deprecated in favor of <a href="#streams-samza-priority" class="property"> + streams.<span class="stream">stream-id</span>.samza.priority</a>. + </td> + </tr> + + <tr> + <td class="property" id="systems-streams-samza-bootstrap-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.bootstrap</td> + <td>false</td> + <td> + This is deprecated in favor of <a href="#streams-samza-bootstrap" class="property"> + streams.<span class="stream">stream-id</span>.samza.bootstrap</a>. </td> </tr> @@ -875,7 +950,7 @@ </tr> <tr> - <td class="property" id="streams-streams-samza-reset-offset">streams.<span class="stream">stream-id</span>.<br>samza.reset.offset</td> + <td class="property" id="streams-samza-reset-offset">streams.<span class="stream">stream-id</span>.<br>samza.reset.offset</td> <td class="default">false</td> <td class="description"> If set to <code>true</code>, when a Samza container starts up, it ignores any @@ -888,7 +963,7 @@ </tr> <tr> - <td class="property" id="streams-streams-samza-priority">streams.<span class="stream">stream-id</span>.<br>samza.priority</td> + <td class="property" id="streams-samza-priority">streams.<span class="stream">stream-id</span>.<br>samza.priority</td> <td class="default">-1</td> <td class="description"> If one or more streams have a priority set (any positive integer), they will be processed @@ -901,7 +976,7 @@ </tr> <tr> - <td class="property" id="streams-streams-samza-bootstrap">streams.<span class="stream">stream-id</span>.<br>samza.bootstrap</td> + <td class="property" id="streams-samza-bootstrap">streams.<span class="stream">stream-id</span>.<br>samza.bootstrap</td> <td class="default">false</td> <td class="description"> If set to <code>true</code>, this stream will be processed as a http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java index a1f0ec0..c26601c 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java @@ -98,6 +98,8 @@ public class JavaStorageConfig extends MapConfig { * stores.storeName.changelog=streamName * * If the former syntax is used, that system name will still be honored. For the latter syntax, this method is used. + * + * @return the name of the system to use by default for all changelogs, if defined. */ public String getChangelogSystem() { return get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM(), null)); http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java index 6408438..350f20c 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java @@ -103,6 +103,9 @@ public class JavaSystemConfig extends MapConfig { /** * Gets the system-wide defaults for streams. + * + * @param systemName the name of the system for which the defaults will be returned. + * @return a subset of the config with the system prefix removed. */ public Config getDefaultStreamProperties(String systemName) { return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX, systemName), true); http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala index 43cc9a9..c490642 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala @@ -81,14 +81,11 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { * Returns a list of all SystemStreams that have a serde defined from the config file. */ def getSerdeStreams(systemName: String) = { - val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(systemName) - val hasSystemDefaultSerde = defaultStreamProperties.containsKey(StreamConfig.MSG_SERDE) || defaultStreamProperties.containsKey(StreamConfig.KEY_SERDE) - val subConf = config.subset("systems.%s.streams." format systemName, true) val legacySystemStreams = subConf .asScala .keys - .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde) + .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE)) .map(k => { val streamName = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ ) new SystemStream(systemName, streamName) @@ -97,7 +94,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { val systemStreams = subset(StreamConfig.STREAMS_PREFIX) .asScala .keys - .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde) + .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE)) .map(k => k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )) .filter(streamId => systemName.equals(getSystem(streamId))) .map(streamId => streamIdToSystemStream(streamId)).toSet http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala index 69fc383..804955c 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala @@ -29,8 +29,6 @@ object SystemConfig { // system config constants val SYSTEM_PREFIX = "systems.%s." val SYSTEM_FACTORY = "systems.%s.samza.factory" - val KEY_SERDE = "systems.%s.samza.key.serde" - val MSG_SERDE = "systems.%s.samza.msg.serde" val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default" implicit def Config2System(config: Config) = new SystemConfig(config) @@ -39,9 +37,9 @@ object SystemConfig { class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getSystemFactory(name: String) = getOption(SystemConfig.SYSTEM_FACTORY format name) - def getSystemKeySerde(name: String) = getNonEmptyOption(SystemConfig.KEY_SERDE format name) + def getSystemKeySerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.KEY_SERDE) - def getSystemMsgSerde(name: String) = getNonEmptyOption(SystemConfig.MSG_SERDE format name) + def getSystemMsgSerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.MSG_SERDE) def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName)) @@ -54,4 +52,14 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging { // find all .samza.factory keys, and strip the suffix subConf.asScala.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", "")) } + + private def getSystemDefaultStreamProperty(name: String, property: String) = { + val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(name) + val streamDefault = defaultStreamProperties.get(property) + if (!(streamDefault == null || streamDefault.isEmpty)) { + Option(streamDefault) + } else { + getNonEmptyOption((SystemConfig.SYSTEM_PREFIX + property) format name) + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/20b427cc/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java index 4580cc4..f6a617c 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java @@ -216,7 +216,8 @@ public class TestStreamConfig { // Ensure that we can set legacy system properties via the new system wide default assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get()); - assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size()); + assertEquals(0, config.getSerdeStreams(STREAM1_SYSTEM).size()); + assertEquals("value1", new SystemConfig(config).getSystemKeySerde(STREAM1_SYSTEM).get()); assertEquals("newest", config.getDefaultStreamOffset(SYSTEM_STREAM_1).get()); // Property set via systems.x.default.stream.* only
