This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d91400  SAMZA-2328: [Scala cleanup] Convert StreamConfig to Java 
(part 1: update test coverage and clarify functionality in javadocs) (#1166)
6d91400 is described below

commit 6d91400536612aa75af752fa66a31f99236c7b54
Author: Cameron Lee <[email protected]>
AuthorDate: Thu Sep 26 09:16:15 2019 -0700

    SAMZA-2328: [Scala cleanup] Convert StreamConfig to Java (part 1: update 
test coverage and clarify functionality in javadocs) (#1166)
---
 .../java/org/apache/samza/config/SystemConfig.java |   3 +-
 .../org/apache/samza/config/StreamConfig.scala     | 167 ++--
 .../org/apache/samza/config/TestStreamConfig.java  | 853 ++++++++++++++++-----
 3 files changed, 780 insertions(+), 243 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
index 93a0c32..1403b0d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
@@ -40,8 +40,7 @@ public class SystemConfig extends MapConfig {
 
   private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
   public static final String SYSTEM_FACTORY_FORMAT = SYSTEMS_PREFIX + "%s" + 
SYSTEM_FACTORY_SUFFIX;
-  @VisibleForTesting
-  static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_ID_PREFIX 
+ "default.stream.";
+  public static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = 
SYSTEM_ID_PREFIX + "default.stream.";
 
   // If true, automatically delete committed messages from streams whose 
committed messages can be deleted.
   // A stream's committed messages can be deleted if it is a intermediate 
stream, or if user has manually
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 321af6e..6030b93 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -58,6 +58,23 @@ object StreamConfig {
   implicit def Config2Stream(config: Config) = new StreamConfig(config)
 }
 
+/**
+ * Helper for accessing configs related to stream properties.
+ *
+ * For most configs, this currently supports two different formats for 
specifying stream properties:
+ * 1) "streams.{streamId}.{property}" (recommended to use this format)
+ * 2) "systems.{systemName}.streams.{streamName}.{property}" (legacy)
+ * Note that some config lookups are only supported through the 
"streams.{streamId}.{property}". See the specific
+ * accessor method to determine which formats are supported.
+ *
+ * Summary of terms:
+ * - streamId: logical identifier used for a stream; configs are specified 
using this streamId
+ * - physical stream: concrete name for a stream (if the physical stream is 
not explicitly configured, then the streamId
+ *   is used as the physical stream
+ * - streamName: within the javadoc for this class, streamName is the same as 
physical stream
+ * - samza property: property which is Samza-specific, which will have 
"samza." as a prefix (e.g. "samza.key.serde");
+ *   this is in contrast to stream-specific properties which are related to 
specific stream technologies
+ */
 class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging 
{
   def getStreamMsgSerde(systemStream: SystemStream) = 
nonEmptyOption(getSamzaProperty(systemStream, StreamConfig.MSG_SERDE))
 
@@ -118,18 +135,19 @@ class StreamConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
   }
 
   /**
-    * Gets the properties for the specified streamId from the config.
-    * It first applies any legacy configs from this config location:
-    * systems.{system}.streams.{stream}.*
-    *
-    * It then overrides them with properties of the new config format:
-    * streams.{streamId}.*
-    *
-    * Only returns properties of the stream itself, not any of the samza 
properties for the stream.
-    *
-    * @param streamId the identifier for the stream in the config.
-    * @return         the merged map of config properties from both the legacy 
and new config styles
-    */
+   * Gets the properties for the streamId which are not Samza properties (i.e. 
do not have a "samza." prefix). This
+   * includes current and legacy config styles.
+   * This will return a Config with the properties that match the following 
formats (if a property is specified through
+   * multiple formats, priority is top to bottom):
+   * 1) "streams.{streamId}.{property}"
+   * 2) "systems.{systemName}.streams.{streamName}.{property}" where 
systemName is the system mapped to the streamId in
+   * the config and streamName is the physical stream name mapped to the 
stream id
+   * 3) "systems.{systemName}.default.stream.{property}" where systemName is 
the system mapped to the streamId in the
+   * config
+   *
+   * @param streamId the identifier for the stream in the config.
+   * @return         the merged map of config properties from both the legacy 
and new config styles
+   */
   def getStreamProperties(streamId: String) = {
     val allProperties = getAllStreamProperties(streamId)
     val samzaProperties = allProperties.subset(StreamConfig.SAMZA_PROPERTY, 
false)
@@ -199,16 +217,31 @@ class StreamConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
   }
 
   /**
-    * Gets the specified Samza property for a SystemStream. A Samza property 
is a property that controls how Samza
-    * interacts with the stream, as opposed to a property of the stream itself.
-    *
-    * Note, because the translation is not perfect between SystemStream and 
streamId,
-    * this method is not identical to getProperty(streamId, property)
-    *
-    * @param systemStream the SystemStream for which the property value will 
be retrieved.
-    * @param property the samza property name excluding the leading delimiter. 
e.g. "samza.x.y"
-    */
-  protected def getSamzaProperty(systemStream: SystemStream, property: 
String): String = {
+   * Gets the specified Samza property for a SystemStream. A Samza property is 
a property that controls how Samza
+   * interacts with the stream, as opposed to a property of the stream itself.
+   *
+   * First, tries to map the systemStream to a streamId. This will only find a 
streamId if the stream is a physical name
+   * (explicitly mapped physical name or a stream id without a physical name 
mapping). That means this will not map a
+   * stream id to itself if there is a mapping from the stream id to a 
physical stream name. This also requires that the
+   * stream id is mapped to a system in the config.
+   * If a stream id is found:
+   * 1) Look for "streams.{streamId}.{property}" for the stream id.
+   * 2) Otherwise, look for 
"systems.{systemName}.streams.{streamName}.{property}" in which the systemName 
is the system
+   * mapped to the stream id and the streamName is the physical stream name 
for the stream id.
+   * 3) Otherwise, look for "systems.{systemName}.default.stream.{property}" 
in which the systemName is the system
+   * mapped to the stream id.
+   * If a stream id was not found or no property could be found using the 
above keys:
+   * 1) Look for "systems.{systemName}.streams.{streamName}.{property}" in 
which the systemName is the system in the
+   * input systemStream and the streamName is the stream from the input 
systemStream.
+   * 2) Otherwise, look for "systems.{systemName}.default.stream.{property}" 
in which the systemName is the system
+   * in the input systemStream.
+   *
+   * @param systemStream the SystemStream for which the property value will be 
retrieved.
+   * @param property the samza property key (including the "samza." prefix); 
for example, for both
+   *                 "streams.streamId.samza.prop.key" and 
"systems.system.streams.streamName.samza.prop.key", this
+   *                 argument should have the value "samza.prop.key"
+   */
+  private def getSamzaProperty(systemStream: SystemStream, property: String): 
String = {
     if (!property.startsWith(StreamConfig.SAMZA_PROPERTY)) {
       throw new IllegalArgumentException(
         "Attempt to fetch a non samza property for SystemStream %s named %s" 
format(systemStream, property))
@@ -223,18 +256,16 @@ class StreamConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
   }
 
   /**
-    * Gets the specified Samza property for a SystemStream. A Samza property 
is a property that controls how Samza
-    * interacts with the stream, as opposed to a property of the stream itself.
-    *
-    * Note, because the translation is not perfect between SystemStream and 
streamId,
-    * this method is not identical to getProperty(streamId, property)
-    *
-    * @param systemStream the SystemStream for which the property value will 
be retrieved.
-    * @param property the samza property name excluding the leading delimiter. 
e.g. "samza.x.y"
-    * @param defaultValue the default value to use if the property value is 
not found
-    *
-    */
-  protected def getSamzaProperty(systemStream: SystemStream, property: String, 
defaultValue: String): String = {
+   * Gets a Samza property, with a default value used if no property value is 
found.
+   * See getSamzaProperty(SystemStream, String).
+   *
+   * @param systemStream the SystemStream for which the property value will be 
retrieved.
+   * @param property the samza property key (including the "samza." prefix); 
for example, for both
+   *                 "streams.streamId.samza.prop.key" and 
"systems.system.streams.streamName.samza.prop.key", this
+   *                 argument should have the value "samza.prop.key"
+   * @param defaultValue the default value to use if the property value is not 
found
+   */
+  private def getSamzaProperty(systemStream: SystemStream, property: String, 
defaultValue: String): String = {
     val streamVal = getSamzaProperty(systemStream, property)
 
     if (streamVal != null) {
@@ -245,13 +276,15 @@ class StreamConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
   }
 
   /**
-    * Gets the specified Samza property for a SystemStream. A Samza property 
is a property that controls how Samza
-    * interacts with the stream, as opposed to a property of the stream itself.
-    *
-    * Note, because the translation is not perfect between SystemStream and 
streamId,
-    * this method is not identical to getProperty(streamId, property)
-    */
-  protected def containsSamzaProperty(systemStream: SystemStream, property: 
String): Boolean = {
+   * Determines if a Samza property is specified.
+   * See getSamzaProperty(SystemStream, String).
+   *
+   * @param systemStream the SystemStream for the property value to check
+   * @param property the samza property key (including the "samza." prefix); 
for example, for both
+   *                 "streams.streamId.samza.prop.key" and 
"systems.system.streams.streamName.samza.prop.key", this
+   *                 argument should have the value "samza.prop.key"
+   */
+  private def containsSamzaProperty(systemStream: SystemStream, property: 
String): Boolean = {
     if (!property.startsWith(StreamConfig.SAMZA_PROPERTY)) {
       throw new IllegalArgumentException(
         "Attempt to fetch a non samza property for SystemStream %s named %s" 
format(systemStream, property))
@@ -261,13 +294,16 @@ class StreamConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
 
 
   /**
-    * Gets the stream properties from the legacy config style:
-    * systems.{system}.streams.{streams}.*
-    *
-    * @param systemName the system name under which the properties are 
configured
-    * @param streamName the stream name
-    * @return           the map of properties for the stream
-    */
+   * Finds the properties from the legacy config style (config key includes 
system).
+   * This will return a Config with the properties that match the following 
formats (if a property is specified through
+   * multiple formats, priority is top to bottom):
+   * 1) "systems.{systemName}.streams.{streamName}.{property}"
+   * 2) "systems.{systemName}.default.stream.{property}"
+   *
+   * @param systemName the system name under which the properties are 
configured
+   * @param streamName the stream name
+   * @return           the map of properties for the stream
+   */
   private def getSystemStreamProperties(systemName: String, streamName: 
String) = {
     if (systemName == null) {
       Map()
@@ -279,16 +315,18 @@ class StreamConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
   }
 
   /**
-    * Gets the properties for the specified streamId from the config.
-    * It first applies any legacy configs from this config location:
-    * systems.{system}.streams.{stream}.*
-    *
-    * It then overrides them with properties of the new config format:
-    * streams.{streamId}.*
-    *
-    * @param streamId the identifier for the stream in the config.
-    * @return         the merged map of config properties from both the legacy 
and new config styles
-    */
+   * Gets all of the properties for the specified streamId (includes current 
and legacy config styles).
+   * This will return a Config with the properties that match the following 
formats (if a property is specified through
+   * multiple formats, priority is top to bottom):
+   * 1) "streams.{streamId}.{property}"
+   * 2) "systems.{systemName}.streams.{streamName}.{property}" where 
systemName is the system mapped to the streamId in
+   * the config and streamName is the physical stream name mapped to the 
stream id
+   * 3) "systems.{systemName}.default.stream.{property}" where systemName is 
the system mapped to the streamId in the
+   * config
+   *
+   * @param streamId the identifier for the stream in the config.
+   * @return         the merged map of config properties from both the legacy 
and new config styles
+   */
   private def getAllStreamProperties(streamId: String) = {
     val allProperties = subset(StreamConfig.STREAM_ID_PREFIX format streamId)
     val inheritedLegacyProperties: java.util.Map[String, String] =
@@ -300,7 +338,18 @@ class StreamConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
     getStreamIds().filter(streamId => system.equals(getSystem(streamId)))
   }
 
-  def systemStreamToStreamId(systemStream: SystemStream): String = {
+  /**
+   * Finds the stream id which corresponds to the systemStream.
+   * This finds the stream id that is mapped to the system in systemStream 
through the config and that has a physical
+   * name (the physical name might be the stream id itself if there is no 
explicit mapping) that matches the stream in
+   * systemStream.
+   * Note: If the stream in the systemStream is a stream id which is mapped to 
a physical stream, then that stream won't
+   * be returned as a stream id here, since the stream in systemStream doesn't 
match the physical stream name.
+   *
+   * @param systemStream system stream to map to stream id
+   * @return             stream id corresponding to the system stream
+   */
+  private def systemStreamToStreamId(systemStream: SystemStream): String = {
     val streamIds = getStreamIdsForSystem(systemStream.getSystem)
       .filter(streamId => 
systemStream.getStream().equals(getPhysicalName(streamId)))
     if (streamIds.size > 1) {
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
index 474a382..b91eeb0 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
@@ -18,252 +18,741 @@
  */
 package org.apache.samza.config;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Collections;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import org.apache.samza.system.SystemStream;
 import org.junit.Test;
+import scala.Option;
+import scala.collection.JavaConverters;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
-public class TestStreamConfig {
-  private static final String SYSTEM_STREAM_PATTERN = "systems.%s.streams.%s.";
-  private static final String STREAM_ID_PATTERN = "streams.%s.";
-
-  private static final String STREAM1_SYSTEM = "Sys1";
-  private static final String STREAM1_PHYSICAL_NAME = "Str1";
-  private static final String STREAM1_STREAM_ID = "streamId1";
-  private static final SystemStream SYSTEM_STREAM_1 = new 
SystemStream(STREAM1_SYSTEM, STREAM1_PHYSICAL_NAME);
 
-  private static final String STREAM2_SYSTEM = "Sys2";
-  private static final String STREAM2_PHYSICAL_NAME = "Str2";
-  private static final String STREAM2_STREAM_ID = "streamId2";
-  private static final SystemStream SYSTEM_STREAM_2 = new 
SystemStream(STREAM2_SYSTEM, STREAM2_PHYSICAL_NAME);
-
-  private static final String STREAM3_SYSTEM = "Sys3";
-  private static final String STREAM3_PHYSICAL_NAME = "Str3";
-  private static final String STREAM3_STREAM_ID = "streamId3";
-  private static final SystemStream SYSTEM_STREAM_3 = new 
SystemStream(STREAM3_SYSTEM, STREAM3_PHYSICAL_NAME);
-
-  private static final String SYSTEM_DEFAULT_STREAM_PATTERN = 
"systems.%s.default.stream.";
+public class TestStreamConfig {
+  private static final String SYSTEM = "system";
+  private static final String STREAM_ID = "streamId";
+  private static final SystemStream SYSTEM_STREAM = new SystemStream(SYSTEM, 
STREAM_ID);
+  private static final String OTHER_STREAM_ID = "otherStreamId";
+  private static final String PHYSICAL_STREAM = "physicalStream";
+  private static final SystemStream SYSTEM_STREAM_PHYSICAL = new 
SystemStream(SYSTEM, PHYSICAL_STREAM);
+  private static final String SAMZA_IGNORED_PROPERTY = 
"samza.ignored.property";
+  private static final String UNUSED_VALUE = "should_not_be_used";
 
+  @Test
+  public void testGetStreamMsgSerde() {
+    String value = "my.msg.serde";
+    doTestSamzaProperty(StreamConfig.MSG_SERDE(), value,
+        (config, systemStream) -> assertEquals(Option.apply(value), 
config.getStreamMsgSerde(systemStream)));
+    doTestSamzaProperty(StreamConfig.MSG_SERDE(), "",
+        (config, systemStream) -> assertEquals(Option.empty(), 
config.getStreamMsgSerde(systemStream)));
+    doTestSamzaPropertyDoesNotExist(StreamConfig.MSG_SERDE(),
+        (config, systemStream) -> assertEquals(Option.empty(), 
config.getStreamMsgSerde(systemStream)));
+    doTestSamzaPropertyInvalidConfig(StreamConfig::getStreamMsgSerde);
+  }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetSamzaPropertyThrowsIfInvalidPropertyName() {
-    StreamConfig config = buildConfig("key1", "value1", "key2", "value2");
-    config.getSamzaProperty(SYSTEM_STREAM_1, "key1");
+  @Test
+  public void testGetStreamKeySerde() {
+    String value = "my.key.serde";
+    doTestSamzaProperty(StreamConfig.KEY_SERDE(), value,
+        (config, systemStream) -> assertEquals(Option.apply(value), 
config.getStreamKeySerde(systemStream)));
+    doTestSamzaProperty(StreamConfig.KEY_SERDE(), "",
+        (config, systemStream) -> assertEquals(Option.empty(), 
config.getStreamKeySerde(systemStream)));
+    doTestSamzaPropertyDoesNotExist(StreamConfig.KEY_SERDE(),
+        (config, systemStream) -> assertEquals(Option.empty(), 
config.getStreamKeySerde(systemStream)));
+    doTestSamzaPropertyInvalidConfig(StreamConfig::getStreamKeySerde);
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetSamzaPropertyWithDefaultThrowsIfInvalidPropertyName() {
-    StreamConfig config = buildConfig("key1", "value1", "key2", "value2");
-    config.getSamzaProperty(SYSTEM_STREAM_1, "key1", "default");
+  @Test
+  public void testGetResetOffset() {
+    doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "true",
+        (config, systemStream) -> 
assertTrue(config.getResetOffset(systemStream)));
+    doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "false",
+        (config, systemStream) -> 
assertFalse(config.getResetOffset(systemStream)));
+    // if not true/false, then use false
+    doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "unknown_value",
+        (config, systemStream) -> 
assertFalse(config.getResetOffset(systemStream)));
+    doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_RESET_OFFSET(),
+        (config, systemStream) -> 
assertFalse(config.getResetOffset(systemStream)));
+    doTestSamzaPropertyInvalidConfig(StreamConfig::getResetOffset);
   }
 
-  // 00
   @Test
-  public void testGetSamzaPropertyDoesNotExist() {
-    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, 
"samza.key1"), "value1", "key2", "value2");
-    assertNull(config.getSamzaProperty(SYSTEM_STREAM_1, 
"samza.keyNonExistent"));
+  public void testIsResetOffsetConfigured() {
+    doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "true",
+        (config, systemStream) -> 
assertTrue(config.isResetOffsetConfigured(systemStream)));
+    doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "false",
+        (config, systemStream) -> 
assertTrue(config.isResetOffsetConfigured(systemStream)));
+    // if not true/false, then use false
+    doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "unknown_value",
+        (config, systemStream) -> 
assertTrue(config.isResetOffsetConfigured(systemStream)));
+    doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_RESET_OFFSET(),
+        (config, systemStream) -> 
assertFalse(config.isResetOffsetConfigured(systemStream)));
+    doTestSamzaPropertyInvalidConfig(StreamConfig::isResetOffsetConfigured);
   }
 
-  // 01
   @Test
-  public void testGetSamzaPropertyFromSystemStream() {
-    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, 
"samza.key1"), "value1", "key2", "value2");
-    assertEquals("value1", config.getSamzaProperty(SYSTEM_STREAM_1, 
"samza.key1"));
+  public void testGetDefaultStreamOffset() {
+    String value = "my_offset_default";
+    doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT(), value,
+        (config, systemStream) -> assertEquals(Option.apply(value), 
config.getDefaultStreamOffset(systemStream)));
+    doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT(), "",
+        (config, systemStream) -> assertEquals(Option.apply(""), 
config.getDefaultStreamOffset(systemStream)));
+    doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_OFFSET_DEFAULT(),
+        (config, systemStream) -> assertEquals(Option.empty(),
+            new StreamConfig(config).getDefaultStreamOffset(systemStream)));
+    doTestSamzaPropertyInvalidConfig(StreamConfig::getDefaultStreamOffset);
   }
 
