This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 6d91400 SAMZA-2328: [Scala cleanup] Convert StreamConfig to Java
(part 1: update test coverage and clarify functionality in javadocs) (#1166)
6d91400 is described below
commit 6d91400536612aa75af752fa66a31f99236c7b54
Author: Cameron Lee <[email protected]>
AuthorDate: Thu Sep 26 09:16:15 2019 -0700
SAMZA-2328: [Scala cleanup] Convert StreamConfig to Java (part 1: update
test coverage and clarify functionality in javadocs) (#1166)
---
.../java/org/apache/samza/config/SystemConfig.java | 3 +-
.../org/apache/samza/config/StreamConfig.scala | 167 ++--
.../org/apache/samza/config/TestStreamConfig.java | 853 ++++++++++++++++-----
3 files changed, 780 insertions(+), 243 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
index 93a0c32..1403b0d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
@@ -40,8 +40,7 @@ public class SystemConfig extends MapConfig {
private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
public static final String SYSTEM_FACTORY_FORMAT = SYSTEMS_PREFIX + "%s" +
SYSTEM_FACTORY_SUFFIX;
- @VisibleForTesting
- static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_ID_PREFIX
+ "default.stream.";
+ public static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT =
SYSTEM_ID_PREFIX + "default.stream.";
// If true, automatically delete committed messages from streams whose
committed messages can be deleted.
// A stream's committed messages can be deleted if it is a intermediate
stream, or if user has manually
diff --git
a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 321af6e..6030b93 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -58,6 +58,23 @@ object StreamConfig {
implicit def Config2Stream(config: Config) = new StreamConfig(config)
}
+/**
+ * Helper for accessing configs related to stream properties.
+ *
+ * For most configs, this currently supports two different formats for
specifying stream properties:
+ * 1) "streams.{streamId}.{property}" (recommended to use this format)
+ * 2) "systems.{systemName}.streams.{streamName}.{property}" (legacy)
+ * Note that some config lookups are only supported through the
"streams.{streamId}.{property}". See the specific
+ * accessor method to determine which formats are supported.
+ *
+ * Summary of terms:
+ * - streamId: logical identifier used for a stream; configs are specified
using this streamId
+ * - physical stream: concrete name for a stream (if the physical stream is
not explicitly configured, then the streamId
+ * is used as the physical stream
+ * - streamName: within the javadoc for this class, streamName is the same as
physical stream
+ * - samza property: property which is Samza-specific, which will have
"samza." as a prefix (e.g. "samza.key.serde");
+ * this is in contrast to stream-specific properties which are related to
specific stream technologies
+ */
class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging
{
def getStreamMsgSerde(systemStream: SystemStream) =
nonEmptyOption(getSamzaProperty(systemStream, StreamConfig.MSG_SERDE))
@@ -118,18 +135,19 @@ class StreamConfig(config: Config) extends
ScalaMapConfig(config) with Logging {
}
/**
- * Gets the properties for the specified streamId from the config.
- * It first applies any legacy configs from this config location:
- * systems.{system}.streams.{stream}.*
- *
- * It then overrides them with properties of the new config format:
- * streams.{streamId}.*
- *
- * Only returns properties of the stream itself, not any of the samza
properties for the stream.
- *
- * @param streamId the identifier for the stream in the config.
- * @return the merged map of config properties from both the legacy
and new config styles
- */
+ * Gets the properties for the streamId which are not Samza properties (i.e.
do not have a "samza." prefix). This
+ * includes current and legacy config styles.
+ * This will return a Config with the properties that match the following
formats (if a property is specified through
+ * multiple formats, priority is top to bottom):
+ * 1) "streams.{streamId}.{property}"
+ * 2) "systems.{systemName}.streams.{streamName}.{property}" where
systemName is the system mapped to the streamId in
+ * the config and streamName is the physical stream name mapped to the
stream id
+ * 3) "systems.{systemName}.default.stream.{property}" where systemName is
the system mapped to the streamId in the
+ * config
+ *
+ * @param streamId the identifier for the stream in the config.
+ * @return the merged map of config properties from both the legacy
and new config styles
+ */
def getStreamProperties(streamId: String) = {
val allProperties = getAllStreamProperties(streamId)
val samzaProperties = allProperties.subset(StreamConfig.SAMZA_PROPERTY,
false)
@@ -199,16 +217,31 @@ class StreamConfig(config: Config) extends
ScalaMapConfig(config) with Logging {
}
/**
- * Gets the specified Samza property for a SystemStream. A Samza property
is a property that controls how Samza
- * interacts with the stream, as opposed to a property of the stream itself.
- *
- * Note, because the translation is not perfect between SystemStream and
streamId,
- * this method is not identical to getProperty(streamId, property)
- *
- * @param systemStream the SystemStream for which the property value will
be retrieved.
- * @param property the samza property name excluding the leading delimiter.
e.g. "samza.x.y"
- */
- protected def getSamzaProperty(systemStream: SystemStream, property:
String): String = {
+ * Gets the specified Samza property for a SystemStream. A Samza property is
a property that controls how Samza
+ * interacts with the stream, as opposed to a property of the stream itself.
+ *
+ * First, tries to map the systemStream to a streamId. This will only find a
streamId if the stream is a physical name
+ * (explicitly mapped physical name or a stream id without a physical name
mapping). That means this will not map a
+ * stream id to itself if there is a mapping from the stream id to a
physical stream name. This also requires that the
+ * stream id is mapped to a system in the config.
+ * If a stream id is found:
+ * 1) Look for "streams.{streamId}.{property}" for the stream id.
+ * 2) Otherwise, look for
"systems.{systemName}.streams.{streamName}.{property}" in which the systemName
is the system
+ * mapped to the stream id and the streamName is the physical stream name
for the stream id.
+ * 3) Otherwise, look for "systems.{systemName}.default.stream.{property}"
in which the systemName is the system
+ * mapped to the stream id.
+ * If a stream id was not found or no property could be found using the
above keys:
+ * 1) Look for "systems.{systemName}.streams.{streamName}.{property}" in
which the systemName is the system in the
+ * input systemStream and the streamName is the stream from the input
systemStream.
+ * 2) Otherwise, look for "systems.{systemName}.default.stream.{property}"
in which the systemName is the system
+ * in the input systemStream.
+ *
+ * @param systemStream the SystemStream for which the property value will be
retrieved.
+ * @param property the samza property key (including the "samza." prefix);
for example, for both
+ * "streams.streamId.samza.prop.key" and
"systems.system.streams.streamName.samza.prop.key", this
+ * argument should have the value "samza.prop.key"
+ */
+ private def getSamzaProperty(systemStream: SystemStream, property: String):
String = {
if (!property.startsWith(StreamConfig.SAMZA_PROPERTY)) {
throw new IllegalArgumentException(
"Attempt to fetch a non samza property for SystemStream %s named %s"
format(systemStream, property))
@@ -223,18 +256,16 @@ class StreamConfig(config: Config) extends
ScalaMapConfig(config) with Logging {
}
/**
- * Gets the specified Samza property for a SystemStream. A Samza property
is a property that controls how Samza
- * interacts with the stream, as opposed to a property of the stream itself.
- *
- * Note, because the translation is not perfect between SystemStream and
streamId,
- * this method is not identical to getProperty(streamId, property)
- *
- * @param systemStream the SystemStream for which the property value will
be retrieved.
- * @param property the samza property name excluding the leading delimiter.
e.g. "samza.x.y"
- * @param defaultValue the default value to use if the property value is
not found
- *
- */
- protected def getSamzaProperty(systemStream: SystemStream, property: String,
defaultValue: String): String = {
+ * Gets a Samza property, with a default value used if no property value is
found.
+ * See getSamzaProperty(SystemStream, String).
+ *
+ * @param systemStream the SystemStream for which the property value will be
retrieved.
+ * @param property the samza property key (including the "samza." prefix);
for example, for both
+ * "streams.streamId.samza.prop.key" and
"systems.system.streams.streamName.samza.prop.key", this
+ * argument should have the value "samza.prop.key"
+ * @param defaultValue the default value to use if the property value is not
found
+ */
+ private def getSamzaProperty(systemStream: SystemStream, property: String,
defaultValue: String): String = {
val streamVal = getSamzaProperty(systemStream, property)
if (streamVal != null) {
@@ -245,13 +276,15 @@ class StreamConfig(config: Config) extends
ScalaMapConfig(config) with Logging {
}
/**
- * Gets the specified Samza property for a SystemStream. A Samza property
is a property that controls how Samza
- * interacts with the stream, as opposed to a property of the stream itself.
- *
- * Note, because the translation is not perfect between SystemStream and
streamId,
- * this method is not identical to getProperty(streamId, property)
- */
- protected def containsSamzaProperty(systemStream: SystemStream, property:
String): Boolean = {
+ * Determines if a Samza property is specified.
+ * See getSamzaProperty(SystemStream, String).
+ *
+ * @param systemStream the SystemStream for the property value to check
+ * @param property the samza property key (including the "samza." prefix);
for example, for both
+ * "streams.streamId.samza.prop.key" and
"systems.system.streams.streamName.samza.prop.key", this
+ * argument should have the value "samza.prop.key"
+ */
+ private def containsSamzaProperty(systemStream: SystemStream, property:
String): Boolean = {
if (!property.startsWith(StreamConfig.SAMZA_PROPERTY)) {
throw new IllegalArgumentException(
"Attempt to fetch a non samza property for SystemStream %s named %s"
format(systemStream, property))
@@ -261,13 +294,16 @@ class StreamConfig(config: Config) extends
ScalaMapConfig(config) with Logging {
/**
- * Gets the stream properties from the legacy config style:
- * systems.{system}.streams.{streams}.*
- *
- * @param systemName the system name under which the properties are
configured
- * @param streamName the stream name
- * @return the map of properties for the stream
- */
+ * Finds the properties from the legacy config style (config key includes
system).
+ * This will return a Config with the properties that match the following
formats (if a property is specified through
+ * multiple formats, priority is top to bottom):
+ * 1) "systems.{systemName}.streams.{streamName}.{property}"
+ * 2) "systems.{systemName}.default.stream.{property}"
+ *
+ * @param systemName the system name under which the properties are
configured
+ * @param streamName the stream name
+ * @return the map of properties for the stream
+ */
private def getSystemStreamProperties(systemName: String, streamName:
String) = {
if (systemName == null) {
Map()
@@ -279,16 +315,18 @@ class StreamConfig(config: Config) extends
ScalaMapConfig(config) with Logging {
}
/**
- * Gets the properties for the specified streamId from the config.
- * It first applies any legacy configs from this config location:
- * systems.{system}.streams.{stream}.*
- *
- * It then overrides them with properties of the new config format:
- * streams.{streamId}.*
- *
- * @param streamId the identifier for the stream in the config.
- * @return the merged map of config properties from both the legacy
and new config styles
- */
+ * Gets all of the properties for the specified streamId (includes current
and legacy config styles).
+ * This will return a Config with the properties that match the following
formats (if a property is specified through
+ * multiple formats, priority is top to bottom):
+ * 1) "streams.{streamId}.{property}"
+ * 2) "systems.{systemName}.streams.{streamName}.{property}" where
systemName is the system mapped to the streamId in
+ * the config and streamName is the physical stream name mapped to the
stream id
+ * 3) "systems.{systemName}.default.stream.{property}" where systemName is
the system mapped to the streamId in the
+ * config
+ *
+ * @param streamId the identifier for the stream in the config.
+ * @return the merged map of config properties from both the legacy
and new config styles
+ */
private def getAllStreamProperties(streamId: String) = {
val allProperties = subset(StreamConfig.STREAM_ID_PREFIX format streamId)
val inheritedLegacyProperties: java.util.Map[String, String] =
@@ -300,7 +338,18 @@ class StreamConfig(config: Config) extends
ScalaMapConfig(config) with Logging {
getStreamIds().filter(streamId => system.equals(getSystem(streamId)))
}
- def systemStreamToStreamId(systemStream: SystemStream): String = {
+ /**
+ * Finds the stream id which corresponds to the systemStream.
+ * This finds the stream id that is mapped to the system in systemStream
through the config and that has a physical
+ * name (the physical name might be the stream id itself if there is no
explicit mapping) that matches the stream in
+ * systemStream.
+ * Note: If the stream in the systemStream is a stream id which is mapped to
a physical stream, then that stream won't
+ * be returned as a stream id here, since the stream in systemStream doesn't
match the physical stream name.
+ *
+ * @param systemStream system stream to map to stream id
+ * @return stream id corresponding to the system stream
+ */
+ private def systemStreamToStreamId(systemStream: SystemStream): String = {
val streamIds = getStreamIdsForSystem(systemStream.getSystem)
.filter(streamId =>
systemStream.getStream().equals(getPhysicalName(streamId)))
if (streamIds.size > 1) {
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
index 474a382..b91eeb0 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
@@ -18,252 +18,741 @@
*/
package org.apache.samza.config;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Collections;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
import org.apache.samza.system.SystemStream;
import org.junit.Test;
+import scala.Option;
+import scala.collection.JavaConverters;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
-public class TestStreamConfig {
- private static final String SYSTEM_STREAM_PATTERN = "systems.%s.streams.%s.";
- private static final String STREAM_ID_PATTERN = "streams.%s.";
-
- private static final String STREAM1_SYSTEM = "Sys1";
- private static final String STREAM1_PHYSICAL_NAME = "Str1";
- private static final String STREAM1_STREAM_ID = "streamId1";
- private static final SystemStream SYSTEM_STREAM_1 = new
SystemStream(STREAM1_SYSTEM, STREAM1_PHYSICAL_NAME);
- private static final String STREAM2_SYSTEM = "Sys2";
- private static final String STREAM2_PHYSICAL_NAME = "Str2";
- private static final String STREAM2_STREAM_ID = "streamId2";
- private static final SystemStream SYSTEM_STREAM_2 = new
SystemStream(STREAM2_SYSTEM, STREAM2_PHYSICAL_NAME);
-
- private static final String STREAM3_SYSTEM = "Sys3";
- private static final String STREAM3_PHYSICAL_NAME = "Str3";
- private static final String STREAM3_STREAM_ID = "streamId3";
- private static final SystemStream SYSTEM_STREAM_3 = new
SystemStream(STREAM3_SYSTEM, STREAM3_PHYSICAL_NAME);
-
- private static final String SYSTEM_DEFAULT_STREAM_PATTERN =
"systems.%s.default.stream.";
+public class TestStreamConfig {
+ private static final String SYSTEM = "system";
+ private static final String STREAM_ID = "streamId";
+ private static final SystemStream SYSTEM_STREAM = new SystemStream(SYSTEM,
STREAM_ID);
+ private static final String OTHER_STREAM_ID = "otherStreamId";
+ private static final String PHYSICAL_STREAM = "physicalStream";
+ private static final SystemStream SYSTEM_STREAM_PHYSICAL = new
SystemStream(SYSTEM, PHYSICAL_STREAM);
+ private static final String SAMZA_IGNORED_PROPERTY =
"samza.ignored.property";
+ private static final String UNUSED_VALUE = "should_not_be_used";
+ @Test
+ public void testGetStreamMsgSerde() {
+ String value = "my.msg.serde";
+ doTestSamzaProperty(StreamConfig.MSG_SERDE(), value,
+ (config, systemStream) -> assertEquals(Option.apply(value),
config.getStreamMsgSerde(systemStream)));
+ doTestSamzaProperty(StreamConfig.MSG_SERDE(), "",
+ (config, systemStream) -> assertEquals(Option.empty(),
config.getStreamMsgSerde(systemStream)));
+ doTestSamzaPropertyDoesNotExist(StreamConfig.MSG_SERDE(),
+ (config, systemStream) -> assertEquals(Option.empty(),
config.getStreamMsgSerde(systemStream)));
+ doTestSamzaPropertyInvalidConfig(StreamConfig::getStreamMsgSerde);
+ }
- @Test(expected = IllegalArgumentException.class)
- public void testGetSamzaPropertyThrowsIfInvalidPropertyName() {
- StreamConfig config = buildConfig("key1", "value1", "key2", "value2");
- config.getSamzaProperty(SYSTEM_STREAM_1, "key1");
+ @Test
+ public void testGetStreamKeySerde() {
+ String value = "my.key.serde";
+ doTestSamzaProperty(StreamConfig.KEY_SERDE(), value,
+ (config, systemStream) -> assertEquals(Option.apply(value),
config.getStreamKeySerde(systemStream)));
+ doTestSamzaProperty(StreamConfig.KEY_SERDE(), "",
+ (config, systemStream) -> assertEquals(Option.empty(),
config.getStreamKeySerde(systemStream)));
+ doTestSamzaPropertyDoesNotExist(StreamConfig.KEY_SERDE(),
+ (config, systemStream) -> assertEquals(Option.empty(),
config.getStreamKeySerde(systemStream)));
+ doTestSamzaPropertyInvalidConfig(StreamConfig::getStreamKeySerde);
}
- @Test(expected = IllegalArgumentException.class)
- public void testGetSamzaPropertyWithDefaultThrowsIfInvalidPropertyName() {
- StreamConfig config = buildConfig("key1", "value1", "key2", "value2");
- config.getSamzaProperty(SYSTEM_STREAM_1, "key1", "default");
+ @Test
+ public void testGetResetOffset() {
+ doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "true",
+ (config, systemStream) ->
assertTrue(config.getResetOffset(systemStream)));
+ doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "false",
+ (config, systemStream) ->
assertFalse(config.getResetOffset(systemStream)));
+ // if not true/false, then use false
+ doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "unknown_value",
+ (config, systemStream) ->
assertFalse(config.getResetOffset(systemStream)));
+ doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_RESET_OFFSET(),
+ (config, systemStream) ->
assertFalse(config.getResetOffset(systemStream)));
+ doTestSamzaPropertyInvalidConfig(StreamConfig::getResetOffset);
}
- // 00
@Test
- public void testGetSamzaPropertyDoesNotExist() {
- StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1,
"samza.key1"), "value1", "key2", "value2");
- assertNull(config.getSamzaProperty(SYSTEM_STREAM_1,
"samza.keyNonExistent"));
+ public void testIsResetOffsetConfigured() {
+ doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "true",
+ (config, systemStream) ->
assertTrue(config.isResetOffsetConfigured(systemStream)));
+ doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "false",
+ (config, systemStream) ->
assertTrue(config.isResetOffsetConfigured(systemStream)));
+ // if not true/false, then use false
+ doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET(), "unknown_value",
+ (config, systemStream) ->
assertTrue(config.isResetOffsetConfigured(systemStream)));
+ doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_RESET_OFFSET(),
+ (config, systemStream) ->
assertFalse(config.isResetOffsetConfigured(systemStream)));
+ doTestSamzaPropertyInvalidConfig(StreamConfig::isResetOffsetConfigured);
}
- // 01
@Test
- public void testGetSamzaPropertyFromSystemStream() {
- StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1,
"samza.key1"), "value1", "key2", "value2");
- assertEquals("value1", config.getSamzaProperty(SYSTEM_STREAM_1,
"samza.key1"));
+ public void testGetDefaultStreamOffset() {
+ String value = "my_offset_default";
+ doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT(), value,
+ (config, systemStream) -> assertEquals(Option.apply(value),
config.getDefaultStreamOffset(systemStream)));
+ doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT(), "",
+ (config, systemStream) -> assertEquals(Option.apply(""),
config.getDefaultStreamOffset(systemStream)));
+ doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_OFFSET_DEFAULT(),
+ (config, systemStream) -> assertEquals(Option.empty(),
+ new StreamConfig(config).getDefaultStreamOffset(systemStream)));
+ doTestSamzaPropertyInvalidConfig(StreamConfig::getDefaultStreamOffset);
}
- // 10
@Test
- public void testGetSamzaPropertyFromStreamId() {
- StreamConfig config = buildConfig("key1", "value1",
- buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2",
- buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
- buildProp(STREAM1_STREAM_ID, StreamConfig.BROADCAST()), "true",
- buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()),
STREAM1_PHYSICAL_NAME);
- assertEquals("value2", config.getSamzaProperty(SYSTEM_STREAM_1,
"samza.key2"));
- assertEquals("true", config.getSamzaProperty(SYSTEM_STREAM_1,
"samza.broadcast"));
+ public void testIsDefaultStreamOffsetConfigured() {
+ String value = "my_offset_default";
+ doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT(), value,
+ (config, systemStream) ->
assertTrue(config.isDefaultStreamOffsetConfigured(systemStream)));
+ doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT(), "",
+ (config, systemStream) ->
assertTrue(config.isDefaultStreamOffsetConfigured(systemStream)));
+ doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_OFFSET_DEFAULT(),
+ (config, systemStream) ->
assertFalse(config.isDefaultStreamOffsetConfigured(systemStream)));
+
doTestSamzaPropertyInvalidConfig(StreamConfig::isDefaultStreamOffsetConfigured);
}
- // 11
@Test
- public void testGetSamzaPropertyFromSystemStreamAndStreamId() {
- StreamConfig config = buildConfig("key1", "value1",
- buildProp(SYSTEM_STREAM_1, "samza.key2"), "value2",
- buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2OVERRIDE",
- buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
- buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()),
STREAM1_PHYSICAL_NAME);
- assertEquals("value2OVERRIDE", config.getSamzaProperty(SYSTEM_STREAM_1,
"samza.key2"));
+ public void testGetBootstrapEnabled() {
+ doTestSamzaProperty(StreamConfig.BOOTSTRAP(), "true",
+ (config, systemStream) ->
assertTrue(config.getBootstrapEnabled(systemStream)));
+ doTestSamzaProperty(StreamConfig.BOOTSTRAP(), "false",
+ (config, systemStream) ->
assertFalse(config.getBootstrapEnabled(systemStream)));
+ // if not true/false, then use false
+ doTestSamzaProperty(StreamConfig.BOOTSTRAP(), "unknown_value",
+ (config, systemStream) ->
assertFalse(config.getBootstrapEnabled(systemStream)));
+ doTestSamzaPropertyDoesNotExist(StreamConfig.BOOTSTRAP(),
+ (config, systemStream) ->
assertFalse(config.getBootstrapEnabled(systemStream)));
+ doTestSamzaPropertyInvalidConfig(StreamConfig::getBootstrapEnabled);
}
- @Test(expected = IllegalArgumentException.class)
- public void testContainsSamzaPropertyThrowsIfInvalidPropertyName() {
- StreamConfig config = buildConfig("key1", "value1", "key2", "value2");
- config.containsSamzaProperty(new SystemStream("SysX", "StrX"), "key1");
+ @Test
+ public void testGetBroadcastEnabled() {
+ doTestSamzaProperty(StreamConfig.BROADCAST(), "true",
+ (config, systemStream) ->
assertTrue(config.getBroadcastEnabled(systemStream)));
+ doTestSamzaProperty(StreamConfig.BROADCAST(), "false",
+ (config, systemStream) ->
assertFalse(config.getBroadcastEnabled(systemStream)));
+ // if not true/false, then use false
+ doTestSamzaProperty(StreamConfig.BROADCAST(), "unknown_value",
+ (config, systemStream) ->
assertFalse(config.getBroadcastEnabled(systemStream)));
+ doTestSamzaPropertyDoesNotExist(StreamConfig.BROADCAST(),
+ (config, systemStream) ->
assertFalse(config.getBroadcastEnabled(systemStream)));
+ doTestSamzaPropertyInvalidConfig(StreamConfig::getBroadcastEnabled);
}
- // 00
@Test
- public void testContainsSamzaPropertyDoesNotExist() {
- StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1,
"samza.key1"), "value1", "key2", "value2");
- assertFalse(config.containsSamzaProperty(SYSTEM_STREAM_1,
"samza.keyNonExistent"));
+ public void testGetPriority() {
+ doTestSamzaProperty(StreamConfig.PRIORITY(), "0",
+ (config, systemStream) -> assertEquals(0,
config.getPriority(systemStream)));
+ doTestSamzaProperty(StreamConfig.PRIORITY(), "100",
+ (config, systemStream) -> assertEquals(100,
config.getPriority(systemStream)));
+ doTestSamzaProperty(StreamConfig.PRIORITY(), "-1",
+ (config, systemStream) -> assertEquals(-1,
config.getPriority(systemStream)));
+ doTestSamzaPropertyDoesNotExist(StreamConfig.PRIORITY(),
+ (config, systemStream) -> assertEquals(-1,
config.getPriority(systemStream)));
+ doTestSamzaPropertyInvalidConfig(StreamConfig::getPriority);
}
- // 01
@Test
- public void testContainsSamzaPropertyFromSystemStream() {
- StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1,
"samza.key1"), "value1", "key2", "value2");
- assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key1"));
+ public void testGetSerdeStreams() {
+ assertEquals(Collections.emptySet(),
+ JavaConverters.setAsJavaSetConverter(new StreamConfig(new
MapConfig()).getSerdeStreams(SYSTEM)).asJava());
+
+ // not key/msg serde property for "streams."
+ StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
SAMZA_IGNORED_PROPERTY, UNUSED_VALUE)));
+ assertEquals(Collections.emptySet(),
+
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+ // not matching system for "streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
StreamConfig.KEY_SERDE(), UNUSED_VALUE,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID),
"otherSystem")));
+ assertEquals(Collections.emptySet(),
+
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+ // not key/msg serde property for "systems.<system>.streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
SAMZA_IGNORED_PROPERTY, UNUSED_VALUE)));
+ assertEquals(Collections.emptySet(),
+
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+ // not matching system for "systems.<system>.streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), "otherSystem", STREAM_ID)
+ StreamConfig.KEY_SERDE(),
+ UNUSED_VALUE)));
+ assertEquals(Collections.emptySet(),
+
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+ // not matching system for "streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
StreamConfig.KEY_SERDE(), UNUSED_VALUE,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID),
"otherSystem")));
+ assertEquals(Collections.emptySet(),
+
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+ String serdeValue = "my.serde.class";
+
+ // key serde for "streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
StreamConfig.KEY_SERDE(), serdeValue)));
+ assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)),
+
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+ // msg serde for "streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
StreamConfig.MSG_SERDE(), serdeValue)));
+ assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)),
+
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+ // serde for "streams." with physical stream name mapping
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM,
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
StreamConfig.KEY_SERDE(), serdeValue)));
+ assertEquals(Collections.singleton(new SystemStream(SYSTEM,
PHYSICAL_STREAM)),
+
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+ // key serde for "systems.<system>.streams."
+ streamConfig = new StreamConfig(new MapConfig(
+ ImmutableMap.of(String.format(StreamConfig.STREAM_PREFIX(), SYSTEM,
STREAM_ID) + StreamConfig.KEY_SERDE(),
+ serdeValue)));
+ assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)),
+
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+ // msg serde for "systems.<system>.streams."
+ streamConfig = new StreamConfig(new MapConfig(
+ ImmutableMap.of(String.format(StreamConfig.STREAM_PREFIX(), SYSTEM,
STREAM_ID) + StreamConfig.MSG_SERDE(),
+ serdeValue)));
+ assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)),
+
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
+
+ // merge several different ways of providing serdes
+ String streamIdWithPhysicalName = "streamIdWithPhysicalName";
+ streamConfig = new StreamConfig(new MapConfig(new
ImmutableMap.Builder<String, String>()
+ // need to map the stream ids to the system
+ .put(JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM)
+ // key and msg serde for "streams."
+ .put(String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
StreamConfig.KEY_SERDE(), serdeValue)
+ .put(String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
StreamConfig.MSG_SERDE(), serdeValue)
+ // key serde for "streams." with physical stream name mapping
+ .put(String.format(StreamConfig.STREAM_ID_PREFIX(),
streamIdWithPhysicalName) + StreamConfig.KEY_SERDE(),
+ serdeValue)
+ .put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(),
streamIdWithPhysicalName), PHYSICAL_STREAM)
+ // key serde for "systems.<system>.streams."
+ .put(String.format(StreamConfig.STREAM_PREFIX(), SYSTEM,
OTHER_STREAM_ID) + StreamConfig.KEY_SERDE(),
+ serdeValue)
+ .build()));
+ assertEquals(Sets.newHashSet(new SystemStream(SYSTEM, STREAM_ID), new
SystemStream(SYSTEM, PHYSICAL_STREAM),
+ new SystemStream(SYSTEM, OTHER_STREAM_ID)),
+
JavaConverters.setAsJavaSetConverter(streamConfig.getSerdeStreams(SYSTEM)).asJava());
}
- // 10
@Test
- public void testContainsSamzaPropertyFromStreamId() {
- StreamConfig config = buildConfig("key1", "value1",
- buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2",
- buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
- buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()),
STREAM1_PHYSICAL_NAME);
- assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key2"));
+ public void testGetStreamProperties() {
+ assertEquals(new MapConfig(), new StreamConfig(new
MapConfig()).getStreamProperties(STREAM_ID));
+
+ String propertyName = "stream.property.name";
+
+ // BEGIN: tests in which properties cannot be found in the config
+
+ // not matching stream id for "streams."
+ StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), OTHER_STREAM_ID) +
propertyName, UNUSED_VALUE)));
+ assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
+
+ // not matching stream id for "systems.<system>.streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, OTHER_STREAM_ID) +
propertyName, UNUSED_VALUE,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), OTHER_STREAM_ID),
SYSTEM)));
+ assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
+
+ // no system mapping when using "systems.<system>.streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, UNUSED_VALUE)));
+ assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
+
+ // ignore property with "samza" prefix for "streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
SAMZA_IGNORED_PROPERTY, UNUSED_VALUE)));
+ assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
+
+ // ignore property with "samza" prefix for "streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
SAMZA_IGNORED_PROPERTY, UNUSED_VALUE,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID),
SYSTEM)));
+ assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
+
+ // should not map physical name back to stream id if physical name is
passed as stream id
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, UNUSED_VALUE,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM)));
+ assertEquals(new MapConfig(),
streamConfig.getStreamProperties(PHYSICAL_STREAM));
+
+ // BEGIN: tests in which properties can be found in the config
+
+ String propertyValue = "value";
+
+ // "streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
propertyName, propertyValue)));
+ assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+ streamConfig.getStreamProperties(STREAM_ID));
+
+ // "systems.<system>.streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, propertyValue,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID),
SYSTEM)));
+ assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+ streamConfig.getStreamProperties(STREAM_ID));
+
+ // "systems.<system>.default.stream."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT,
SYSTEM) + propertyName, propertyValue,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID),
SYSTEM)));
+ assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+ streamConfig.getStreamProperties(STREAM_ID));
+
+ // use physical name mapping for "systems.<system>.streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, PHYSICAL_STREAM) +
propertyName, propertyValue,
+ // should not use stream id since there is physical stream
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, UNUSED_VALUE,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM)));
+ assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+ streamConfig.getStreamProperties(STREAM_ID));
+
+ // "streams." should override "systems.<system>.streams."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
propertyName, propertyValue,
+ // should not use "systems.<system>.streams." since there is a
"streams." config
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, UNUSED_VALUE,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID),
SYSTEM)));
+ assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+ streamConfig.getStreamProperties(STREAM_ID));
+
+ // "systems.<system>.streams." should override
"systems.<system>.default.stream."
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, propertyValue,
+ // should not use "systems.<system>.default.stream." since there is a
"systems.<system>.streams."
+ String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT,
SYSTEM) + propertyName, UNUSED_VALUE,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID),
SYSTEM)));
+ assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
+ streamConfig.getStreamProperties(STREAM_ID));
+
+ // merge multiple ways of specifying configs
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ // "streams."
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
"from.stream.id.property", "fromStreamIdValue",
+ // second "streams." property
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
"from.stream.id.other.property",
+ "fromStreamIdOtherValue",
+ // "systems.<system>.streams."
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
"from.system.stream.property",
+ "fromSystemStreamValue",
+ // need to map the stream id to a system
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID),
SYSTEM)));
+ assertEquals(new MapConfig(ImmutableMap.of(
+ "from.stream.id.property", "fromStreamIdValue",
+ "from.stream.id.other.property", "fromStreamIdOtherValue",
+ "from.system.stream.property", "fromSystemStreamValue")),
+ streamConfig.getStreamProperties(STREAM_ID));
}
- // 11
@Test
- public void testContainsSamzaPropertyFromSystemStreamAndStreamId() {
- StreamConfig config = buildConfig("key1", "value1",
- buildProp(SYSTEM_STREAM_1, "samza.key2"), "value2",
- buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2OVERRIDE",
- buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
- buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()),
STREAM1_PHYSICAL_NAME);
- assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key2"));
+ public void testGetSystem() {
+ assertNull(new StreamConfig(new MapConfig()).getSystem(STREAM_ID));
+
+ // system is specified directly
+ StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+ JobConfig.JOB_DEFAULT_SYSTEM, "otherSystem",
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), OTHER_STREAM_ID),
"otherSystem")));
+ assertEquals(SYSTEM, streamConfig.getSystem(STREAM_ID));
+
+ // fall back to job default system
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), OTHER_STREAM_ID),
"otherSystem")));
+ assertEquals(SYSTEM, streamConfig.getSystem(STREAM_ID));
}
- // 00
@Test
- public void testGetSerdeStreamsDoesNotExist() {
- StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1,
"samza.key1"), "value1", "key2", "value2");
- assertTrue(config.getSerdeStreams(STREAM1_SYSTEM).isEmpty());
+ public void testGetPhysicalName() {
+ assertEquals(STREAM_ID, new StreamConfig(new
MapConfig()).getPhysicalName(STREAM_ID));
+
+ // ignore mapping for other stream ids
+ StreamConfig streamConfig = new StreamConfig(new MapConfig(
+
ImmutableMap.of(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(),
OTHER_STREAM_ID), PHYSICAL_STREAM)));
+ assertEquals(STREAM_ID, streamConfig.getPhysicalName(STREAM_ID));
+
+ streamConfig = new StreamConfig(new MapConfig(
+
ImmutableMap.of(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(),
STREAM_ID), PHYSICAL_STREAM)));
+ assertEquals(PHYSICAL_STREAM, streamConfig.getPhysicalName(STREAM_ID));
}
- // 01
@Test
- public void testGetSerdeStreamsFromSystemStream() {
- StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1,
StreamConfig.KEY_SERDE()), "value1",
- buildProp(SYSTEM_STREAM_2, StreamConfig.MSG_SERDE()), "value2",
- "key3", "value3");
- assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
- assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size());
- assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
- assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get());
+ public void testGetIsIntermediateStream() {
+ assertFalse(new StreamConfig(new
MapConfig()).getIsIntermediateStream(STREAM_ID));
+
+ // ignore mapping for other stream ids
+ StreamConfig streamConfig = new StreamConfig(new MapConfig(
+
ImmutableMap.of(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(),
OTHER_STREAM_ID), "true")));
+ assertFalse(streamConfig.getIsIntermediateStream(STREAM_ID));
+
+ // do not use stream id property if physical name is passed as input
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(),
STREAM_ID), "true",
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM)));
+ assertFalse(streamConfig.getIsIntermediateStream(PHYSICAL_STREAM));
+
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(),
STREAM_ID), "true")));
+ assertTrue(streamConfig.getIsIntermediateStream(STREAM_ID));
+
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(),
STREAM_ID), "false")));
+ assertFalse(streamConfig.getIsIntermediateStream(STREAM_ID));
}
- // 10
@Test
- public void testGetSerdeStreamsFromStreamId() {
- StreamConfig config = buildConfig(
- buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
- buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()),
STREAM1_PHYSICAL_NAME,
- buildProp(STREAM1_STREAM_ID, StreamConfig.KEY_SERDE()), "value1",
-
- buildProp(STREAM2_STREAM_ID, StreamConfig.SYSTEM()), STREAM2_SYSTEM,
- buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()),
STREAM2_PHYSICAL_NAME,
- buildProp(STREAM2_STREAM_ID, StreamConfig.MSG_SERDE()), "value2",
- "key3", "value3");
- assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
- assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size());
- assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
- assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get());
+ public void testGetDeleteCommittedMessages() {
+ assertFalse(new StreamConfig(new
MapConfig()).getDeleteCommittedMessages(STREAM_ID));
+
+ // ignore mapping for other stream ids
+ StreamConfig streamConfig = new StreamConfig(new MapConfig(
+
ImmutableMap.of(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(),
OTHER_STREAM_ID),
+ "true")));
+ assertFalse(streamConfig.getDeleteCommittedMessages(STREAM_ID));
+
+ // do not use stream id property if physical name is passed as input
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(),
STREAM_ID), "true",
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM)));
+ assertFalse(streamConfig.getDeleteCommittedMessages(PHYSICAL_STREAM));
+
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(),
STREAM_ID), "true")));
+ assertTrue(streamConfig.getDeleteCommittedMessages(STREAM_ID));
+
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(),
STREAM_ID), "false")));
+ assertFalse(streamConfig.getDeleteCommittedMessages(STREAM_ID));
}
- // 11
@Test
- public void testGetSerdeStreamsFromSystemStreamAndStreamId() {
- StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1,
StreamConfig.KEY_SERDE()), "value1",
- buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
- buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()),
STREAM1_PHYSICAL_NAME,
- buildProp(STREAM1_STREAM_ID, StreamConfig.KEY_SERDE()),
"value1OVERRIDE",
- buildProp(STREAM2_STREAM_ID, StreamConfig.SYSTEM()), STREAM2_SYSTEM,
- buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()),
STREAM2_PHYSICAL_NAME,
- buildProp(STREAM2_STREAM_ID, StreamConfig.MSG_SERDE()), "value2",
- "key3", "value3");
-
- assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
- assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size());
- assertEquals("value1OVERRIDE",
config.getStreamKeySerde(SYSTEM_STREAM_1).get());
- assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get());
+ public void testGetIsBounded() {
+ assertFalse(new StreamConfig(new MapConfig()).getIsBounded(STREAM_ID));
+
+ // ignore mapping for other stream ids
+ StreamConfig streamConfig = new StreamConfig(new MapConfig(
+ ImmutableMap.of(String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(),
OTHER_STREAM_ID),
+ "true")));
+ assertFalse(streamConfig.getIsBounded(STREAM_ID));
+
+ // do not use stream id property if physical name is passed as input
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), STREAM_ID),
"true",
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM)));
+ assertFalse(streamConfig.getIsBounded(PHYSICAL_STREAM));
+
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), STREAM_ID),
"true")));
+ assertTrue(streamConfig.getIsBounded(STREAM_ID));
+
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), STREAM_ID),
"false")));
+ assertFalse(streamConfig.getIsBounded(STREAM_ID));
}
@Test
- public void testStreamPropertyDefaults() {
- final String nonSamzaProperty = "replication.factor";
- StreamConfig config = buildConfig(
- buildSystemDefaultProp(STREAM1_SYSTEM, nonSamzaProperty), "1",
- buildSystemDefaultProp(STREAM1_SYSTEM, StreamConfig.KEY_SERDE()),
"value1",
- buildSystemDefaultProp(STREAM1_SYSTEM,
StreamConfig.CONSUMER_OFFSET_DEFAULT()), "newest",
- buildProp(SYSTEM_STREAM_1, "dummyStreamProperty"), "dummyValue",
- buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
- buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()),
STREAM1_PHYSICAL_NAME,
- buildSystemDefaultProp(STREAM2_SYSTEM, nonSamzaProperty), "2",
- buildProp(STREAM2_STREAM_ID, StreamConfig.SYSTEM()), STREAM2_SYSTEM,
- buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()),
STREAM2_PHYSICAL_NAME,
- buildProp(STREAM2_STREAM_ID, nonSamzaProperty), "3",
- buildSystemDefaultProp(STREAM3_SYSTEM, nonSamzaProperty), "4",
- buildProp(STREAM3_STREAM_ID, StreamConfig.SYSTEM()), STREAM3_SYSTEM,
- buildProp(STREAM3_STREAM_ID, StreamConfig.PHYSICAL_NAME()),
STREAM3_PHYSICAL_NAME,
- buildProp(SYSTEM_STREAM_3, nonSamzaProperty), "5",
- "key3", "value3");
-
-
-
- // Ensure that we can set legacy system properties via the new system wide
default
- assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
- assertEquals(0, config.getSerdeStreams(STREAM1_SYSTEM).size());
- assertEquals("value1", new
SystemConfig(config).getSystemKeySerde(STREAM1_SYSTEM).get());
- assertEquals("newest",
config.getDefaultStreamOffset(SYSTEM_STREAM_1).get());
-
- // Property set via systems.x.default.stream.* only
- assertEquals("1",
config.getStreamProperties(STREAM1_STREAM_ID).get(nonSamzaProperty));
-
- // Property set via systems.x.default.stream.* and streams.y.*
- assertEquals("3",
config.getStreamProperties(STREAM2_STREAM_ID).get(nonSamzaProperty));
-
- // Property set via systems.x.default.stream.* and system.x.streams.z.*
- assertEquals("5",
config.getStreamProperties(STREAM3_STREAM_ID).get(nonSamzaProperty));
-
- assertEquals(false, config.getBroadcastEnabled(SYSTEM_STREAM_1));
+ public void testGetStreamIds() {
+ assertEquals(ImmutableList.of(), ImmutableList.copyOf(
+ JavaConverters.asJavaIterableConverter(new StreamConfig(new
MapConfig()).getStreamIds()).asJava()));
+
+ StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
".property", "value")));
+ assertEquals(ImmutableList.of(STREAM_ID),
+
ImmutableList.copyOf(JavaConverters.asJavaIterableConverter(streamConfig.getStreamIds()).asJava()));
+
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
".property.subProperty", "value")));
+ assertEquals(ImmutableList.of(STREAM_ID),
+
ImmutableList.copyOf(JavaConverters.asJavaIterableConverter(streamConfig.getStreamIds()).asJava()));
+
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
".property0", "value",
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
".property1", "value",
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
".property.subProperty0", "value",
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
".property.subProperty1", "value",
+ String.format(StreamConfig.STREAM_ID_PREFIX(), OTHER_STREAM_ID) +
".property", "value")));
+ assertEquals(ImmutableList.of(STREAM_ID, OTHER_STREAM_ID),
+
ImmutableList.copyOf(JavaConverters.asJavaIterableConverter(streamConfig.getStreamIds()).asJava()));
}
+ private static void doTestSamzaProperty(String propertyName, String
propertyValue, SamzaPropertyAssertion assertion) {
+ doTestSamzaPropertyAccess(propertyName, propertyValue, assertion);
+ doTestSamzaPropertyAccessWithPhysicalStream(propertyName, propertyValue,
assertion);
+ doTestSamzaPropertyPriority(propertyName, propertyValue, assertion);
+ doTestSamzaPropertyMultipleStreams(propertyName, propertyValue, assertion);
+ }
- private StreamConfig buildConfig(String... kvs) {
- if (kvs.length % 2 != 0) {
- throw new IllegalArgumentException("There must be parity between the
keys and values");
- }
+ /**
+ * Tests for Samza property access using different lookup methods, when
using stream id in system stream.
+ * This includes tests in which there is no specified physical stream, so
the stream id is used as the physical
+ * stream.
+ */
+ private static void doTestSamzaPropertyAccess(String propertyName, String
value, SamzaPropertyAssertion assertion) {
+ // streams.<streamId>.<property>
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
propertyName, value,
+ // all streams need to have a system
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM))),
+ SYSTEM_STREAM);
+
+ // systems.<system>.streams.<stream>.<property> where stream id has no
specified physical stream
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, value,
+ // specify the system for the stream id
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID),
SYSTEM))),
+ SYSTEM_STREAM);
+
+ // systems.<system>.streams.<stream>.<property> where stream is the
streamId, system is from job default system
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, value,
+ // use job default system to get the system
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM))),
+ SYSTEM_STREAM);
+
+ // systems.<system>.default.stream.<property> where stream id has no
specified physical stream
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT,
SYSTEM) + propertyName, value,
+ // specify the system for the stream id
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID),
SYSTEM))),
+ SYSTEM_STREAM);
+
+ // systems.<system>.streams.<stream>.<property> where no system mapping
(fall back to SystemStream)
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, value,
+ // map the stream id to the physical stream
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM))),
+ SYSTEM_STREAM);
+
+ // systems.<system>.default.stream.<property> where no system mapping
(fall back to SystemStream)
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT,
SYSTEM) + propertyName, value,
+ // map the stream id to the physical stream
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM))),
+ SYSTEM_STREAM);
+
+ // systems.<system>.streams.<stream>.<property> where stream id has a
physical name but property is from stream id
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, value,
+ // use job default system to get the system
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+ // map the stream id to the physical stream
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM))),
+ SYSTEM_STREAM);
+
+ // systems.<system>.default.stream.<property> where stream id has a
physical name but property is from stream id
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT,
SYSTEM) + propertyName, value,
+ // use job default system to get the system
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+ // map the stream id to the physical stream
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM))),
+ SYSTEM_STREAM);
+ }
- Map<String, String> configMap = new HashMap<>();
- for (int i = 0; i < kvs.length - 1; i += 2) {
- configMap.put(kvs[i], kvs[i + 1]);
- }
- return new StreamConfig(new MapConfig(configMap));
+ /**
+ * Tests for Samza property access using different lookup methods, when
using physical stream in system stream.
+ */
+ private static void doTestSamzaPropertyAccessWithPhysicalStream(String
propertyName, String value,
+ SamzaPropertyAssertion assertion) {
+ // streams.<streamId>.<property>
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
propertyName, value,
+ // all streams need to have a system
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+ // map the stream id to the physical stream
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM))),
+ SYSTEM_STREAM_PHYSICAL);
+
+ // systems.<system>.streams.<stream>.<property> with a specific system for
the stream id
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, PHYSICAL_STREAM) +
propertyName, value,
+ // specify the system for the stream id
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+ // map the stream id to the physical stream
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM))),
+ SYSTEM_STREAM_PHYSICAL);
+
+ // systems.<system>.streams.<stream>.<property> with system coming from
job default system
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, PHYSICAL_STREAM) +
propertyName, value,
+ // use job default system to get the system
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+ // map the stream id to the physical stream
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM))),
+ SYSTEM_STREAM_PHYSICAL);
+
+ // systems.<system>.default.stream.<property>
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT,
SYSTEM) + propertyName, value,
+ // specify the system for the stream id
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+ // map the stream id to the physical stream
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM))),
+ SYSTEM_STREAM_PHYSICAL);
}
- private String buildProp(String streamId, String suffix) {
- return String.format(STREAM_ID_PATTERN, streamId) + suffix;
+ /**
+ * Tests that certain properties have priority over others. For conciseness,
this will not explicitly test priorities
+ * for all different ways of specifying a property. This will compare two
options at a time, and then we can infer
+ * priorities transitively.
+ */
+ private static void doTestSamzaPropertyPriority(String propertyName, String
value, SamzaPropertyAssertion assertion) {
+ // streams.<streamId>.<property> vs.
systems.<system>.streams.<stream>.<property>
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
propertyName, value,
+ // all streams need to have a system
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+ // this config should not be used
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, UNUSED_VALUE))),
+ SYSTEM_STREAM);
+
+ // systems.<system>.streams.<stream>.<property> vs.
systems.<system>.default.stream.<property>
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, value,
+ // specify the system for the stream id
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+ // this config should not be used
+ String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT,
SYSTEM) + propertyName, UNUSED_VALUE))),
+ SYSTEM_STREAM);
+
+ /*
+ * The next logical case to check would be
systems.<system>.default.stream.<property> with a system mapping in the
+ * config vs. systems.<system>.streams.<stream>.<property> with a system
mapping, but that is not possible, so move
+ * on to the next case.
+ */
+
+ /*
+ * systems.<system>.streams.<stream>.<property> without a system mapping
in the config vs.
+ * systems.<system>.default.stream.<property> without a system mapping in
the config
+ */
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, value,
+ // this config should not be used
+ String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT,
SYSTEM) + propertyName, UNUSED_VALUE))),
+ SYSTEM_STREAM);
}
- private String buildProp(SystemStream systemStream, String suffix) {
- return String.format(SYSTEM_STREAM_PATTERN, systemStream.getSystem(),
systemStream.getStream()) + suffix;
+ /**
+ * Tests for Samza property access in which multiple streams are configured.
+ * Only testing cases in which streams may share some properties.
+ */
+ private static void doTestSamzaPropertyMultipleStreams(String propertyName,
String value, SamzaPropertyAssertion assertion) {
+ // systems.<system>.default.stream.<property> where stream id has no
specified physical stream
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT,
SYSTEM) + propertyName, value,
+ // specify the systems for the streams
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID), SYSTEM,
+ String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), OTHER_STREAM_ID),
SYSTEM))),
+ SYSTEM_STREAM);
}
- private String buildSystemDefaultProp(String system, String suffix) {
- return String.format(SYSTEM_DEFAULT_STREAM_PATTERN, system) + suffix;
+ private static void doTestSamzaPropertyInvalidConfig(SamzaPropertyLookup
lookup) {
+ // configure physical stream and have mapping from stream id to physical
stream
+ StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM,
+ String.format(StreamConfig.STREAM_ID_PREFIX(), PHYSICAL_STREAM) +
".property", "value",
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM)));
+ try {
+ lookup.doLookup(streamConfig, SYSTEM_STREAM_PHYSICAL);
+ fail("Expected an exception due to having too many mappings to a
physical stream");
+ } catch (IllegalStateException e) {
+ // expected to reach here
+ }
+
+ // two separate stream ids map to same physical stream
+ streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM,
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(),
OTHER_STREAM_ID), PHYSICAL_STREAM,
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM)));
+ try {
+ lookup.doLookup(streamConfig, SYSTEM_STREAM_PHYSICAL);
+ fail("Expected an exception due to having too many mappings to a
physical stream");
+ } catch (IllegalStateException e) {
+ // expected to reach here
+ }
}
- private Config addConfigs(Config original, String... kvs) {
- Map<String, String> result = new HashMap<>();
- result.putAll(original);
- result.putAll(buildConfig(kvs));
- return new MapConfig(result);
+ private static void doTestSamzaPropertyDoesNotExist(String propertyName,
SamzaPropertyAssertion assertion) {
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ // just put in some value which will be ignored
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
SAMZA_IGNORED_PROPERTY, UNUSED_VALUE))),
+ SYSTEM_STREAM);
+
+ /*
+ * Won't use streams.<streamId>.<property> if streamId is mapped to a
physical stream
+ */
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_ID_PREFIX(), STREAM_ID) +
propertyName, UNUSED_VALUE,
+ // all streams need to have a system
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+ // map the stream id to the physical stream
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM))),
+ SYSTEM_STREAM);
+
+ /*
+ * Won't use systems.<system>.streams.<stream>.<property> if stream is
mapped to a physical stream and the property
+ * is only specified using the physical stream
+ */
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, PHYSICAL_STREAM) +
propertyName, UNUSED_VALUE,
+ // all streams need to have a system
+ JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
+ // map the stream id to the physical stream
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM))),
+ SYSTEM_STREAM);
+
+ /*
+ * Won't use systems.<system>.streams.<stream>.<property> if stream is
mapped to a physical stream and there is no
+ * system mapping for the stream id
+ */
+ assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
+ String.format(StreamConfig.STREAM_PREFIX(), SYSTEM, STREAM_ID) +
propertyName, UNUSED_VALUE,
+ // map the stream id to the physical stream
+ String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID),
PHYSICAL_STREAM))),
+ SYSTEM_STREAM_PHYSICAL);
}
-}
+ /**
+ * Used to execute assertions for tests which look up specific Samza
properties.
+ */
+ @FunctionalInterface
+ private interface SamzaPropertyAssertion {
+ void doAssertion(StreamConfig streamConfig, SystemStream systemStream);
+ }
+
+ /**
+ * Used to specify how to look up specific Samza properties.
+ */
+ @FunctionalInterface
+ private interface SamzaPropertyLookup {
+ void doLookup(StreamConfig streamConfig, SystemStream systemStream);
+ }
+}
\ No newline at end of file