Repository: samza
Updated Branches:
  refs/heads/master b938df623 -> daed7a9a3
SAMZA-1167: New streamId-specific configs do not override equivalent system-scoped configs

Author: Jacob Maes <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes #101 from jmakes/samza-1167

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/daed7a9a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/daed7a9a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/daed7a9a

Branch: refs/heads/master
Commit: daed7a9a31f30dd5d993407b33b5dd30f4ef7dda
Parents: b938df6
Author: Jacob Maes <[email protected]>
Authored: Wed Mar 29 13:57:26 2017 -0700
Committer: Jacob Maes <[email protected]>
Committed: Wed Mar 29 13:57:26 2017 -0700

----------------------------------------------------------------------
 .../samza/config/DefaultChooserConfig.java | 10 +-
 .../samza/checkpoint/CheckpointTool.scala | 2 +-
 .../org/apache/samza/config/StreamConfig.scala | 146 +++++++++----
 .../apache/samza/config/TestStreamConfig.java | 215 +++++++++++++++++++
 .../runtime/TestAbstractApplicationRunner.java | 1 +
 5 files changed, 332 insertions(+), 42 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/daed7a9a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
index 237c6f9..ff344c9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
@@ -30,15 +30,15 @@ import org.apache.samza.system.SystemStream;
  * A convenience class for fetching configs related to the {@link org.apache.samza.system.chooser.DefaultChooser}
  */
 public class DefaultChooserConfig extends