-  // 10
   @Test
-  public void testGetSamzaPropertyFromStreamId() {
-    StreamConfig config = buildConfig("key1", "value1",
-        buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2",
-        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
-        buildProp(STREAM1_STREAM_ID, StreamConfig.BROADCAST()), "true",
-        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), 
STREAM1_PHYSICAL_NAME);
-    assertEquals("value2", config.getSamzaProperty(SYSTEM_STREAM_1, 
"samza.key2"));
-    assertEquals("true", config.getSamzaProperty(SYSTEM_STREAM_1, 
"samza.broadcast"));
+  public void testIsDefaultStreamOffsetConfigured() {
+    String value = "my_offset_default";
+    doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT(), value,
+        (config, systemStream) -> 
assertTrue(config.isDefaultStreamOffsetConfigured(systemStream)));
+    doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT(), "",
+        (config, systemStream) -> 
assertTrue(config.isDefaultStreamOffsetConfigured(systemStream)));
+    doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_OFFSET_DEFAULT(),
+        (config, systemStream) -> 
assertFalse(config.isDefaultStreamOffsetConfigured(systemStream)));
+    
doTestSamzaPropertyInvalidConfig(StreamConfig::isDefaultStreamOffsetConfigured);
   }
 
-  // 11
   @Test
-  public void testGetSamzaPropertyFromSystemStreamAndStreamId() {
-    StreamConfig config = buildConfig("key1", "value1",
-        buildProp(SYSTEM_STREAM_1, "samza.key2"), "value2",
-        buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2OVERRIDE",
-        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
-        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), 
STREAM1_PHYSICAL_NAME);
-    assertEquals("value2OVERRIDE", config.getSamzaProperty(SYSTEM_STREAM_1, 
"samza.key2"));
+  public void testGetBootstrapEnabled() {
+    doTestSamzaProperty(StreamConfig.BOOTSTRAP(), "true",
+        (config, systemStream) -> 
assertTrue(config.getBootstrapEnabled(systemStream)));
+    doTestSamzaProperty(StreamConfig.BOOTSTRAP(), "false",
+        (config, systemStream) -> 
assertFalse(config.getBootstrapEnabled(systemStream)));
+    // if not true/false, then use false
+    doTestSamzaProperty(StreamConfig.BOOTSTRAP(), "unknown_value",
+        (config, systemStream) -> 
assertFalse(config.getBootstrapEnabled(systemStream)));
+    doTestSamzaPropertyDoesNotExist(StreamConfig.BOOTSTRAP(),
+        (config, systemStream) -> 
assertFalse(config.getBootstrapEnabled(systemStream)));
+    doTestSamzaPropertyInvalidConfig(StreamConfig::getBootstrapEnabled);
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testContainsSamzaPropertyThrowsIfInvalidPropertyName() {
-    StreamConfig config = buildConfig("key1", "value1", "key2", "value2");
-    config.containsSamzaProperty(new SystemStream("SysX", "StrX"), "key1");
+  @Test
+  public void testGetBroadcastEnabled() {
+    doTestSamzaProperty(StreamConfig.BROADCAST(), "true",
+        (config, systemStream) -> 
assertTrue(config.getBroadcastEnabled(systemStream)));
+    doTestSamzaProperty(StreamConfig.BROADCAST(), "false",
+        (config, systemStream) -> 
assertFalse(config.getBroadcastEnabled(systemStream)));
+    // if not true/false, then use false
+    doTestSamzaProperty(StreamConfig.BROADCAST(), "unknown_value",
+        (config, systemStream) -> 
assertFalse(config.getBroadcastEnabled(systemStream)));
+    doTestSamzaPropertyDoesNotExist(StreamConfig.BROADCAST(),
+        (config, systemStream) -> 
assertFalse(config.getBroadcastEnabled(systemStream)));
+    doTestSamzaPropertyInvalidConfig(StreamConfig::getBroadcastEnabled);
   }
 
-  // 00
   @Test
-  public void testContainsSamzaPropertyDoesNotExist() {
-    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, 
"samza.key1"), "value1", "key2", "value2");
-    assertFalse(config.containsSamzaProperty(SYSTEM_STREAM_1, 
"samza.keyNonExistent"));
+  public void testGetPriority() {
+    doTestSamzaProperty(StreamConfig.PRIORITY(), "0",
+        (config, systemStream) -> assertEquals(0, 
config.getPriority(systemStream)));
+    doTestSamzaProperty(StreamConfig.PRIORITY(), "100",
+        (config, systemStream) -> assertEquals(100, 
config.getPriority(systemStream)));
+    doTestSamzaProperty(StreamConfig.PRIORITY(), "-1",
+        (config, systemStream) -> assertEquals(-1, 
config.getPriority(systemStream)));
+    doTestSamzaPropertyDoesNotExist(StreamConfig.PRIORITY(),
+        (config, systemStream) -> assertEquals(-1, 
config.getPriority(systemStream)));
+    doTestSamzaPropertyInvalidConfig(StreamConfig::getPriority);
   }
 
-  // 01
   @Test
-  public void testContainsSamzaPropertyFromSystemStream() {
-    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, 
"samza.key1"), "value1", "key2", "value2");
-    assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key1"));
+  public void testGetSerdeStreams() {
+    assertEquals(Collections.emptySet(),
+        JavaConverters.setAsJavaSetConverter(new StreamConfig(new 
MapConfig()).getSerdeStreams(SYSTEM)).asJava());
+
+    // not key/msg serde property for "streams."
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
SAMZA_IGNORED_PROPERTY, UNUSED_VALUE)));
+    assertEquals(Collections.emptySet(),
+        
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+    // not matching system for "streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
StreamConfig.KEY_SERDE(), UNUSED_VALUE,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), 
"otherSystem")));
+    assertEquals(Collections.emptySet(),
+        
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+    // not key/msg serde property for "systems.<system>.streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
SAMZA_IGNORED_PROPERTY, UNUSED_VALUE)));
+    assertEquals(Collections.emptySet(),
+        
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+    // not matching system for "systems.<system>.streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), "otherSystem", STREAM_ID) 
+ StreamConfig.KEY_SERDE(),
+        UNUSED_VALUE)));
+    assertEquals(Collections.emptySet(),
+        
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+    // not matching system for "streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
StreamConfig.KEY_SERDE(), UNUSED_VALUE,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), 
"otherSystem")));
+    assertEquals(Collections.emptySet(),
+        
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+    String serdeValue = "my.serde.class";
+
+    // key serde for "streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
StreamConfig.KEY_SERDE(), serdeValue)));
+    assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)),
+        
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+    // msg serde for "streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
StreamConfig.MSG_SERDE(), serdeValue)));
+    assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)),
+        
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+    // serde for "streams." with physical stream name mapping
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM,
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
StreamConfig.KEY_SERDE(), serdeValue)));
+    assertEquals(Collections.singleton(new SystemStream(SYSTEM, 
PHYSICAL_STREAM)),
+        
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+    // key serde for "systems.<system>.streams."
+    streamConfig = new StreamConfig(new MapConfig(
+        ImmutableMap.of(String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, 
STREAM_ID) + StreamConfig.KEY_SERDE(),
+            serdeValue)));
+    assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)),
+        
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+    // msg serde for "systems.<system>.streams."
+    streamConfig = new StreamConfig(new MapConfig(
+        ImmutableMap.of(String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, 
STREAM_ID) + StreamConfig.MSG_SERDE(),
+            serdeValue)));
+    assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)),
+        
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+    // merge several different ways of providing serdes
+    String streamIdWithPhysicalName = "streamIdWithPhysicalName";
+    streamConfig = new StreamConfig(new MapConfig(new 
ImmutableMap.Builder<String, String>()
+        // need to map the stream ids to the system
+        .put(JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM)
+        // key and msg serde for "streams."
+        .put(String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
StreamConfig.KEY_SERDE(), serdeValue)
+        .put(String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
StreamConfig.MSG_SERDE(), serdeValue)
+        // key serde for "streams." with physical stream name mapping
+        .put(String.format(StreamConfig.STREAM_ID_PREFIX(), 
streamIdWithPhysicalName) + StreamConfig.KEY_SERDE(),
+            serdeValue)
+        .put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), 
streamIdWithPhysicalName), PHYSICAL_STREAM)
+        // key serde for "systems.<system>.streams."
+        .put(String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, 
OTHER_STREAM_ID) + StreamConfig.KEY_SERDE(),
+            serdeValue)
+        .build()));
+    assertEquals(Sets.newHashSet(new SystemStream(SYSTEM, STREAM_ID), new 
SystemStream(SYSTEM, PHYSICAL_STREAM),
+        new SystemStream(SYSTEM, OTHER_STREAM_ID)),
+        
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
   }
 
