Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 2c7309cf6 -> 8815b0392


SAMZA-1075: Refactor SystemAdmin Interface to expose a common method to create 
streams

Author: Jacob Maes <[email protected]>

Reviewers: Yi Pan (Data Infrastructure) <[email protected]>

Closes #53 from jmakes/samza-1075


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8815b039
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8815b039
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8815b039

Branch: refs/heads/samza-fluent-api-v1
Commit: 8815b03929dab0527594fd609f5080fb40a05a0b
Parents: 2c7309c
Author: Jacob Maes <[email protected]>
Authored: Fri Feb 17 12:49:19 2017 -0800
Committer: Xinyu Liu <[email protected]>
Committed: Fri Feb 17 14:14:17 2017 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/StreamSpec.java     | 203 +++++++++++++++++++
 .../samza/system/StreamValidationException.java |  30 +++
 .../org/apache/samza/system/SystemAdmin.java    |  29 ++-
 .../samza/system/kafka/KafkaStreamSpec.java     | 141 +++++++++++++
 .../org/apache/samza/config/KafkaConfig.scala   | 115 ++++++-----
 .../samza/system/kafka/KafkaSystemAdmin.scala   | 143 +++++++------
 .../system/kafka/TestKafkaSystemAdminJava.java  | 145 +++++++++++++
 .../system/kafka/TestKafkaSystemAdmin.scala     |  24 ++-
 8 files changed, 700 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java 
b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
new file mode 100644
index 0000000..d8a2144
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
/**
 * StreamSpec is a blueprint for creating, validating, or simply describing a stream in the runtime environment.
 *
 * It has specific attributes for common behaviors that Samza uses.
 *
 * It also includes a map of configurations which may be system-specific.
 *
 * It is immutable by design.
 */
public class StreamSpec {

  private static final int DEFAULT_PARTITION_COUNT = 1;

  /**
   * Unique identifier for the stream in a Samza application.
   * This identifier is used as a key for stream properties in the
   * job config and to distinguish between streams in a graph.
   */
  private final String id;

  /**
   * The System name on which this stream will exist. Corresponds to a named implementation of the
   * Samza System abstraction.
   */
  private final String systemName;

  /**
   * The physical identifier for the stream. This is the identifier that will be used in remote
   * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
   * might be a file URN.
   */
  private final String physicalName;

  /**
   * The number of partitions for the stream.
   */
  private final int partitionCount;

  /**
   * A set of all system-specific configurations for the stream.
   */
  private final Map<String, String> config;