MapConfig { - public static final String BOOTSTRAP = StreamConfig.STREAM_PREFIX() + "samza.bootstrap"; - public static final String PRIORITY = StreamConfig.STREAM_PREFIX() + "samza.priority"; - public static final String BATCH_SIZE = "task.consumer.batch.size"; + private static final String BATCH_SIZE = "task.consumer.batch.size"; private final TaskConfigJava taskConfigJava; + private final StreamConfig streamConfig; public DefaultChooserConfig(Config config) { super(config); taskConfigJava = new TaskConfigJava(config); + streamConfig = new StreamConfig(config); } /** @@ -55,7 +55,7 @@ public class DefaultChooserConfig extends MapConfig { Set<SystemStream> bootstrapInputs = new HashSet<>(); Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams(); for (SystemStream systemStream : allInputs) { - if (getBoolean(String.format(BOOTSTRAP, systemStream.getSystem(), systemStream.getStream()), false)) { + if (streamConfig.getBootstrapEnabled(systemStream)) { bootstrapInputs.add(systemStream); } } @@ -75,7 +75,7 @@ public class DefaultChooserConfig extends MapConfig { Map<SystemStream, Integer> priorityStreams = new HashMap<>(); for (SystemStream systemStream : allInputs) { - int priority = getInt(String.format(PRIORITY, systemStream.getSystem(), systemStream.getStream()), -1); + int priority = streamConfig.getPriority(systemStream); if (priority >= 0) { priorityStreams.put(systemStream, priority); } http://git-wip-us.apache.org/repos/asf/samza/blob/daed7a9a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala index 449b402..9f4aa54 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala @@ -66,7 
+66,7 @@ import scala.collection.immutable.HashMap */ object CheckpointTool { /** Format in which SystemStreamPartition is represented in a properties file */ - val SSP_PATTERN = "tasknames.%s." + StreamConfig.STREAM_PREFIX + "partitions.%d" + val SSP_PATTERN = "tasknames.%s.systems.%s.streams.%s.partitions.%d" val SSP_REGEX = Pattern.compile("tasknames\\.(.+)\\.systems\\.(.+)\\.streams\\.(.+)\\.partitions\\.([0-9]+)") type TaskNameToCheckpointMap = Map[TaskName, Map[SystemStreamPartition, String]] http://git-wip-us.apache.org/repos/asf/samza/blob/daed7a9a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala index 4cce32f..4f2c688 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala @@ -34,25 +34,27 @@ object StreamConfig { val KEY_SERDE = SAMZA_PROPERTY + "key.serde" val CONSUMER_RESET_OFFSET = SAMZA_PROPERTY + "reset.offset" val CONSUMER_OFFSET_DEFAULT = SAMZA_PROPERTY + "offset.default" + val BOOTSTRAP = SAMZA_PROPERTY + "bootstrap" + val PRIORITY = SAMZA_PROPERTY + "priority" - val STREAMS_PREFIX = "streams." - val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s." - val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM - val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME - val SAMZA_STREAM_PROPERTY_PREFIX = STREAM_ID_PREFIX + SAMZA_PROPERTY + // We don't want any external dependencies on these patterns while both exist. Use getProperty to ensure proper values. + private val STREAMS_PREFIX = "streams." + private val STREAM_PREFIX = "systems.%s.streams.%s." - val STREAM_PREFIX = "systems.%s.streams.%s." + protected val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s." 
+ protected val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM + protected val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME implicit def Config2Stream(config: Config) = new StreamConfig(config) } class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { - def getStreamMsgSerde(systemStream: SystemStream) = nonEmptyOption(getProperty(systemStream, StreamConfig.MSG_SERDE)) + def getStreamMsgSerde(systemStream: SystemStream) = nonEmptyOption(getSamzaProperty(systemStream, StreamConfig.MSG_SERDE)) - def getStreamKeySerde(systemStream: SystemStream) = nonEmptyOption(getProperty(systemStream, StreamConfig.KEY_SERDE)) + def getStreamKeySerde(systemStream: SystemStream) = nonEmptyOption(getSamzaProperty(systemStream, StreamConfig.KEY_SERDE)) def getResetOffset(systemStream: SystemStream) = - Option(getProperty(systemStream, StreamConfig.CONSUMER_RESET_OFFSET)) match { + Option(getSamzaProperty(systemStream, StreamConfig.CONSUMER_RESET_OFFSET)) match { case Some("true") => true case Some("false") => false case Some(resetOffset) => @@ -62,8 +64,18 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { case _ => false } + def isResetOffsetConfigured(systemStream: SystemStream) = containsSamzaProperty(systemStream, StreamConfig.CONSUMER_RESET_OFFSET) + def getDefaultStreamOffset(systemStream: SystemStream) = - Option(getProperty(systemStream, StreamConfig.CONSUMER_OFFSET_DEFAULT)) + Option(getSamzaProperty(systemStream, StreamConfig.CONSUMER_OFFSET_DEFAULT)) + + def isDefaultStreamOffsetConfigured(systemStream: SystemStream) = containsSamzaProperty(systemStream, StreamConfig.CONSUMER_OFFSET_DEFAULT) + + def getBootstrapEnabled(systemStream: SystemStream) = + java.lang.Boolean.parseBoolean(getSamzaProperty(systemStream, StreamConfig.BOOTSTRAP)) + + def getPriority(systemStream: SystemStream) = + java.lang.Integer.parseInt(getSamzaProperty(systemStream, StreamConfig.PRIORITY, "-1")) /** * Returns a list of 
all SystemStreams that have a serde defined from the config file. @@ -81,30 +93,14 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { val systemStreams = subset(StreamConfig.STREAMS_PREFIX) .keys .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE)) - .map(k => { - val streamId = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ ) - streamIdToSystemStream(streamId) - }).toSet + .map(k => k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )) + .filter(streamId => systemName.equals(getSystem(streamId))) + .map(streamId => streamIdToSystemStream(streamId)).toSet legacySystemStreams.union(systemStreams) } /** - * Gets the stream properties from the legacy config style: - * systems.{system}.streams.{streams}.* - * - * @param systemName the system name under which the properties are configured - * @param streamName the stream name - * @return the map of properties for the stream - */ - private def getSystemStreamProperties(systemName: String, streamName: String) = { - if (systemName == null || streamName == null) { - Map() - } - config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true) - } - - /** * Gets the properties for the specified streamId from the config. * It first applies any legacy configs from this config location: * systems.{system}.streams.{stream}.* @@ -112,15 +108,16 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { * It then overrides them with properties of the new config format: * streams.{streamId}.* * + * Only returns properties of the stream itself, not any of the samza properties for the stream. + * * @param streamId the identifier for the stream in the config. 
* @return the merged map of config properties from both the legacy and new config styles */ def getStreamProperties(streamId: String) = { - val allProperties = subset(StreamConfig.STREAM_ID_PREFIX format streamId) + val allProperties = getAllStreamProperties(streamId) val samzaProperties = allProperties.subset(StreamConfig.SAMZA_PROPERTY, false) val filteredStreamProperties:java.util.Map[String, String] = allProperties.filterKeys(k => !samzaProperties.containsKey(k)) - val inheritedLegacyProperties:java.util.Map[String, String] = getSystemStreamProperties(getSystem(streamId), getPhysicalName(streamId)) - new MapConfig(java.util.Arrays.asList(inheritedLegacyProperties, filteredStreamProperties)) + new MapConfig(filteredStreamProperties) } /** @@ -153,14 +150,21 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { } /** - * Gets the specified property for a SystemStream. + * Gets the specified Samza property for a SystemStream. A Samza property is a property that controls how Samza + * interacts with the stream, as opposed to a property of the stream itself. * * Note, because the translation is not perfect between SystemStream and streamId, * this method is not identical to getProperty(streamId, property) + * + * @param systemStream the SystemStream for which the property value will be retrieved. + * @param property the samza property name excluding the leading delimiter. e.g. 
"samza.x.y" */ - private def getProperty(systemStream: SystemStream, property: String): String = { - val streamVal = getStreamProperties(systemStreamToStreamId(systemStream)).get(property) + protected def getSamzaProperty(systemStream: SystemStream, property: String): String = { + if (!property.startsWith(StreamConfig.SAMZA_PROPERTY)) { + throw new IllegalArgumentException("Attempt to fetch a non samza property for SystemStream %s named %s" format(systemStream, property)) + } + val streamVal = getAllStreamProperties(systemStreamToStreamId(systemStream)).get(property) if (streamVal != null) { streamVal } else { @@ -168,8 +172,78 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { } } + /** + * Gets the specified Samza property for a SystemStream. A Samza property is a property that controls how Samza + * interacts with the stream, as opposed to a property of the stream itself. + * + * Note, because the translation is not perfect between SystemStream and streamId, + * this method is not identical to getProperty(streamId, property) + * + * @param systemStream the SystemStream for which the property value will be retrieved. + * @param property the samza property name excluding the leading delimiter. e.g. "samza.x.y" + * @param defaultValue the default value to use if the property value is not found + * + */ + protected def getSamzaProperty(systemStream: SystemStream, property: String, defaultValue: String): String = { + val streamVal = getSamzaProperty(systemStream, property) + + if (streamVal != null) { + streamVal + } else { + defaultValue + } + } + + /** + * Gets the specified Samza property for a SystemStream. A Samza property is a property that controls how Samza + * interacts with the stream, as opposed to a property of the stream itself. 
+ * + * Note, because the translation is not perfect between SystemStream and streamId, + * this method is not identical to getProperty(streamId, property) + */ + protected def containsSamzaProperty(systemStream: SystemStream, property: String): Boolean = { + if (!property.startsWith(StreamConfig.SAMZA_PROPERTY)) { + throw new IllegalArgumentException("Attempt to fetch a non samza property for SystemStream %s named %s" format(systemStream, property)) + } + return getSamzaProperty(systemStream, property) != null + } + + + /** + * Gets the stream properties from the legacy config style: + * systems.{system}.streams.{streams}.* + * + * @param systemName the system name under which the properties are configured + * @param streamName the stream name + * @return the map of properties for the stream + */ + private def getSystemStreamProperties(systemName: String, streamName: String) = { + if (systemName == null || streamName == null) { + Map() + } + config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true) + } + + /** + * Gets the properties for the specified streamId from the config. + * It first applies any legacy configs from this config location: + * systems.{system}.streams.{stream}.* + * + * It then overrides them with properties of the new config format: + * streams.{streamId}.* + * + * @param streamId the identifier for the stream in the config. + * @return the merged map of config properties from both the legacy and new config styles + */ + private def getAllStreamProperties(streamId: String) = { + val allProperties = subset(StreamConfig.STREAM_ID_PREFIX format streamId) + val inheritedLegacyProperties:java.util.Map[String, String] = getSystemStreamProperties(getSystem(streamId), getPhysicalName(streamId)) + new MapConfig(java.util.Arrays.asList(inheritedLegacyProperties, allProperties)) + } + private def getStreamIds(): Iterable[String] = { - subset(StreamConfig.STREAMS_PREFIX).keys + // StreamIds are not allowed to have '.' 
so the first index of '.' marks the end of the streamId. + subset(StreamConfig.STREAMS_PREFIX).keys.map(key => key.substring(0, key.indexOf("."))) } private def getStreamIdsForSystem(system: String): Iterable[String] = { http://git-wip-us.apache.org/repos/asf/samza/blob/daed7a9a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java new file mode 100644 index 0000000..639be3d --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.config; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.system.SystemStream; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestStreamConfig { + private static final String SYSTEM_STREAM_PATTERN = "systems.%s.streams.%s."; + private static final String STREAM_ID_PATTERN = "streams.%s."; + + private static final String STREAM1_SYSTEM = "Sys1"; + private static final String STREAM1_PHYSICAL_NAME = "Str1"; + private static final String STREAM1_STREAM_ID = "streamId1"; + private static final SystemStream SYSTEM_STREAM_1 = new SystemStream(STREAM1_SYSTEM, STREAM1_PHYSICAL_NAME); + + private static final String STREAM2_SYSTEM = "Sys2"; + private static final String STREAM2_PHYSICAL_NAME = "Str2"; + private static final String STREAM2_STREAM_ID = "streamId2"; + private static final SystemStream SYSTEM_STREAM_2 = new SystemStream(STREAM2_SYSTEM, STREAM2_PHYSICAL_NAME); + + + @Test(expected = IllegalArgumentException.class) + public void testGetSamzaPropertyThrowsIfInvalidPropertyName() { + StreamConfig config = buildConfig("key1", "value1", "key2", "value2"); + config.getSamzaProperty(SYSTEM_STREAM_1, "key1"); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetSamzaPropertyWithDefaultThrowsIfInvalidPropertyName() { + StreamConfig config = buildConfig("key1", "value1", "key2", "value2"); + config.getSamzaProperty(SYSTEM_STREAM_1, "key1", "default"); + } + + // 00 + @Test + public void testGetSamzaPropertyDoesNotExist() { + StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, "samza.key1"), "value1", "key2", "value2"); + assertNull(config.getSamzaProperty(SYSTEM_STREAM_1, "samza.keyNonExistent")); + } + + // 01 + @Test + public void testGetSamzaPropertyFromSystemStream() { + StreamConfig config = 
buildConfig(buildProp(SYSTEM_STREAM_1, "samza.key1"), "value1", "key2", "value2"); + assertEquals("value1", config.getSamzaProperty(SYSTEM_STREAM_1, "samza.key1")); + } + + // 10 + @Test + public void testGetSamzaPropertyFromStreamId() { + StreamConfig config = buildConfig("key1", "value1", + buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2", + buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM, + buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME); + assertEquals("value2", config.getSamzaProperty(SYSTEM_STREAM_1, "samza.key2")); + } + + // 11 + @Test + public void testGetSamzaPropertyFromSystemStreamAndStreamId() { + StreamConfig config = buildConfig("key1", "value1", + buildProp(SYSTEM_STREAM_1, "samza.key2"), "value2", + buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2OVERRIDE", + buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM, + buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME); + assertEquals("value2OVERRIDE", config.getSamzaProperty(SYSTEM_STREAM_1, "samza.key2")); + } + + @Test(expected = IllegalArgumentException.class) + public void testContainsSamzaPropertyThrowsIfInvalidPropertyName() { + StreamConfig config = buildConfig("key1", "value1", "key2", "value2"); + config.containsSamzaProperty(new SystemStream("SysX", "StrX"), "key1"); + } + + // 00 + @Test + public void testContainsSamzaPropertyDoesNotExist() { + StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, "samza.key1"), "value1", "key2", "value2"); + assertFalse(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.keyNonExistent")); + } + + // 01 + @Test + public void testContainsSamzaPropertyFromSystemStream() { + StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, "samza.key1"), "value1", "key2", "value2"); + assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key1")); + } + + // 10 + @Test + public void testContainsSamzaPropertyFromStreamId() { + StreamConfig 
config = buildConfig("key1", "value1", + buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2", + buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM, + buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME); + assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key2")); + } + + // 11 + @Test + public void testContainsSamzaPropertyFromSystemStreamAndStreamId() { + StreamConfig config = buildConfig("key1", "value1", + buildProp(SYSTEM_STREAM_1, "samza.key2"), "value2", + buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2OVERRIDE", + buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM, + buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME); + assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key2")); + } + + // 00 + @Test + public void testGetSerdeStreamsDoesNotExist() { + StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, "samza.key1"), "value1", "key2", "value2"); + assertTrue(config.getSerdeStreams(STREAM1_SYSTEM).isEmpty()); + } + + // 01 + @Test + public void testGetSerdeStreamsFromSystemStream() { + StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, StreamConfig.KEY_SERDE()), "value1", + buildProp(SYSTEM_STREAM_2, StreamConfig.MSG_SERDE()), "value2", + "key3", "value3"); + assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size()); + assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size()); + assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get()); + assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get()); + } + + // 10 + @Test + public void testGetSerdeStreamsFromStreamId() { + StreamConfig config = buildConfig( + buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM, + buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME, + buildProp(STREAM1_STREAM_ID, StreamConfig.KEY_SERDE()), "value1", + + buildProp(STREAM2_STREAM_ID, 
StreamConfig.SYSTEM()), STREAM2_SYSTEM, + buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM2_PHYSICAL_NAME, + buildProp(STREAM2_STREAM_ID, StreamConfig.MSG_SERDE()), "value2", + "key3", "value3"); + assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size()); + assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size()); + assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get()); + assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get()); + } + + // 11 + @Test + public void testGetSerdeStreamsFromSystemStreamAndStreamId() { + StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, StreamConfig.KEY_SERDE()), "value1", + buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM, + buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME, + buildProp(STREAM1_STREAM_ID, StreamConfig.KEY_SERDE()), "value1OVERRIDE", + buildProp(STREAM2_STREAM_ID, StreamConfig.SYSTEM()), STREAM2_SYSTEM, + buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM2_PHYSICAL_NAME, + buildProp(STREAM2_STREAM_ID, StreamConfig.MSG_SERDE()), "value2", + "key3", "value3"); + + assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size()); + assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size()); + assertEquals("value1OVERRIDE", config.getStreamKeySerde(SYSTEM_STREAM_1).get()); + assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get()); + } + + private StreamConfig buildConfig(String... 
kvs) { + if (kvs.length % 2 != 0) { + throw new IllegalArgumentException("There must be parity between the keys and values"); + } + + Map<String, String> configMap = new HashMap<>(); + for (int i = 0; i < kvs.length - 1; i += 2) { + configMap.put(kvs[i], kvs[i + 1]); + } + return new StreamConfig(new MapConfig(configMap)); + } + + private String buildProp(String streamId, String suffix) { + return String.format(STREAM_ID_PATTERN, streamId) + suffix; + } + + private String buildProp(SystemStream systemStream, String suffix) { + return String.format(SYSTEM_STREAM_PATTERN, systemStream.getSystem(), systemStream.getStream()) + suffix; + } + + private Config addConfigs(Config original, String... kvs) { + Map<String, String> result = new HashMap<>(); + result.putAll(original); + result.putAll(buildConfig(kvs)); + return new MapConfig(result); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/daed7a9a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java index 8d7db9f..eeb783c 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java @@ -30,6 +30,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; + public class TestAbstractApplicationRunner { private static final String STREAM_ID = "t3st-Stream_Id"; private static final String STREAM_ID_INVALID = "test#Str3amId!";