-  // 10
   @Test
-  public void testContainsSamzaPropertyFromStreamId() {
-    StreamConfig config = buildConfig("key1", "value1",
-        buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2",
-        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
-        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), 
STREAM1_PHYSICAL_NAME);
-    assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key2"));
+  public void testGetStreamProperties() {
+    assertEquals(new MapConfig(), new StreamConfig(new 
MapConfig()).getStreamProperties(STREAM_ID));
+
+    String propertyName = "stream.property.name";
+
+    // BEGIN: tests in which properties cannot be found in the config
+
+    // not matching stream id for "streams."
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), OTHER_STREAM_ID) + 
propertyName, UNUSED_VALUE)));
+    assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
+
+    // not matching stream id for "systems.<system>.streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, OTHER_STREAM_ID) + 
propertyName, UNUSED_VALUE,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), OTHER_STREAM_ID), 
SYSTEM)));
+    assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
+
+    // no system mapping when using "systems.<system>.streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, UNUSED_VALUE)));
+    assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
+
+    // ignore property with "samza" prefix for "streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
SAMZA_IGNORED_PROPERTY, UNUSED_VALUE)));
+    assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
+
+    // ignore property with "samza" prefix for "streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
SAMZA_IGNORED_PROPERTY, UNUSED_VALUE,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), 
SYSTEM)));
+    assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
+
+    // should not map physical name back to stream id if physical name is 
passed as stream id
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, UNUSED_VALUE,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM)));
+    assertEquals(new MapConfig(), 
streamConfig.getStreamProperties(PHYSICAL_STREAM));
+
+    // BEGIN: tests in which properties can be found in the config
+
+    String propertyValue = "value";
+
+    // "streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
propertyName, propertyValue)));
+    assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+        streamConfig.getStreamProperties(STREAM_ID));
+
+    // "systems.<system>.streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, propertyValue,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), 
SYSTEM)));
+    assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+        streamConfig.getStreamProperties(STREAM_ID));
+
+    // "systems.<system>.default.stream."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, 
SYSTEM) + propertyName, propertyValue,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), 
SYSTEM)));
+    assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+        streamConfig.getStreamProperties(STREAM_ID));
+
+    // use physical name mapping for "systems.<system>.streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, PHYSICAL_STREAM) + 
propertyName, propertyValue,
+        // should not use stream id since there is physical stream
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, UNUSED_VALUE,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM)));
+    assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+        streamConfig.getStreamProperties(STREAM_ID));
+
+    // "streams." should override "systems.<system>.streams."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
propertyName, propertyValue,
+        // should not use "systems.<system>.streams." since there is a 
"streams." config
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, UNUSED_VALUE,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), 
SYSTEM)));
+    assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+        streamConfig.getStreamProperties(STREAM_ID));
+
+    // "systems.<system>.streams." should override 
"systems.<system>.default.stream."
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, propertyValue,
+        // should not use "systems.<system>.default.stream." since there is a 
"systems.<system>.streams."
+        String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, 
SYSTEM) + propertyName, UNUSED_VALUE,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), 
SYSTEM)));
+    assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+        streamConfig.getStreamProperties(STREAM_ID));
+
+    // merge multiple ways of specifying configs
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        // "streams."
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
"from.stream.id.property", "fromStreamIdValue",
+        // second "streams." property
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
"from.stream.id.other.property",
+        "fromStreamIdOtherValue",
+        // "systems.<system>.streams."
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
"from.system.stream.property",
+        "fromSystemStreamValue",
+        // need to map the stream id to a system
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), 
SYSTEM)));
+    assertEquals(new MapConfig(ImmutableMap.of(
+        "from.stream.id.property", "fromStreamIdValue",
+        "from.stream.id.other.property", "fromStreamIdOtherValue",
+        "from.system.stream.property", "fromSystemStreamValue")),
+        streamConfig.getStreamProperties(STREAM_ID));
   }
 