  /**
   * @param id            The application-unique logical identifier for the stream. It is used to distinguish between
   *                      streams in a Samza application so it must be unique in the context of one deployable unit.
   *                      It does not need to be globally unique or unique with respect to a host.
   *
   * @param physicalName  The physical identifier for the stream. This is the identifier that will be used in remote
   *                      systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
   *                      might be a file URN.
   *
   * @param systemName    The System name on which this stream will exist. Corresponds to a named implementation of the
   *                      Samza System abstraction. See {@link SystemFactory}
   */
  public StreamSpec(String id, String physicalName, String systemName) {
    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, Collections.emptyMap());
  }

  /**
   * @param id            The application-unique logical identifier for the stream. It is used to distinguish between
   *                      streams in a Samza application so it must be unique in the context of one deployable unit.
   *                      It does not need to be globally unique or unique with respect to a host.
   *
   * @param physicalName  The physical identifier for the stream. This is the identifier that will be used in remote
   *                      systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
   *                      might be a file URN.
   *
   * @param systemName    The System name on which this stream will exist. Corresponds to a named implementation of the
   *                      Samza System abstraction. See {@link SystemFactory}
   *
   * @param partitionCount  The number of partitions for the stream. A value of {@code 1} indicates unpartitioned.
   */
  public StreamSpec(String id, String physicalName, String systemName, int partitionCount) {
    this(id, physicalName, systemName, partitionCount, Collections.emptyMap());
  }

  /**
   * @param id            The application-unique logical identifier for the stream. It is used to distinguish between
   *                      streams in a Samza application so it must be unique in the context of one deployable unit.
   *                      It does not need to be globally unique or unique with respect to a host.
   *
   * @param physicalName  The physical identifier for the stream. This is the identifier that will be used in remote
   *                      systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
   *                      might be a file URN.
   *
   * @param systemName    The System name on which this stream will exist. Corresponds to a named implementation of the
   *                      Samza System abstraction. See {@link SystemFactory}
   *
   * @param config        A map of properties for the stream. These may be System-specific.
   */
  public StreamSpec(String id, String physicalName, String systemName, Map<String, String> config) {
    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, config);
  }

  /**
   * @param id              The application-unique logical identifier for the stream. It is used to distinguish between
   *                        streams in a Samza application so it must be unique in the context of one deployable unit.
   *                        It does not need to be globally unique or unique with respect to a host.
   *
   * @param physicalName    The physical identifier for the stream. This is the identifier that will be used in remote
   *                        systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
   *                        might be a file URN.
   *
   * @param systemName      The System name on which this stream will exist. Corresponds to a named implementation of the
   *                        Samza System abstraction. See {@link SystemFactory}
   *
   * @param partitionCount  The number of partitions for the stream. A value of {@code 1} indicates unpartitioned.
   *
   * @param config          A map of properties for the stream. These may be System-specific.
   */
  public StreamSpec(String id, String physicalName, String systemName, int partitionCount, Map<String, String> config) {
    if (id == null) {
      throw new NullPointerException("Parameter 'id' must not be null");
    }

    if (systemName == null) {
      throw new NullPointerException("Parameter 'systemName' must not be null");
    }

    // Fix: this previously threw NullPointerException with the inverted message
    // "must not be greater than 0". An out-of-range int is an illegal argument,
    // not a null, and the constraint is partitionCount >= 1.
    if (partitionCount < 1) {
      throw new IllegalArgumentException("Parameter 'partitionCount' must be greater than 0");
    }

    this.id = id;
    this.systemName = systemName;
    this.physicalName = physicalName;
    this.partitionCount = partitionCount;

    // Defensive copy, then wrap unmodifiable so the spec stays immutable
    // even if the caller mutates its map afterwards.
    if (config != null) {
      this.config = Collections.unmodifiableMap(new HashMap<>(config));
    } else {
      this.config = Collections.emptyMap();
    }
  }

  /**
   * Copies this StreamSpec, but applies a new partitionCount.
   *
   * This method is not static s.t. subclasses can override it.
   *
   * @param partitionCount  The partitionCount for the returned StreamSpec.
   * @return                A copy of this StreamSpec with the specified partitionCount.
   */
  public StreamSpec copyWithPartitionCount(int partitionCount) {
    return new StreamSpec(id, physicalName, systemName, partitionCount, config);
  }

  public String getId() {
    return id;
  }

  public String getSystemName() {
    return systemName;
  }

  public String getPhysicalName() {
    return physicalName;
  }

  public int getPartitionCount() {
    return partitionCount;
  }

  public Map<String, String> getConfig() {
    return config;
  }

  public String get(String propertyName) {
    return config.get(propertyName);
  }

  public String getOrDefault(String propertyName, String defaultValue) {
    return config.getOrDefault(propertyName, defaultValue);
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-api/src/main/java/org/apache/samza/system/StreamValidationException.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/system/StreamValidationException.java
 
b/samza-api/src/main/java/org/apache/samza/system/StreamValidationException.java
new file mode 100644
index 0000000..fef4148
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/system/StreamValidationException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.apache.samza.SamzaException;
+
+
+public class StreamValidationException extends SamzaException {
+  private static final long serialVersionUID = 1L;
+
+  public StreamValidationException(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index ef99893..b180712 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -28,7 +28,6 @@ import java.util.Set;
  * utility methods that Samza needs in order to interact with a system.
  */
 public interface SystemAdmin {
-
   /**
    * Fetches the offsets for the messages immediately after the supplied 
offsets
    * for a group of SystemStreamPartitions.
@@ -52,11 +51,12 @@ public interface SystemAdmin {
 
   /**
    * An API to create a change log stream
-   * 
+   *
    * @param streamName
    *          The name of the stream to be created in the underlying stream
    * @param numOfPartitions
    *          The number of partitions in the changelog stream
+   * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)}
    */
   void createChangelogStream(String streamName, int numOfPartitions);
 
@@ -67,6 +67,7 @@ public interface SystemAdmin {
    *          The name of the stream to be created in the underlying stream
    * @param numOfPartitions
    *          The number of partitions in the changelog stream
+   * @deprecated since 0.12.1, use {@link #validateStream(StreamSpec)}
    */
   void validateChangelogStream(String streamName, int numOfPartitions);
 
@@ -76,6 +77,7 @@ public interface SystemAdmin {
    *
    * @param streamName
    *          The name of the coordinator stream to create.
+   * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)}
    */
   void createCoordinatorStream(String streamName);
 
@@ -89,4 +91,27 @@ public interface SystemAdmin {
    * @return -1 if offset1 &lt; offset2; 0 if offset1 == offset2; 1 if offset1 
&gt; offset2. Null if not comparable
    */
   Integer offsetComparator(String offset1, String offset2);
+
+  /**
+   * Create a stream described by the spec.
+   *
+   * @param streamSpec  The spec, or blueprint from which the physical stream 
will be created on the system.
+   * @return            {@code true} if the stream was actually created and 
not pre-existing.
+   *                    {@code false} if the stream was pre-existing.
+   *                    A RuntimeException will be thrown if creation fails.
+   */
+  default boolean createStream(StreamSpec streamSpec) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Validates the stream described by the streamSpec on the system.
+   * A {@link StreamValidationException} should be thrown for any validation 
error.
+   *
+   * @param streamSpec  The spec, or blueprint for the physical stream on the 
system.
+   * @throws StreamValidationException if validation fails.
+   */
+  default void validateStream(StreamSpec streamSpec) throws 
StreamValidationException {
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
new file mode 100644
index 0000000..3255f70
--- /dev/null
+++ 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.system.StreamSpec;
+
+
+/**
+ * Extends StreamSpec with the ability to easily get the topic replication 
factor.
+ */
+public class KafkaStreamSpec extends StreamSpec {
+  private static final int DEFAULT_REPLICATION_FACTOR = 2;
+
+  /**
+   * The number of replicas for stream durability.
+   */
+  private final int replicationFactor;
+
+  /**
+   * Convenience method to convert a config map to Properties.
+   * @param map The Map to convert.
+   * @return    The Properties instance.
+   */
+  private static Properties mapToProperties(Map<String, String> map) {
+    Properties props = new Properties();
+    props.putAll(map);
+    return props;
+  }
+
+  /**
+   * Convenience method to convert Properties to a config map.
+   * @param properties  The Properties to convert.
+   * @return            The Map instance.
+   */
+  private static Map<String, String> propertiesToMap(Properties properties) {
+    Map<String, String> map = new HashMap<String, String>();
+    for (final String name: properties.stringPropertyNames()) {
+      map.put(name, properties.getProperty(name));
+    }
+    return map;
+  }
+
+  /**
+   * Converts any StreamSpec to a KafkaStreamSpec.
+   * If the original spec already is a KafkaStreamSpec, it is simply returned.
+   *
+   * @param originalSpec  The StreamSpec instance to convert to 
KafkaStreamSpec.
+   * @return              A KafkaStreamSpec instance.
+   */
+  public static KafkaStreamSpec fromSpec(StreamSpec originalSpec) {
+    if (originalSpec instanceof KafkaStreamSpec) {
+      return ((KafkaStreamSpec) originalSpec);
+    }
+
+    int replicationFactor = Integer.parseInt(originalSpec.getOrDefault( 
KafkaConfig.TOPIC_REPLICATION_FACTOR(),
+                                                                        
KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()));
+
+    return new KafkaStreamSpec( originalSpec.getId(),
+                                originalSpec.getPhysicalName(),
+                                originalSpec.getSystemName(),
+                                originalSpec.getPartitionCount(),
+                                replicationFactor,
+                                mapToProperties(originalSpec.getConfig()));
+  }
+
+  /**
+   * Convenience constructor to create a KafkaStreamSpec with just a 
topicName, systemName, and partitionCount.
+   *
+   * @param topicName       The name of the topic.
+   * @param systemName      The name of the System. See {@link 
org.apache.samza.system.SystemFactory}
+   * @param partitionCount  The number of partitions.
+   */
+  public KafkaStreamSpec(String topicName, String systemName, int 
partitionCount) {
+    this(topicName, topicName, systemName, partitionCount, 
DEFAULT_REPLICATION_FACTOR, new Properties());
+  }
+
+  /**
+   * Constructs a StreamSpec with a replication factor.
+   *
+   * @param id                The application-unique logical identifier for 
the stream. It is used to distinguish between
+   *                          streams in a Samza application so it must be 
unique in the context of one deployable unit.
+   *                          It does not need to be globally unique or unique 
with respect to a host.
+   *
+   * @param topicName         The physical identifier for the stream. This is 
the identifier that will be used in remote
+   *                          systems to identify the stream. In Kafka this 
would be the topic name whereas in HDFS it
+   *                          might be a file URN.
+   *
+   * @param systemName        The System name on which this stream will exist. 
Corresponds to a named implementation of the
+   *                          Samza System abstraction. See {@link 
org.apache.samza.system.SystemFactory}
+   *
+   * @param partitionCount    The number of partitionts for the stream. A 
value of {@code 1} indicates unpartitioned.
+   *
+   * @param replicationFactor The number of topic replicas in the Kafka 
cluster for durability.
+   *
+   * @param properties        A set of properties for the stream. These may be 
System-specfic.
+   */
+  public KafkaStreamSpec(String id, String topicName, String systemName, int 
partitionCount, int replicationFactor,
+      Properties properties) {
+    super(id, topicName, systemName, partitionCount, 
propertiesToMap(properties));
+
+    if (replicationFactor <= 0) {
+      throw new IllegalArgumentException(
+          String.format("Replication factor %d must be greater than 0.", 
replicationFactor));
+    }
+    this.replicationFactor = replicationFactor;
+  }
+
+  @Override
+  public StreamSpec copyWithPartitionCount(int partitionCount) {
+    return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), 
partitionCount, getReplicationFactor(), getProperties());
+  }
+
+  public int getReplicationFactor() {
+    return replicationFactor;
+  }
+
+  public Properties getProperties() {
+    return mapToProperties(getConfig());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 770220c..e355e7e 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -24,27 +24,33 @@ import java.util.regex.Pattern
 
 import org.apache.samza.util.Util
 import org.apache.samza.util.Logging
+
 import scala.collection.JavaConversions._
 import kafka.consumer.ConsumerConfig
 import java.util.{Properties, UUID}
+
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.samza.SamzaException
 import java.util
+
 import scala.collection.JavaConverters._
 import org.apache.samza.system.kafka.KafkaSystemFactory
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.kafka.common.serialization.ByteArraySerializer
 
 object KafkaConfig {
+  val TOPIC_REPLICATION_FACTOR = "replication.factor"
+  val TOPIC_DEFAULT_REPLICATION_FACTOR = "2"
+
   val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
   val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
   val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
 
   val CHECKPOINT_SYSTEM = "task.checkpoint.system"
-  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
+  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint." + 
TOPIC_REPLICATION_FACTOR
   val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
 
-  val CHANGELOG_STREAM_REPLICATION_FACTOR = 
"stores.%s.changelog.replication.factor"
+  val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + 
TOPIC_REPLICATION_FACTOR
   val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
   // The default segment size to use for changelog topics
   val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912"
@@ -53,20 +59,20 @@ object KafkaConfig {
   val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$"
 
   /**
-   * Defines how low a queue can get for a single system/stream/partition
-   * combination before trying to fetch more messages for it.
-   */
+    * Defines how low a queue can get for a single system/stream/partition
+    * combination before trying to fetch more messages for it.
+    */
   val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + 
"samza.fetch.threshold"
 
   val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
 
   /**
-   * Defines how many bytes to use for the buffered prefetch messages for job 
as a whole.
-   * The bytes for a single system/stream/partition are computed based on this.
-   * This fetches wholes messages, hence this bytes limit is a soft one, and 
the actual usage can be
-   * the bytes limit + size of max message in the partition for a given stream.
-   * If the value of this property is > 0 then this takes precedence over 
CONSUMER_FETCH_THRESHOLD config.
-   */
+    * Defines how many bytes to use for the buffered prefetch messages for job 
as a whole.
+    * The bytes for a single system/stream/partition are computed based on 
this.
+    * This fetches wholes messages, hence this bytes limit is a soft one, and 
the actual usage can be
+    * the bytes limit + size of max message in the partition for a given 
stream.
+    * If the value of this property is > 0 then this takes precedence over 
CONSUMER_FETCH_THRESHOLD config.
+    */
   val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + 
"samza.fetch.threshold.bytes"
 
   implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
@@ -75,18 +81,23 @@ object KafkaConfig {
 class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   // checkpoints
   def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
+
   def getCheckpointReplicationFactor() = 
getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
+
   def getCheckpointSegmentBytes() = 
getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, 
KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
+
   // custom consumer config
   def getConsumerFetchThreshold(name: String) = 
getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
+
   def getConsumerFetchThresholdBytes(name: String) = 
getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name)
+
   def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = 
getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0
 
 
   /**
-   * Returns a map of topic -> fetch.message.max.bytes value for all streams 
that
-   * are defined with this property in the config.
-   */
+    * Returns a map of topic -> fetch.message.max.bytes value for all streams 
that
+    * are defined with this property in the config.
+    */
   def getFetchMessageMaxBytesTopics(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
     subConf
@@ -98,9 +109,9 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
   }
 
   /**
-   * Returns a map of topic -> auto.offset.reset value for all streams that
-   * are defined with this property in the config.
-   */
+    * Returns a map of topic -> auto.offset.reset value for all streams that
+    * are defined with this property in the config.
+    */
   def getAutoOffsetResetTopics(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
     subConf
@@ -113,8 +124,11 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
 
   // regex resolver
   def getRegexResolvedStreams(rewriterName: String) = 
getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+
   def getRegexResolvedSystem(rewriterName: String) = 
getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+
   def getRegexResolvedInheritedConfig(rewriterName: String) = 
config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", 
true)
+
   def getChangelogStreamReplicationFactor(name: String) = 
getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name)
 
   // The method returns a map of storenames to changelog topic names, which 
are configured to use kafka as the changelog stream
@@ -124,16 +138,16 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
     val storageConfig = new StorageConfig(config)
     val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)
 
-    for((changelogConfig, cn) <- changelogConfigs){
+    for ((changelogConfig, cn) <- changelogConfigs) {
       // Lookup the factory for this particular stream and verify if it's a 
kafka system
 
       val matcher = pattern.matcher(changelogConfig)
-      val storeName = if(matcher.find()) matcher.group(1) else throw new 
SamzaException("Unable to find store name in the changelog configuration: " + 
changelogConfig + " with SystemStream: " + cn)
+      val storeName = if (matcher.find()) matcher.group(1) else throw new 
SamzaException("Unable to find store name in the changelog configuration: " + 
changelogConfig + " with SystemStream: " + cn)
 
       val changelogName = 
storageConfig.getChangelogStream(storeName).getOrElse(throw new 
SamzaException("unable to get SystemStream for store:" + changelogConfig));
       val systemStream = Util.getSystemStreamFromNames(changelogName)
       val factoryName = 
config.getSystemFactory(systemStream.getSystem).getOrElse(new 
SamzaException("Unable to determine factory for system: " + 
systemStream.getSystem))
-      if(classOf[KafkaSystemFactory].getCanonicalName == factoryName){
+      if (classOf[KafkaSystemFactory].getCanonicalName == factoryName) {
         storeToChangelog += storeName -> systemStream.getStream
       }
     }
@@ -147,16 +161,22 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
     kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
     kafkaChangeLogProperties.setProperty("segment.bytes", 
KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
     kafkaChangeLogProperties.setProperty("delete.retention.ms", 
String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
-    filteredConfigs.foreach{kv => kafkaChangeLogProperties.setProperty(kv._1, 
kv._2)}
+    filteredConfigs.foreach { kv => 
kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
     kafkaChangeLogProperties
   }
 
+  def getTopicKafkaProperties(systemName: String, streamName: String) = {
+    val filteredConfigs = config.subset(StreamConfig.STREAM_PREFIX 
format(systemName, streamName), true)
+    val topicProperties = new Properties
+    filteredConfigs.foreach { kv => topicProperties.setProperty(kv._1, kv._2) }
+    topicProperties
+  }
+
   // kafka config
-  def getKafkaSystemConsumerConfig(
-    systemName: String,
-    clientId: String,
-    groupId: String = "undefined-samza-consumer-group-%s" format 
UUID.randomUUID.toString,
-    injectedProps: Map[String, String] = Map()) = {
+  def getKafkaSystemConsumerConfig( systemName: String,
+                                    clientId: String,
+                                    groupId: String = 
"undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
+                                    injectedProps: Map[String, String] = 
Map()) = {
 
     val subConf = config.subset("systems.%s.consumer." format systemName, true)
     val consumerProps = new Properties()
@@ -167,10 +187,9 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
     new ConsumerConfig(consumerProps)
   }
 
-  def getKafkaSystemProducerConfig(
-    systemName: String,
-    clientId: String,
-    injectedProps: Map[String, String] = Map()) = {
+  def getKafkaSystemProducerConfig( systemName: String,
+                                    clientId: String,
+                                    injectedProps: Map[String, String] = 
Map()) = {
 
     val subConf = config.subset("systems.%s.producer." format systemName, true)
     val producerProps = new util.HashMap[String, Object]()
@@ -197,45 +216,45 @@ class KafkaProducerConfig(val systemName: String,
     val producerProperties: java.util.Map[String, Object] = new 
util.HashMap[String, Object]()
     producerProperties.putAll(properties)
 
-    
if(!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) 
{
-      debug("%s undefined. Defaulting to %s." format 
(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
+    if 
(!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." 
format(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
byteArraySerializerClassName))
       producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
byteArraySerializerClassName)
     }
 
-    
if(!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
 {
-      debug("%s undefined. Defaulting to %s." format 
(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
+    if 
(!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) 
{
+      debug("%s undefined. Defaulting to %s." 
format(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
byteArraySerializerClassName))
       producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
byteArraySerializerClassName)
     }
 
-    
if(producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
-        && 
producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).asInstanceOf[String].toInt
 > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) {
-      warn("Setting '%s' to a value other than %d does not guarantee message 
ordering because new messages will be sent without waiting for previous ones to 
be acknowledged." 
-          format (ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT))
+    if 
(producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
+      && 
producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).asInstanceOf[String].toInt
 > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) {
+      warn("Setting '%s' to a value other than %d does not guarantee message 
ordering because new messages will be sent without waiting for previous ones to 
be acknowledged."
+        format(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT))
     } else {
       
producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
     }
 
-    if(producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG) 
-        && 
producerProperties.get(ProducerConfig.RETRIES_CONFIG).asInstanceOf[String].toInt
 < RETRIES_DEFAULT) {
-        warn("Samza does not provide producer failure handling. Consider 
setting '%s' to a large value, like Int.MAX." format 
ProducerConfig.RETRIES_CONFIG)
+    if (producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)
+      && 
producerProperties.get(ProducerConfig.RETRIES_CONFIG).asInstanceOf[String].toInt
 < RETRIES_DEFAULT) {
+      warn("Samza does not provide producer failure handling. Consider setting 
'%s' to a large value, like Int.MAX." format ProducerConfig.RETRIES_CONFIG)
     } else {
       // Retries config is set to Max so that when all attempts fail, Samza 
also fails the send. We do not have any special handler
       // for producer failure
       producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
     }
-    
+
     producerProperties
   }
 
-  val reconnectIntervalMs =  
Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
-          .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT).asInstanceOf[Long]
+  val reconnectIntervalMs = 
Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
+    .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT).asInstanceOf[Long]
 
   val bootsrapServers = {
-    if(properties.containsKey("metadata.broker.list"))
+    if (properties.containsKey("metadata.broker.list"))
       warn("Kafka producer configuration contains 'metadata.broker.list'. This 
configuration is deprecated . Samza has been upgraded " +
-             "to use Kafka's new producer API. Please update your 
configurations based on the documentation at 
http://kafka.apache.org/documentation.html#newproducerconfigs";)
+        "to use Kafka's new producer API. Please update your configurations 
based on the documentation at 
http://kafka.apache.org/documentation.html#newproducerconfigs";)
     Option(properties.get("bootstrap.servers"))
-            .getOrElse(throw new SamzaException("No bootstrap servers defined 
in config for %s." format systemName))
-            .asInstanceOf[String]
+      .getOrElse(throw new SamzaException("No bootstrap servers defined in 
config for %s." format systemName))
+      .asInstanceOf[String]
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 955fa44..309b653 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -20,23 +20,21 @@
 package org.apache.samza.system.kafka
 
 import java.util
+import java.util.{Properties, UUID}
 
-import org.apache.samza.Partition
-import org.apache.samza.SamzaException
-import org.apache.samza.system.{ExtendedSystemAdmin, SystemStreamMetadata, 
SystemStreamPartition}
-import org.apache.samza.util.{ ClientUtilTopicMetadataStore, 
ExponentialSleepStrategy, Logging, KafkaUtil }
+import kafka.admin.AdminUtils
 import kafka.api._
-import kafka.consumer.SimpleConsumer
-import kafka.common.{ TopicExistsException, TopicAndPartition }
-import kafka.consumer.ConsumerConfig
+import kafka.common.{TopicAndPartition, TopicExistsException}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.utils.ZkUtils
-import java.util.{ Properties, UUID }
+import org.apache.samza.config.KafkaConfig
+import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system._
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, 
ExponentialSleepStrategy, KafkaUtil, Logging}
+import org.apache.samza.{Partition, SamzaException}
+
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemStreamMetadata.{OffsetType, 
SystemStreamPartitionMetadata}
-import kafka.consumer.ConsumerConfig
-import kafka.admin.AdminUtils
-import org.apache.samza.util.KafkaUtil
 
 
 object KafkaSystemAdmin extends Logging {
@@ -269,12 +267,12 @@ class KafkaSystemAdmin(
   }
 
   /**
-    * Returns the newest offset for the specified SSP.
-    * This method is fast and targeted. It minimizes the number of kafka 
requests.
-    * It does not retry indefinitely if there is any failure.
-    * It returns null if the topic is empty. To get the offsets for *all*
-    * partitions, it would be more efficient to call getSystemStreamMetadata
-    */
+   * Returns the newest offset for the specified SSP.
+   * This method is fast and targeted. It minimizes the number of kafka 
requests.
+   * It does not retry indefinitely if there is any failure.
+   * It returns null if the topic is empty. To get the offsets for *all*
+   * partitions, it would be more efficient to call getSystemStreamMetadata
+   */
   override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: 
Integer) = {
     debug("Fetching newest offset for: %s" format ssp)
     var offset: String = null
@@ -334,34 +332,14 @@ class KafkaSystemAdmin(
 
   override def createCoordinatorStream(streamName: String) {
     info("Attempting to create coordinator stream %s." format streamName)
-    new ExponentialSleepStrategy(initialDelayMs = 500).run(
-      loop => {
-        val zkClient = connectZk()
-        try {
-          AdminUtils.createTopic(
-            zkClient,
-            streamName,
-            1, // Always one partition for coordinator stream.
-            coordinatorStreamReplicationFactor,
-            coordinatorStreamProperties)
-        } finally {
-          zkClient.close
-        }
 
-        info("Created coordinator stream %s." format streamName)
-        loop.done
-      },
+    val streamSpec = new KafkaStreamSpec(streamName, streamName, systemName, 
1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
 
-      (exception, loop) => {
-        exception match {
-          case e: TopicExistsException =>
-            info("Coordinator stream %s already exists." format streamName)
-            loop.done
-          case e: Exception =>
-            warn("Failed to create topic %s: %s. Retrying." format 
(streamName, e))
-            debug("Exception detail:", e)
-        }
-      })
+    if (createStream(streamSpec)) {
+      info("Created coordinator stream %s." format streamName)
+    } else {
+      info("Coordinator stream %s already exists." format streamName)
+    }
   }
 
   /**
@@ -435,44 +413,57 @@ class KafkaSystemAdmin(
     offsets
   }
 
-  private def createTopicInKafka(topicName: String, 
numKafkaChangelogPartitions: Int) {
-    val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
-    info("Attempting to create change log topic %s." format topicName)
-    info("Using partition count " + numKafkaChangelogPartitions + " for 
creating change log topic")
-    val topicMetaInfo = topicMetaInformation.getOrElse(topicName, throw new 
KafkaChangelogException("Unable to find topic information for topic " + 
topicName))
-    retryBackoff.run(
+  /**
+   * @inheritdoc
+   */
+  override def createStream(spec: StreamSpec): Boolean = {
+    val kSpec = KafkaStreamSpec.fromSpec(spec);
+    var streamCreated = false
+
+    new ExponentialSleepStrategy(initialDelayMs = 500).run(
       loop => {
         val zkClient = connectZk()
         try {
           AdminUtils.createTopic(
             zkClient,
-            topicName,
-            numKafkaChangelogPartitions,
-            topicMetaInfo.replicationFactor,
-            topicMetaInfo.kafkaProps)
+            kSpec.getPhysicalName,
+            kSpec.getPartitionCount,
+            kSpec.getReplicationFactor,
+            kSpec.getProperties)
         } finally {
           zkClient.close
         }
 
-        info("Created changelog topic %s." format topicName)
+        streamCreated = true
         loop.done
       },
 
       (exception, loop) => {
         exception match {
           case e: TopicExistsException =>
-            info("Changelog topic %s already exists." format topicName)
+            streamCreated = false
             loop.done
           case e: Exception =>
-            warn("Failed to create topic %s: %s. Retrying." format (topicName, 
e))
+            warn("Failed to create topic %s: %s. Retrying." format 
(spec.getPhysicalName, e))
             debug("Exception detail:", e)
         }
       })
+
+    streamCreated
   }
 
-  private def validateTopicInKafka(topicName: String, 
numKafkaChangelogPartitions: Int) {
+  /**
+    * @inheritdoc
+    *
+    * Validates a stream in Kafka. Should not be called before createStream(),
+    * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients,
+    * is not read-only and will auto-create a new topic.
+    */
+  override def validateStream(spec: StreamSpec): Unit = {
+    val topicName = spec.getPhysicalName
+    info("Validating topic %s." format topicName)
+
     val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
-    info("Validating changelog topic %s." format topicName)
     var metadataTTL = Long.MaxValue // Trust the cache until we get an 
exception
     retryBackoff.run(
       loop => {
@@ -482,17 +473,17 @@ class KafkaSystemAdmin(
         KafkaUtil.maybeThrowException(topicMetadata.errorCode)
 
         val partitionCount = topicMetadata.partitionsMetadata.length
-        if (partitionCount < numKafkaChangelogPartitions) {
-          throw new KafkaChangelogException("Changelog topic validation failed 
for topic %s because partition count %s did not match expected partition count 
of %d" format (topicName, topicMetadata.partitionsMetadata.length, 
numKafkaChangelogPartitions))
+        if (partitionCount != spec.getPartitionCount) {
+          throw new StreamValidationException("Topic validation failed for 
topic %s because partition count %s did not match expected partition count of 
%d" format (topicName, topicMetadata.partitionsMetadata.length, 
spec.getPartitionCount))
         }
 
-        info("Successfully validated changelog topic %s." format topicName)
+        info("Successfully validated topic %s." format topicName)
         loop.done
       },
 
       (exception, loop) => {
         exception match {
-          case e: KafkaChangelogException => throw e
+          case e: StreamValidationException => throw e
           case e: Exception =>
             warn("While trying to validate topic %s: %s. Retrying." format 
(topicName, e))
             debug("Exception detail:", e)
@@ -502,24 +493,32 @@ class KafkaSystemAdmin(
   }
 
   /**
-   * Exception to be thrown when the change log stream creation or validation 
has failed
-   */
+    * Exception to be thrown when the change log stream creation or validation 
has failed
+    */
   class KafkaChangelogException(s: String, t: Throwable) extends 
SamzaException(s, t) {
     def this(s: String) = this(s, null)
   }
 
   override def createChangelogStream(topicName: String, 
numKafkaChangelogPartitions: Int) = {
-    createTopicInKafka(topicName, numKafkaChangelogPartitions)
-    validateChangelogStream(topicName, numKafkaChangelogPartitions)
+    val topicMeta = topicMetaInformation.getOrElse(topicName, throw new 
KafkaChangelogException("Unable to find topic information for topic " + 
topicName))
+    val spec = new KafkaStreamSpec(topicName, topicName, systemName, 
numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
+
+    if (createStream(spec)) {
+      info("Created changelog stream %s." format topicName)
+    } else {
+      info("Changelog stream %s already exists." format topicName)
+    }
+
+    validateStream(spec)
   }
 
   /**
-   * Validates change log stream in Kafka. Should not be called before 
createChangelogStream(),
-   * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, 
is not read-only and
-   * will auto-create a new topic.
-   */
+    * Validates a stream in Kafka. Should not be called before createStream(),
+    * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, 
is not read-only and
+    * will auto-create a new topic.
+    */
   override def validateChangelogStream(topicName: String, 
numKafkaChangelogPartitions: Int) = {
-    validateTopicInKafka(topicName, numKafkaChangelogPartitions)
+    validateStream(new KafkaStreamSpec(topicName, systemName, 
numKafkaChangelogPartitions))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
new file mode 100644
index 0000000..a786468
--- /dev/null
+++ 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.StreamValidationException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.util.Util;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.*;
+
+
+public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
+
+  KafkaSystemAdmin basicSystemAdmin = createSystemAdmin();
+
+
+  @Test
+  public void testCreateCoordinatorStreamDelegatesToCreateStream() {
+    KafkaSystemAdmin systemAdmin = createSystemAdmin();//coordProps, 3, new 
scala.collection.immutable.HashMap<>(), 1000);
+    SystemAdmin admin = Mockito.spy(systemAdmin);
+    StreamSpec spec = new StreamSpec("testId", "testCoordinatorStream", 
"testSystem");
+
+    admin.createCoordinatorStream(spec.getPhysicalName());
+    admin.validateStream(spec);
+
+    Mockito.verify(admin).createStream(Mockito.any());
+  }
+
+  @Test
+  public void testCreateChangelogStreamDelegatesToCreateStream() {
+    final String STREAM = "testChangeLogStream";
+    final int PARTITIONS = 12;
+    final int REP_FACTOR = 3;
+
+    Properties coordProps = new Properties();
+    Properties changeLogProps = new Properties();
+    changeLogProps.setProperty("cleanup.policy", "compact");
+    changeLogProps.setProperty("segment.bytes", "139");
+    Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
+    changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
+
+    SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, 
Util.javaMapAsScalaMap(changeLogMap)));
+    StreamSpec spec = new StreamSpec(STREAM, STREAM, SYSTEM(), PARTITIONS);
+    admin.createChangelogStream(STREAM, PARTITIONS);
+    admin.validateStream(spec);
+
+    ArgumentCaptor<StreamSpec> specCaptor = 
ArgumentCaptor.forClass(StreamSpec.class);
+    Mockito.verify(admin).createStream(specCaptor.capture());
+
+    StreamSpec internalSpec = specCaptor.getValue();
+    assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec 
is used to carry replication factor
+    assertEquals(STREAM, internalSpec.getId());
+    assertEquals(SYSTEM(), internalSpec.getSystemName());
+    assertEquals(STREAM, internalSpec.getPhysicalName());
+    assertEquals(REP_FACTOR, ((KafkaStreamSpec) 
internalSpec).getReplicationFactor());
+    assertEquals(PARTITIONS, internalSpec.getPartitionCount());
+    assertEquals(changeLogProps, ((KafkaStreamSpec) 
internalSpec).getProperties());
+  }
+
+  @Test
+  public void testValidateChangelogStreamDelegatesToValidateStream() {
+    final String STREAM = "testChangeLogValidate";
+    Properties coordProps = new Properties();
+    Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
+    changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties()));
+
+    KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, 
Util.javaMapAsScalaMap(changeLogMap));
+    SystemAdmin admin = Mockito.spy(systemAdmin);
+    StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12);
+
+    admin.createChangelogStream(spec.getPhysicalName(), 
spec.getPartitionCount());
+    admin.validateStream(spec);
+    admin.validateChangelogStream(spec.getPhysicalName(), 
spec.getPartitionCount());
+
+    Mockito.verify(admin).createStream(Mockito.any());
+    Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any());
+  }
+
+  @Test
+  public void testCreateStream() {
+    SystemAdmin admin = this.basicSystemAdmin;
+    StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
+
+    assertTrue("createStream should return true if the stream does not exist 
and then is created.", admin.createStream(spec));
+    admin.validateStream(spec);
+
+    assertFalse("createStream should return false if the stream already 
exists.", admin.createStream(spec));
+  }
+
+  @Test(expected = StreamValidationException.class)
+  public void testValidateStreamDoesNotExist() {
+    SystemAdmin admin = this.basicSystemAdmin;
+
+    StreamSpec spec = new StreamSpec("testId", "testStreamNameExist", 
"testSystem", 8);
+
+    admin.validateStream(spec);
+  }
+
+  @Test(expected = StreamValidationException.class)
+  public void testValidateStreamWrongPartitionCount() {
+    SystemAdmin admin = this.basicSystemAdmin;
+    StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", 
"testSystem", 8);
+    StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", 
"testSystem", 4);
+
+    assertTrue("createStream should return true if the stream does not exist 
and then is created.", admin.createStream(spec1));
+
+    admin.validateStream(spec2);
+  }
+
+  @Test(expected = StreamValidationException.class)
+  public void testValidateStreamWrongName() {
+    SystemAdmin admin = this.basicSystemAdmin;
+    StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", 
"testSystem", 8);
+    StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", 
"testSystem", 8);
+
+    assertTrue("createStream should return true if the stream does not exist 
and then is created.", admin.createStream(spec1));
+
+    admin.validateStream(spec2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 0e3c9b5..be7db97 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -21,18 +21,16 @@
 
 package org.apache.samza.system.kafka
 
-import java.util
-import java.util.Properties
+import java.util.{Properties, UUID}
 
 import kafka.admin.AdminUtils
 import kafka.common.{ErrorMapping, LeaderNotAvailableException}
 import kafka.consumer.{Consumer, ConsumerConfig}
-import kafka.server.{KafkaConfig, KafkaServer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import kafka.utils.{TestUtils, ZkUtils}
 import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, ZkUtils}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.security.JaasUtils
-
 import org.apache.samza.Partition
 import org.apache.samza.config.KafkaProducerConfig
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -43,7 +41,9 @@ import org.junit._
 
 import scala.collection.JavaConversions._
 
-
+/**
+  * README: New tests should be added to the Java tests. See 
TestKafkaSystemAdminJava
+  */
 object TestKafkaSystemAdmin extends KafkaServerTestHarness {
 
   val SYSTEM = "kafka"
@@ -136,6 +136,14 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness 
{
     Consumer.create(consumerConfig)
   }
 
+  def createSystemAdmin: KafkaSystemAdmin = {
+    new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 
6000, 6000, zkSecure))
+  }
+
+  def createSystemAdmin(coordinatorStreamProperties: Properties, 
coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, 
ChangelogInfo]): KafkaSystemAdmin = {
+    new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 
6000, 6000, zkSecure), coordinatorStreamProperties, 
coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, 
UUID.randomUUID.toString, topicMetaInformation)
+  }
+
 }
 
 /**
@@ -146,7 +154,7 @@ class TestKafkaSystemAdmin {
   import TestKafkaSystemAdmin._
 
   // Provide a random zkAddress, the system admin tries to connect only when a 
topic is created/validated
-  val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => 
ZkUtils(zkConnect, 6000, 6000, zkSecure))
+  val systemAdmin = createSystemAdmin
 
   @Test
   def testShouldAssembleMetadata {

Reply via email to