-  // 11
   @Test
-  public void testContainsSamzaPropertyFromSystemStreamAndStreamId() {
-    StreamConfig config = buildConfig("key1", "value1",
-        buildProp(SYSTEM_STREAM_1, "samza.key2"), "value2",
-        buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2OVERRIDE",
-        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
-        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), 
STREAM1_PHYSICAL_NAME);
-    assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key2"));
+  public void testGetSystem() {
+    assertNull(new StreamConfig(new MapConfig()).getSystem(STREAM_ID));
+
+    // system is specified directly
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+        JobConfig.JOB_DEFAULT_SYSTEM, "otherSystem",
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), OTHER_STREAM_ID), 
"otherSystem")));
+    assertEquals(SYSTEM, streamConfig.getSystem(STREAM_ID));
+
+    // fall back to job default system
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), OTHER_STREAM_ID), 
"otherSystem")));
+    assertEquals(SYSTEM, streamConfig.getSystem(STREAM_ID));
   }
 
-  // 00
   @Test
-  public void testGetSerdeStreamsDoesNotExist() {
-    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, 
"samza.key1"), "value1", "key2", "value2");
-    assertTrue(config.getSerdeStreams(STREAM1_SYSTEM).isEmpty());
+  public void testGetPhysicalName() {
+    assertEquals(STREAM_ID, new StreamConfig(new 
MapConfig()).getPhysicalName(STREAM_ID));
+
+    // ignore mapping for other stream ids
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(
+        
ImmutableMap.of(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), 
OTHER_STREAM_ID), PHYSICAL_STREAM)));
+    assertEquals(STREAM_ID, streamConfig.getPhysicalName(STREAM_ID));
+
+    streamConfig = new StreamConfig(new MapConfig(
+        
ImmutableMap.of(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), 
STREAM_ID), PHYSICAL_STREAM)));
+    assertEquals(PHYSICAL_STREAM, streamConfig.getPhysicalName(STREAM_ID));
   }
 
-  // 01
   @Test
-  public void testGetSerdeStreamsFromSystemStream() {
-    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, 
StreamConfig.KEY_SERDE()), "value1",
-        buildProp(SYSTEM_STREAM_2, StreamConfig.MSG_SERDE()), "value2",
-        "key3", "value3");
-    assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
-    assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size());
-    assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
-    assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get());
+  public void testGetIsIntermediateStream() {
+    assertFalse(new StreamConfig(new 
MapConfig()).getIsIntermediateStream(STREAM_ID));
+
+    // ignore mapping for other stream ids
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(
+        
ImmutableMap.of(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), 
OTHER_STREAM_ID), "true")));
+    assertFalse(streamConfig.getIsIntermediateStream(STREAM_ID));
+
+    // do not use stream id property if physical name is passed as input
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), 
STREAM_ID), "true",
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM)));
+    assertFalse(streamConfig.getIsIntermediateStream(PHYSICAL_STREAM));
+
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), 
STREAM_ID), "true")));
+    assertTrue(streamConfig.getIsIntermediateStream(STREAM_ID));
+
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), 
STREAM_ID), "false")));
+    assertFalse(streamConfig.getIsIntermediateStream(STREAM_ID));
   }
 
-  // 10
   @Test
-  public void testGetSerdeStreamsFromStreamId() {
-    StreamConfig config = buildConfig(
-        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
-        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), 
STREAM1_PHYSICAL_NAME,
-        buildProp(STREAM1_STREAM_ID, StreamConfig.KEY_SERDE()), "value1",
-
-        buildProp(STREAM2_STREAM_ID, StreamConfig.SYSTEM()), STREAM2_SYSTEM,
-        buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()), 
STREAM2_PHYSICAL_NAME,
-        buildProp(STREAM2_STREAM_ID, StreamConfig.MSG_SERDE()), "value2",
-        "key3", "value3");
-    assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
-    assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size());
-    assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
-    assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get());
+  public void testGetDeleteCommittedMessages() {
+    assertFalse(new StreamConfig(new 
MapConfig()).getDeleteCommittedMessages(STREAM_ID));
+
+    // ignore mapping for other stream ids
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(
+        
ImmutableMap.of(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(),
 OTHER_STREAM_ID),
+            "true")));
+    assertFalse(streamConfig.getDeleteCommittedMessages(STREAM_ID));
+
+    // do not use stream id property if physical name is passed as input
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), 
STREAM_ID), "true",
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM)));
+    assertFalse(streamConfig.getDeleteCommittedMessages(PHYSICAL_STREAM));
+
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), 
STREAM_ID), "true")));
+    assertTrue(streamConfig.getDeleteCommittedMessages(STREAM_ID));
+
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), 
STREAM_ID), "false")));
+    assertFalse(streamConfig.getDeleteCommittedMessages(STREAM_ID));
   }
 
-  // 11
   @Test
-  public void testGetSerdeStreamsFromSystemStreamAndStreamId() {
-    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, 
StreamConfig.KEY_SERDE()), "value1",
-        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
-        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), 
STREAM1_PHYSICAL_NAME,
-        buildProp(STREAM1_STREAM_ID, StreamConfig.KEY_SERDE()), 
"value1OVERRIDE",
-        buildProp(STREAM2_STREAM_ID, StreamConfig.SYSTEM()), STREAM2_SYSTEM,
-        buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()), 
STREAM2_PHYSICAL_NAME,
-        buildProp(STREAM2_STREAM_ID, StreamConfig.MSG_SERDE()), "value2",
-        "key3", "value3");
-
-    assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
-    assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size());
-    assertEquals("value1OVERRIDE", 
config.getStreamKeySerde(SYSTEM_STREAM_1).get());
-    assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get());
+  public void testGetIsBounded() {
+    assertFalse(new StreamConfig(new MapConfig()).getIsBounded(STREAM_ID));
+
+    // ignore mapping for other stream ids
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(
+        ImmutableMap.of(String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), 
OTHER_STREAM_ID),
+            "true")));
+    assertFalse(streamConfig.getIsBounded(STREAM_ID));
+
+    // do not use stream id property if physical name is passed as input
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), STREAM_ID), 
"true",
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM)));
+    assertFalse(streamConfig.getIsBounded(PHYSICAL_STREAM));
+
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), STREAM_ID), 
"true")));
+    assertTrue(streamConfig.getIsBounded(STREAM_ID));
+
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), STREAM_ID), 
"false")));
+    assertFalse(streamConfig.getIsBounded(STREAM_ID));
   }
 
   @Test
-  public void testStreamPropertyDefaults() {
-    final String nonSamzaProperty = "replication.factor";
-    StreamConfig config = buildConfig(
-        buildSystemDefaultProp(STREAM1_SYSTEM, nonSamzaProperty), "1",
-        buildSystemDefaultProp(STREAM1_SYSTEM, StreamConfig.KEY_SERDE()), 
"value1",
-        buildSystemDefaultProp(STREAM1_SYSTEM, 
StreamConfig.CONSUMER_OFFSET_DEFAULT()), "newest",
-        buildProp(SYSTEM_STREAM_1, "dummyStreamProperty"), "dummyValue",
-        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
-        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), 
STREAM1_PHYSICAL_NAME,
-        buildSystemDefaultProp(STREAM2_SYSTEM, nonSamzaProperty), "2",
-        buildProp(STREAM2_STREAM_ID, StreamConfig.SYSTEM()), STREAM2_SYSTEM,
-        buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()), 
STREAM2_PHYSICAL_NAME,
-        buildProp(STREAM2_STREAM_ID, nonSamzaProperty), "3",
-        buildSystemDefaultProp(STREAM3_SYSTEM, nonSamzaProperty), "4",
-        buildProp(STREAM3_STREAM_ID, StreamConfig.SYSTEM()), STREAM3_SYSTEM,
-        buildProp(STREAM3_STREAM_ID, StreamConfig.PHYSICAL_NAME()), 
STREAM3_PHYSICAL_NAME,
-        buildProp(SYSTEM_STREAM_3, nonSamzaProperty), "5",
-        "key3", "value3");
-
-
-
-    // Ensure that we can set legacy system properties via the new system wide 
default
-    assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
-    assertEquals(0, config.getSerdeStreams(STREAM1_SYSTEM).size());
-    assertEquals("value1", new 
SystemConfig(config).getSystemKeySerde(STREAM1_SYSTEM).get());
-    assertEquals("newest", 
config.getDefaultStreamOffset(SYSTEM_STREAM_1).get());
-
-    // Property set via systems.x.default.stream.* only
-    assertEquals("1", 
config.getStreamProperties(STREAM1_STREAM_ID).get(nonSamzaProperty));
-
-    // Property set via systems.x.default.stream.* and streams.y.*
-    assertEquals("3", 
config.getStreamProperties(STREAM2_STREAM_ID).get(nonSamzaProperty));
-
-    // Property set via systems.x.default.stream.* and system.x.streams.z.*
-    assertEquals("5", 
config.getStreamProperties(STREAM3_STREAM_ID).get(nonSamzaProperty));
-
-    assertEquals(false, config.getBroadcastEnabled(SYSTEM_STREAM_1));
+  public void testGetStreamIds() {
+    assertEquals(ImmutableList.of(), ImmutableList.copyOf(
+        JavaConverters.asJavaIterableConverter(new StreamConfig(new 
MapConfig()).getStreamIds()).asJava()));
+
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
".property", "value")));
+    assertEquals(ImmutableList.of(STREAM_ID),
+        
ImmutableList.copyOf(JavaConverters.asJavaIterableConverter(streamConfig.getStreamIds()).asJava()));
+
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
".property.subProperty", "value")));
+    assertEquals(ImmutableList.of(STREAM_ID),
+        
ImmutableList.copyOf(JavaConverters.asJavaIterableConverter(streamConfig.getStreamIds()).asJava()));
+
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
".property0", "value",
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
".property1", "value",
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
".property.subProperty0", "value",
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
".property.subProperty1", "value",
+        String.format(StreamConfig.STREAM_ID_PREFIX(), OTHER_STREAM_ID) + 
".property", "value")));
+    assertEquals(ImmutableList.of(STREAM_ID, OTHER_STREAM_ID),
+        
ImmutableList.copyOf(JavaConverters.asJavaIterableConverter(streamConfig.getStreamIds()).asJava()));
   }
 
+  private static void doTestSamzaProperty(String propertyName, String 
propertyValue, SamzaPropertyAssertion assertion) {
+    doTestSamzaPropertyAccess(propertyName, propertyValue, assertion);
+    doTestSamzaPropertyAccessWithPhysicalStream(propertyName, propertyValue, 
assertion);
+    doTestSamzaPropertyPriority(propertyName, propertyValue, assertion);
+    doTestSamzaPropertyMultipleStreams(propertyName, propertyValue, assertion);
+  }
 
-  private StreamConfig buildConfig(String... kvs) {
-    if (kvs.length % 2 != 0) {
-      throw new IllegalArgumentException("There must be parity between the 
keys and values");
-    }
+  /**
+   * Tests for Samza property access using different lookup methods, when 
using stream id in system stream.
+   * This includes tests in which there is no specified physical stream, so 
the stream id is used as the physical
+   * stream.
+   */
+  private static void doTestSamzaPropertyAccess(String propertyName, String 
value, SamzaPropertyAssertion assertion) {
+    // streams.<streamId>.<property>
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
propertyName, value,
+        // all streams need to have a system
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM))),
+        SYSTEM_STREAM);
+
+    // systems.<system>.streams.<stream>.<property> where stream id has no 
specified physical stream
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, value,
+        // specify the system for the stream id
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), 
SYSTEM))),
+        SYSTEM_STREAM);
+
+    // systems.<system>.streams.<stream>.<property> where stream is the 
streamId, system is from job default system
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, value,
+        // use job default system to get the system
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM))),
+        SYSTEM_STREAM);
+
+    // systems.<system>.default.stream.<property> where stream id has no 
specified physical stream
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, 
SYSTEM) + propertyName, value,
+        // specify the system for the stream id
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), 
SYSTEM))),
+        SYSTEM_STREAM);
+
+    // systems.<system>.streams.<stream>.<property> where no system mapping 
(fall back to SystemStream)
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, value,
+        // map the stream id to the physical stream
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM))),
+        SYSTEM_STREAM);
+
+    // systems.<system>.default.stream.<property> where no system mapping 
(fall back to SystemStream)
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, 
SYSTEM) + propertyName, value,
+        // map the stream id to the physical stream
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM))),
+        SYSTEM_STREAM);
+
+    // systems.<system>.streams.<stream>.<property> where stream id has a 
physical name but property is from stream id
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, value,
+        // use job default system to get the system
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+        // map the stream id to the physical stream
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM))),
+        SYSTEM_STREAM);
+
+    // systems.<system>.default.stream.<property> where stream id has a 
physical name but property is from stream id
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, 
SYSTEM) + propertyName, value,
+        // use job default system to get the system
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+        // map the stream id to the physical stream
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM))),
+        SYSTEM_STREAM);
+  }
 
-    Map<String, String> configMap = new HashMap<>();
-    for (int i = 0; i < kvs.length - 1; i += 2) {
-      configMap.put(kvs[i], kvs[i + 1]);
-    }
-    return new StreamConfig(new MapConfig(configMap));
+  /**
+   * Tests for Samza property access using different lookup methods, when 
using physical stream in system stream.
+   */
+  private static void doTestSamzaPropertyAccessWithPhysicalStream(String 
propertyName, String value,
+      SamzaPropertyAssertion assertion) {
+    // streams.<streamId>.<property>
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
propertyName, value,
+        // all streams need to have a system
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+        // map the stream id to the physical stream
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM))),
+        SYSTEM_STREAM_PHYSICAL);
+
+    // systems.<system>.streams.<stream>.<property> with a specific system for 
the stream id
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, PHYSICAL_STREAM) + 
propertyName, value,
+        // specify the system for the stream id
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+        // map the stream id to the physical stream
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM))),
+        SYSTEM_STREAM_PHYSICAL);
+
+    // systems.<system>.streams.<stream>.<property> with system coming from 
job default system
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, PHYSICAL_STREAM) + 
propertyName, value,
+        // use job default system to get the system
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+        // map the stream id to the physical stream
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM))),
+        SYSTEM_STREAM_PHYSICAL);
+
+    // systems.<system>.default.stream.<property>
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, 
SYSTEM) + propertyName, value,
+        // specify the system for the stream id
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+        // map the stream id to the physical stream
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM))),
+        SYSTEM_STREAM_PHYSICAL);
   }
 
-  private String buildProp(String streamId, String suffix) {
-    return String.format(STREAM_ID_PATTERN, streamId) + suffix;
+  /**
+   * Tests that certain properties have priority over others. For conciseness, 
this will not explicitly test priorities
+   * for all different ways of specifying a property. This will compare two 
options at a time, and then we can infer
+   * priorities transitively.
+   */
+  private static void doTestSamzaPropertyPriority(String propertyName, String 
value, SamzaPropertyAssertion assertion) {
+    // streams.<streamId>.<property> vs. 
systems.<system>.streams.<stream>.<property>
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
propertyName, value,
+        // all streams need to have a system
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+        // this config should not be used
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, UNUSED_VALUE))),
+        SYSTEM_STREAM);
+
+    // systems.<system>.streams.<stream>.<property> vs. 
systems.<system>.default.stream.<property>
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, value,
+        // specify the system for the stream id
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+        // this config should not be used
+        String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, 
SYSTEM) + propertyName, UNUSED_VALUE))),
+        SYSTEM_STREAM);
+
+    /*
+     * The next logical case to check would be 
systems.<system>.default.stream.<property> with a system mapping in the
+     * config vs. systems.<system>.streams.<stream>.<property> with a system 
mapping, but that is not possible, so move
+     * on to the next case.
+     */
+
+    /*
+     * systems.<system>.streams.<stream>.<property> without a system mapping 
in the config vs.
+     * systems.<system>.default.stream.<property> without a system mapping in 
the config
+     */
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, value,
+        // this config should not be used
+        String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, 
SYSTEM) + propertyName, UNUSED_VALUE))),
+        SYSTEM_STREAM);
   }
 
-  private String buildProp(SystemStream systemStream, String suffix) {
-    return String.format(SYSTEM_STREAM_PATTERN, systemStream.getSystem(), 
systemStream.getStream()) + suffix;
+  /**
+   * Tests for Samza property access in which multiple streams are configured.
+   * Only testing cases in which streams may share some properties.
+   */
+  private static void doTestSamzaPropertyMultipleStreams(String propertyName, 
String value, SamzaPropertyAssertion assertion) {
+    // systems.<system>.default.stream.<property> where stream id has no 
specified physical stream
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, 
SYSTEM) + propertyName, value,
+        // specify the systems for the streams
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+        String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), OTHER_STREAM_ID), 
SYSTEM))),
+        SYSTEM_STREAM);
   }
 
-  private String buildSystemDefaultProp(String system, String suffix) {
-    return String.format(SYSTEM_DEFAULT_STREAM_PATTERN, system) + suffix;
+  private static void doTestSamzaPropertyInvalidConfig(SamzaPropertyLookup 
lookup) {
+    // configure physical stream and have mapping from stream id to physical 
stream
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM,
+        String.format(StreamConfig.STREAM_ID_PREFIX(), PHYSICAL_STREAM) + 
".property", "value",
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM)));
+    try {
+      lookup.doLookup(streamConfig, SYSTEM_STREAM_PHYSICAL);
+      fail("Expected an exception due to having too many mappings to a 
physical stream");
+    } catch (IllegalStateException e) {
+      // expected to reach here
+    }
+
+    // two separate stream ids map to same physical stream
+    streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM,
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), 
OTHER_STREAM_ID), PHYSICAL_STREAM,
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM)));
+    try {
+      lookup.doLookup(streamConfig, SYSTEM_STREAM_PHYSICAL);
+      fail("Expected an exception due to having too many mappings to a 
physical stream");
+    } catch (IllegalStateException e) {
+      // expected to reach here
+    }
   }
 
-  private Config addConfigs(Config original, String... kvs) {
-    Map<String, String> result = new HashMap<>();
-    result.putAll(original);
-    result.putAll(buildConfig(kvs));
-    return new MapConfig(result);
+  private static void doTestSamzaPropertyDoesNotExist(String propertyName, 
SamzaPropertyAssertion assertion) {
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        // just put in some value which will be ignored
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
SAMZA_IGNORED_PROPERTY, UNUSED_VALUE))),
+        SYSTEM_STREAM);
+
+    /*
+     * Won't use streams.<streamId>.<property> if streamId is mapped to a 
physical stream
+     */
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) + 
propertyName, UNUSED_VALUE,
+        // all streams need to have a system
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+        // map the stream id to the physical stream
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM))),
+        SYSTEM_STREAM);
+
+    /*
+     * Won't use systems.<system>.streams.<stream>.<property> if stream is 
mapped to a physical stream and the property
+     * is only specified using the physical stream
+     */
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, PHYSICAL_STREAM) + 
propertyName, UNUSED_VALUE,
+        // all streams need to have a system
+        JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+        // map the stream id to the physical stream
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM))),
+        SYSTEM_STREAM);
+
+    /*
+     * Won't use systems.<system>.streams.<stream>.<property> if stream is 
mapped to a physical stream and there is no
+     * system mapping for the stream id
+     */
+    assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+        String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) + 
propertyName, UNUSED_VALUE,
+        // map the stream id to the physical stream
+        String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID), 
PHYSICAL_STREAM))),
+        SYSTEM_STREAM_PHYSICAL);
   }
 
-}
+  /**
+   * Used to execute assertions for tests which look up specific Samza 
properties.
+   */
+  @FunctionalInterface
+  private interface SamzaPropertyAssertion {
+    void doAssertion(StreamConfig streamConfig, SystemStream systemStream);
+  }
+
+  /**
+   * Used to specify how to look up specific Samza properties.
+   */
+  @FunctionalInterface
+  private interface SamzaPropertyLookup {
+    void doLookup(StreamConfig streamConfig, SystemStream systemStream);
+  }
+}
\ No newline at end of file

Reply via email to