Repository: samza Updated Branches: refs/heads/samza-fluent-api-v1 2c7309cf6 -> 8815b0392
SAMZA-1075: Refactor SystemAdmin Interface to expose a common method to create streams Author: Jacob Maes <[email protected]> Reviewers: Yi Pan (Data Infrastructure) <[email protected]> Closes #53 from jmakes/samza-1075 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8815b039 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8815b039 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8815b039 Branch: refs/heads/samza-fluent-api-v1 Commit: 8815b03929dab0527594fd609f5080fb40a05a0b Parents: 2c7309c Author: Jacob Maes <[email protected]> Authored: Fri Feb 17 12:49:19 2017 -0800 Committer: Xinyu Liu <[email protected]> Committed: Fri Feb 17 14:14:17 2017 -0800 ---------------------------------------------------------------------- .../org/apache/samza/system/StreamSpec.java | 203 +++++++++++++++++++ .../samza/system/StreamValidationException.java | 30 +++ .../org/apache/samza/system/SystemAdmin.java | 29 ++- .../samza/system/kafka/KafkaStreamSpec.java | 141 +++++++++++++ .../org/apache/samza/config/KafkaConfig.scala | 115 ++++++----- .../samza/system/kafka/KafkaSystemAdmin.scala | 143 +++++++------ .../system/kafka/TestKafkaSystemAdminJava.java | 145 +++++++++++++ .../system/kafka/TestKafkaSystemAdmin.scala | 24 ++- 8 files changed, 700 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java new file mode 100644 index 0000000..d8a2144 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + + +/** + * StreamSpec is a blueprint for creating, validating, or simply describing a stream in the runtime environment. + * + * It has specific attributes for common behaviors that Samza uses. + * + * It also includes a map of configurations which may be system-specific. + * + * It is immutable by design. + */ +public class StreamSpec { + + private static final int DEFAULT_PARTITION_COUNT = 1; + + /** + * Unique identifier for the stream in a Samza application. + * This identifier is used as a key for stream properties in the + * job config and to distinguish between streams in a graph. + */ + private final String id; + + /** + * The System name on which this stream will exist. Corresponds to a named implementation of the + * Samza System abstraction. + */ + private final String systemName; + + /** + * The physical identifier for the stream. This is the identifier that will be used in remote + * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it + * might be a file URN. + */ + private final String physicalName; + + /** + * The number of partitions for the stream. + */ + private final int partitionCount; + + /** + * A set of all system-specific configurations for the stream. + */ + private final Map<String, String> config; + + /** + * @param id The application-unique logical identifier for the stream. It is used to distinguish between + * streams in a Samza application so it must be unique in the context of one deployable unit. + * It does not need to be globally unique or unique with respect to a host. + * + * @param physicalName The physical identifier for the stream. This is the identifier that will be used in remote + * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it + * might be a file URN. + * + * @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the + * Samza System abstraction. See {@link SystemFactory} + */ + public StreamSpec(String id, String physicalName, String systemName) { + this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, Collections.emptyMap()); + } + + /** + * + * @param id The application-unique logical identifier for the stream. It is used to distinguish between + * streams in a Samza application so it must be unique in the context of one deployable unit. + * It does not need to be globally unique or unique with respect to a host. + * + * @param physicalName The physical identifier for the stream. This is the identifier that will be used in remote + * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it + * might be a file URN. + * + * @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the + * Samza System abstraction. See {@link SystemFactory} + * + * @param partitionCount The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned. + */ + public StreamSpec(String id, String physicalName, String systemName, int partitionCount) { + this(id, physicalName, systemName, partitionCount, Collections.emptyMap()); + } + + /** + * @param id The application-unique logical identifier for the stream. It is used to distinguish between + * streams in a Samza application so it must be unique in the context of one deployable unit. + * It does not need to be globally unique or unique with respect to a host. + * + * @param physicalName The physical identifier for the stream. This is the identifier that will be used in remote + * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it + * might be a file URN. + * + * @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the + * Samza System abstraction. See {@link SystemFactory} + * + * @param config A map of properties for the stream. These may be System-specfic. + */ + public StreamSpec(String id, String physicalName, String systemName, Map<String, String> config) { + this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, config); + } + + /** + * @param id The application-unique logical identifier for the stream. It is used to distinguish between + * streams in a Samza application so it must be unique in the context of one deployable unit. + * It does not need to be globally unique or unique with respect to a host. + * + * @param physicalName The physical identifier for the stream. This is the identifier that will be used in remote + * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it + * might be a file URN. + * + * @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the + * Samza System abstraction. See {@link SystemFactory} + * + * @param partitionCount The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned. + * + * @param config A map of properties for the stream. These may be System-specfic. + */ + public StreamSpec(String id, String physicalName, String systemName, int partitionCount, Map<String, String> config) { + if (id == null) { + throw new NullPointerException("Parameter 'id' must not be null"); + } + + if (systemName == null) { + throw new NullPointerException("Parameter 'systemName' must not be null"); + } + + if (partitionCount < 1) { + throw new NullPointerException("Parameter 'partitionCount' must not be greater than 0"); + } + + this.id = id; + this.systemName = systemName; + this.physicalName = physicalName; + this.partitionCount = partitionCount; + + if (config != null) { + this.config = Collections.unmodifiableMap(new HashMap<>(config)); + } else { + this.config = Collections.emptyMap(); + } + } + + /** + * Copies this StreamSpec, but applies a new partitionCount. + * + * This method is not static s.t. subclasses can override it. + * + * @param partitionCount The partitionCount for the returned StreamSpec. + * @return A copy of this StreamSpec with the specified partitionCount. + */ + public StreamSpec copyWithPartitionCount(int partitionCount) { + return new StreamSpec(id, physicalName, systemName, partitionCount, config); + } + + public String getId() { + return id; + } + + public String getSystemName() { + return systemName; + } + + public String getPhysicalName() { + return physicalName; + } + + public int getPartitionCount() { + return partitionCount; + } + + public Map<String, String> getConfig() { + return config; + } + + public String get(String propertyName) { + return config.get(propertyName); + } + + public String getOrDefault(String propertyName, String defaultValue) { + return config.getOrDefault(propertyName, defaultValue); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-api/src/main/java/org/apache/samza/system/StreamValidationException.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamValidationException.java b/samza-api/src/main/java/org/apache/samza/system/StreamValidationException.java new file mode 100644 index 0000000..fef4148 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/system/StreamValidationException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system; + +import org.apache.samza.SamzaException; + + +public class StreamValidationException extends SamzaException { + private static final long serialVersionUID = 1L; + + public StreamValidationException(String s) { + super(s); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index ef99893..b180712 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -28,7 +28,6 @@ import java.util.Set; * utility methods that Samza needs in order to interact with a system. */ public interface SystemAdmin { - /** * Fetches the offsets for the messages immediately after the supplied offsets * for a group of SystemStreamPartitions. @@ -52,11 +51,12 @@ public interface SystemAdmin { /** * An API to create a change log stream - * + * * @param streamName * The name of the stream to be created in the underlying stream * @param numOfPartitions * The number of partitions in the changelog stream + * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)} */ void createChangelogStream(String streamName, int numOfPartitions); @@ -67,6 +67,7 @@ public interface SystemAdmin { * The name of the stream to be created in the underlying stream * @param numOfPartitions * The number of partitions in the changelog stream + * @deprecated since 0.12.1, use {@link #validateStream(StreamSpec)} */ void validateChangelogStream(String streamName, int numOfPartitions); @@ -76,6 +77,7 @@ public interface SystemAdmin { * * @param streamName * The name of the coordinator stream to create. + * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)} */ void createCoordinatorStream(String streamName); @@ -89,4 +91,27 @@ public interface SystemAdmin { * @return -1 if offset1 < offset2; 0 if offset1 == offset2; 1 if offset1 > offset2. Null if not comparable */ Integer offsetComparator(String offset1, String offset2); + + /** + * Create a stream described by the spec. + * + * @param streamSpec The spec, or blueprint from which the physical stream will be created on the system. + * @return {@code true} if the stream was actually created and not pre-existing. + * {@code false} if the stream was pre-existing. + * A RuntimeException will be thrown if creation fails. + */ + default boolean createStream(StreamSpec streamSpec) { + throw new UnsupportedOperationException(); + } + + /** + * Validates the stream described by the streamSpec on the system. + * A {@link StreamValidationException} should be thrown for any validation error. + * + * @param streamSpec The spec, or blueprint for the physical stream on the system. + * @throws StreamValidationException if validation fails. + */ + default void validateStream(StreamSpec streamSpec) throws StreamValidationException { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java new file mode 100644 index 0000000..3255f70 --- /dev/null +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kafka; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.system.StreamSpec; + + +/** + * Extends StreamSpec with the ability to easily get the topic replication factor. + */ +public class KafkaStreamSpec extends StreamSpec { + private static final int DEFAULT_REPLICATION_FACTOR = 2; + + /** + * The number of replicas for stream durability. + */ + private final int replicationFactor; + + /** + * Convenience method to convert a config map to Properties. + * @param map The Map to convert. + * @return The Properties instance. + */ + private static Properties mapToProperties(Map<String, String> map) { + Properties props = new Properties(); + props.putAll(map); + return props; + } + + /** + * Convenience method to convert Properties to a config map. + * @param properties The Properties to convert. + * @return The Map instance. + */ + private static Map<String, String> propertiesToMap(Properties properties) { + Map<String, String> map = new HashMap<String, String>(); + for (final String name: properties.stringPropertyNames()) { + map.put(name, properties.getProperty(name)); + } + return map; + } + + /** + * Converts any StreamSpec to a KafkaStreamSpec. + * If the original spec already is a KafkaStreamSpec, it is simply returned. + * + * @param originalSpec The StreamSpec instance to convert to KafkaStreamSpec. + * @return A KafkaStreamSpec instance. + */ + public static KafkaStreamSpec fromSpec(StreamSpec originalSpec) { + if (originalSpec instanceof KafkaStreamSpec) { + return ((KafkaStreamSpec) originalSpec); + } + + int replicationFactor = Integer.parseInt(originalSpec.getOrDefault( KafkaConfig.TOPIC_REPLICATION_FACTOR(), + KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR())); + + return new KafkaStreamSpec( originalSpec.getId(), + originalSpec.getPhysicalName(), + originalSpec.getSystemName(), + originalSpec.getPartitionCount(), + replicationFactor, + mapToProperties(originalSpec.getConfig())); + } + + /** + * Convenience constructor to create a KafkaStreamSpec with just a topicName, systemName, and partitionCount. + * + * @param topicName The name of the topic. + * @param systemName The name of the System. See {@link org.apache.samza.system.SystemFactory} + * @param partitionCount The number of partitions. + */ + public KafkaStreamSpec(String topicName, String systemName, int partitionCount) { + this(topicName, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, new Properties()); + } + + /** + * Constructs a StreamSpec with a replication factor. + * + * @param id The application-unique logical identifier for the stream. It is used to distinguish between + * streams in a Samza application so it must be unique in the context of one deployable unit. + * It does not need to be globally unique or unique with respect to a host. + * + * @param topicName The physical identifier for the stream. This is the identifier that will be used in remote + * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it + * might be a file URN. + * + * @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the + * Samza System abstraction. See {@link org.apache.samza.system.SystemFactory} + * + * @param partitionCount The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned. + * + * @param replicationFactor The number of topic replicas in the Kafka cluster for durability. + * + * @param properties A set of properties for the stream. These may be System-specfic. + */ + public KafkaStreamSpec(String id, String topicName, String systemName, int partitionCount, int replicationFactor, + Properties properties) { + super(id, topicName, systemName, partitionCount, propertiesToMap(properties)); + + if (replicationFactor <= 0) { + throw new IllegalArgumentException( + String.format("Replication factor %d must be greater than 0.", replicationFactor)); + } + this.replicationFactor = replicationFactor; + } + + @Override + public StreamSpec copyWithPartitionCount(int partitionCount) { + return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), partitionCount, getReplicationFactor(), getProperties()); + } + + public int getReplicationFactor() { + return replicationFactor; + } + + public Properties getProperties() { + return mapToProperties(getConfig()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 770220c..e355e7e 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -24,27 +24,33 @@ import java.util.regex.Pattern import org.apache.samza.util.Util import org.apache.samza.util.Logging + import scala.collection.JavaConversions._ import kafka.consumer.ConsumerConfig import java.util.{Properties, UUID} + import org.apache.kafka.clients.producer.ProducerConfig import org.apache.samza.SamzaException import java.util + import scala.collection.JavaConverters._ import org.apache.samza.system.kafka.KafkaSystemFactory import org.apache.samza.config.SystemConfig.Config2System import org.apache.kafka.common.serialization.ByteArraySerializer object KafkaConfig { + val TOPIC_REPLICATION_FACTOR = "replication.factor" + val TOPIC_DEFAULT_REPLICATION_FACTOR = "2" + val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex" val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system" val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config" val CHECKPOINT_SYSTEM = "task.checkpoint.system" - val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor" + val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint." + TOPIC_REPLICATION_FACTOR val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" - val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor" + val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + TOPIC_REPLICATION_FACTOR val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka." // The default segment size to use for changelog topics val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912" @@ -53,20 +59,20 @@ object KafkaConfig { val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$" /** - * Defines how low a queue can get for a single system/stream/partition - * combination before trying to fetch more messages for it. - */ + * Defines how low a queue can get for a single system/stream/partition + * combination before trying to fetch more messages for it. + */ val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold" val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400 /** - * Defines how many bytes to use for the buffered prefetch messages for job as a whole. - * The bytes for a single system/stream/partition are computed based on this. - * This fetches wholes messages, hence this bytes limit is a soft one, and the actual usage can be - * the bytes limit + size of max message in the partition for a given stream. - * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config. - */ + * Defines how many bytes to use for the buffered prefetch messages for job as a whole. + * The bytes for a single system/stream/partition are computed based on this. + * This fetches wholes messages, hence this bytes limit is a soft one, and the actual usage can be + * the bytes limit + size of max message in the partition for a given stream. + * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config. + */ val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes" implicit def Config2Kafka(config: Config) = new KafkaConfig(config) @@ -75,18 +81,23 @@ object KafkaConfig { class KafkaConfig(config: Config) extends ScalaMapConfig(config) { // checkpoints def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM) + def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR) + def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES) + // custom consumer config def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name) + def getConsumerFetchThresholdBytes(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name) + def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0 /** - * Returns a map of topic -> fetch.message.max.bytes value for all streams that - * are defined with this property in the config. - */ + * Returns a map of topic -> fetch.message.max.bytes value for all streams that + * are defined with this property in the config. + */ def getFetchMessageMaxBytesTopics(systemName: String) = { val subConf = config.subset("systems.%s.streams." format systemName, true) subConf @@ -98,9 +109,9 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { } /** - * Returns a map of topic -> auto.offset.reset value for all streams that - * are defined with this property in the config. - */ + * Returns a map of topic -> auto.offset.reset value for all streams that + * are defined with this property in the config. + */ def getAutoOffsetResetTopics(systemName: String) = { val subConf = config.subset("systems.%s.streams." format systemName, true) subConf @@ -113,8 +124,11 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { // regex resolver def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName) + def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName) + def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true) + def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name) // The method returns a map of storenames to changelog topic names, which are configured to use kafka as the changelog stream @@ -124,16 +138,16 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { val storageConfig = new StorageConfig(config) val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX) - for((changelogConfig, cn) <- changelogConfigs){ + for ((changelogConfig, cn) <- changelogConfigs) { // Lookup the factory for this particular stream and verify if it's a kafka system val matcher = pattern.matcher(changelogConfig) - val storeName = if(matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn) + val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn) val changelogName = storageConfig.getChangelogStream(storeName).getOrElse(throw new SamzaException("unable to get SystemStream for store:" + changelogConfig)); val systemStream = Util.getSystemStreamFromNames(changelogName) val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem)) - if(classOf[KafkaSystemFactory].getCanonicalName == factoryName){ + if (classOf[KafkaSystemFactory].getCanonicalName == factoryName) { storeToChangelog += storeName -> systemStream.getStream } } @@ -147,16 +161,22 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE) kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name))) - filteredConfigs.foreach{kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2)} + filteredConfigs.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) } kafkaChangeLogProperties } + def getTopicKafkaProperties(systemName: String, streamName: String) = { + val filteredConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true) + val topicProperties = new Properties + filteredConfigs.foreach { kv => topicProperties.setProperty(kv._1, kv._2) } + topicProperties + } + // kafka config - def getKafkaSystemConsumerConfig( - systemName: String, - clientId: String, - groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString, - injectedProps: Map[String, String] = Map()) = { + def getKafkaSystemConsumerConfig( systemName: String, + clientId: String, + groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString, + injectedProps: Map[String, String] = Map()) = { val subConf = config.subset("systems.%s.consumer." format systemName, true) val consumerProps = new Properties() @@ -167,10 +187,9 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { new ConsumerConfig(consumerProps) } - def getKafkaSystemProducerConfig( - systemName: String, - clientId: String, - injectedProps: Map[String, String] = Map()) = { + def getKafkaSystemProducerConfig( systemName: String, + clientId: String, + injectedProps: Map[String, String] = Map()) = { val subConf = config.subset("systems.%s.producer." format systemName, true) val producerProps = new util.HashMap[String, Object]() @@ -197,45 +216,45 @@ class KafkaProducerConfig(val systemName: String, val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]() producerProperties.putAll(properties) - if(!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { - debug("%s undefined. Defaulting to %s." format (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)) + if (!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { + debug("%s undefined. Defaulting to %s." format(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)) producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName) } - if(!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { - debug("%s undefined. Defaulting to %s." format (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)) + if (!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { + debug("%s undefined. Defaulting to %s." format(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)) producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName) } - if(producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) - && producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).asInstanceOf[String].toInt > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) { - warn("Setting '%s' to a value other than %d does not guarantee message ordering because new messages will be sent without waiting for previous ones to be acknowledged." - format (ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)) + if (producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) + && producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).asInstanceOf[String].toInt > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) { + warn("Setting '%s' to a value other than %d does not guarantee message ordering because new messages will be sent without waiting for previous ones to be acknowledged." + format(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)) } else { producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) } - if(producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG) - && producerProperties.get(ProducerConfig.RETRIES_CONFIG).asInstanceOf[String].toInt < RETRIES_DEFAULT) { - warn("Samza does not provide producer failure handling. Consider setting '%s' to a large value, like Int.MAX." format ProducerConfig.RETRIES_CONFIG) + if (producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG) + && producerProperties.get(ProducerConfig.RETRIES_CONFIG).asInstanceOf[String].toInt < RETRIES_DEFAULT) { + warn("Samza does not provide producer failure handling. Consider setting '%s' to a large value, like Int.MAX." format ProducerConfig.RETRIES_CONFIG) } else { // Retries config is set to Max so that when all attempts fail, Samza also fails the send. We do not have any special handler // for producer failure producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT) } - + producerProperties } - val reconnectIntervalMs = Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG)) - .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT).asInstanceOf[Long] + val reconnectIntervalMs = Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG)) + .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT).asInstanceOf[Long] val bootsrapServers = { - if(properties.containsKey("metadata.broker.list")) + if (properties.containsKey("metadata.broker.list")) warn("Kafka producer configuration contains 'metadata.broker.list'. This configuration is deprecated . Samza has been upgraded " + - "to use Kafka's new producer API. Please update your configurations based on the documentation at http://kafka.apache.org/documentation.html#newproducerconfigs") + "to use Kafka's new producer API. Please update your configurations based on the documentation at http://kafka.apache.org/documentation.html#newproducerconfigs") Option(properties.get("bootstrap.servers")) - .getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName)) - .asInstanceOf[String] + .getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName)) + .asInstanceOf[String] } } http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 955fa44..309b653 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -20,23 +20,21 @@ package org.apache.samza.system.kafka import java.util +import java.util.{Properties, UUID} -import org.apache.samza.Partition -import org.apache.samza.SamzaException -import org.apache.samza.system.{ExtendedSystemAdmin, SystemStreamMetadata, SystemStreamPartition} -import org.apache.samza.util.{ ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging, KafkaUtil } +import kafka.admin.AdminUtils import kafka.api._ -import kafka.consumer.SimpleConsumer -import kafka.common.{ TopicExistsException, TopicAndPartition } -import kafka.consumer.ConsumerConfig +import kafka.common.{TopicAndPartition, TopicExistsException} +import kafka.consumer.{ConsumerConfig, SimpleConsumer} import kafka.utils.ZkUtils -import java.util.{ Properties, UUID } +import org.apache.samza.config.KafkaConfig +import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.apache.samza.system._ +import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging} +import org.apache.samza.{Partition, SamzaException} + import scala.collection.JavaConversions import scala.collection.JavaConversions._ -import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata} -import kafka.consumer.ConsumerConfig -import kafka.admin.AdminUtils -import org.apache.samza.util.KafkaUtil object KafkaSystemAdmin extends Logging { @@ -269,12 +267,12 @@ class KafkaSystemAdmin( } /** - * Returns the newest offset for the specified SSP. - * This method is fast and targeted. It minimizes the number of kafka requests. - * It does not retry indefinitely if there is any failure. - * It returns null if the topic is empty. To get the offsets for *all* - * partitions, it would be more efficient to call getSystemStreamMetadata - */ + * Returns the newest offset for the specified SSP. + * This method is fast and targeted. It minimizes the number of kafka requests. + * It does not retry indefinitely if there is any failure. + * It returns null if the topic is empty. To get the offsets for *all* + * partitions, it would be more efficient to call getSystemStreamMetadata + */ override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = { debug("Fetching newest offset for: %s" format ssp) var offset: String = null @@ -334,34 +332,14 @@ class KafkaSystemAdmin( override def createCoordinatorStream(streamName: String) { info("Attempting to create coordinator stream %s." format streamName) - new ExponentialSleepStrategy(initialDelayMs = 500).run( - loop => { - val zkClient = connectZk() - try { - AdminUtils.createTopic( - zkClient, - streamName, - 1, // Always one partition for coordinator stream. - coordinatorStreamReplicationFactor, - coordinatorStreamProperties) - } finally { - zkClient.close - } - info("Created coordinator stream %s." format streamName) - loop.done - }, + val streamSpec = new KafkaStreamSpec(streamName, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties) - (exception, loop) => { - exception match { - case e: TopicExistsException => - info("Coordinator stream %s already exists." format streamName) - loop.done - case e: Exception => - warn("Failed to create topic %s: %s. Retrying." format (streamName, e)) - debug("Exception detail:", e) - } - }) + if (createStream(streamSpec)) { + info("Created coordinator stream %s." format streamName) + } else { + info("Coordinator stream %s already exists." format streamName) + } } /** @@ -435,44 +413,57 @@ class KafkaSystemAdmin( offsets } - private def createTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) { - val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy - info("Attempting to create change log topic %s." format topicName) - info("Using partition count " + numKafkaChangelogPartitions + " for creating change log topic") - val topicMetaInfo = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName)) - retryBackoff.run( + /** + * @inheritdoc + */ + override def createStream(spec: StreamSpec): Boolean = { + val kSpec = KafkaStreamSpec.fromSpec(spec); + var streamCreated = false + + new ExponentialSleepStrategy(initialDelayMs = 500).run( loop => { val zkClient = connectZk() try { AdminUtils.createTopic( zkClient, - topicName, - numKafkaChangelogPartitions, - topicMetaInfo.replicationFactor, - topicMetaInfo.kafkaProps) + kSpec.getPhysicalName, + kSpec.getPartitionCount, + kSpec.getReplicationFactor, + kSpec.getProperties) } finally { zkClient.close } - info("Created changelog topic %s." format topicName) + streamCreated = true loop.done }, (exception, loop) => { exception match { case e: TopicExistsException => - info("Changelog topic %s already exists." format topicName) + streamCreated = false loop.done case e: Exception => - warn("Failed to create topic %s: %s. Retrying." format (topicName, e)) + warn("Failed to create topic %s: %s. Retrying." format (spec.getPhysicalName, e)) debug("Exception detail:", e) } }) + + streamCreated } - private def validateTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) { + /** + * @inheritdoc + * + * Validates a stream in Kafka. Should not be called before createStream(), + * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, + * is not read-only and will auto-create a new topic. + */ + override def validateStream(spec: StreamSpec): Unit = { + val topicName = spec.getPhysicalName + info("Validating topic %s." format topicName) + val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy - info("Validating changelog topic %s." format topicName) var metadataTTL = Long.MaxValue // Trust the cache until we get an exception retryBackoff.run( loop => { @@ -482,17 +473,17 @@ class KafkaSystemAdmin( KafkaUtil.maybeThrowException(topicMetadata.errorCode) val partitionCount = topicMetadata.partitionsMetadata.length - if (partitionCount < numKafkaChangelogPartitions) { - throw new KafkaChangelogException("Changelog topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format (topicName, topicMetadata.partitionsMetadata.length, numKafkaChangelogPartitions)) + if (partitionCount != spec.getPartitionCount) { + throw new StreamValidationException("Topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format (topicName, topicMetadata.partitionsMetadata.length, spec.getPartitionCount)) } - info("Successfully validated changelog topic %s." format topicName) + info("Successfully validated topic %s." format topicName) loop.done }, (exception, loop) => { exception match { - case e: KafkaChangelogException => throw e + case e: StreamValidationException => throw e case e: Exception => warn("While trying to validate topic %s: %s. Retrying." format (topicName, e)) debug("Exception detail:", e) @@ -502,24 +493,32 @@ class KafkaSystemAdmin( } /** - * Exception to be thrown when the change log stream creation or validation has failed - */ + * Exception to be thrown when the change log stream creation or validation has failed + */ class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) { def this(s: String) = this(s, null) } override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = { - createTopicInKafka(topicName, numKafkaChangelogPartitions) - validateChangelogStream(topicName, numKafkaChangelogPartitions) + val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName)) + val spec = new KafkaStreamSpec(topicName, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps) + + if (createStream(spec)) { + info("Created changelog stream %s." format topicName) + } else { + info("Changelog stream %s already exists." format topicName) + } + + validateStream(spec) } /** - * Validates change log stream in Kafka. Should not be called before createChangelogStream(), - * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, is not read-only and - * will auto-create a new topic. - */ + * Validates a stream in Kafka. Should not be called before createStream(), + * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, is not read-only and + * will auto-create a new topic. + */ override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = { - validateTopicInKafka(topicName, numKafkaChangelogPartitions) + validateStream(new KafkaStreamSpec(topicName, systemName, numKafkaChangelogPartitions)) } /** http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java new file mode 100644 index 0000000..a786468 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kafka; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.StreamValidationException; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.util.Util; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.*; + + +public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { + + KafkaSystemAdmin basicSystemAdmin = createSystemAdmin(); + + + @Test + public void testCreateCoordinatorStreamDelegatesToCreateStream() { + KafkaSystemAdmin systemAdmin = createSystemAdmin();//coordProps, 3, new scala.collection.immutable.HashMap<>(), 1000); + SystemAdmin admin = Mockito.spy(systemAdmin); + StreamSpec spec = new StreamSpec("testId", "testCoordinatorStream", "testSystem"); + + admin.createCoordinatorStream(spec.getPhysicalName()); + admin.validateStream(spec); + + Mockito.verify(admin).createStream(Mockito.any()); + } + + @Test + public void testCreateChangelogStreamDelegatesToCreateStream() { + final String STREAM = "testChangeLogStream"; + final int PARTITIONS = 12; + final int REP_FACTOR = 3; + + Properties coordProps = new Properties(); + Properties changeLogProps = new Properties(); + changeLogProps.setProperty("cleanup.policy", "compact"); + changeLogProps.setProperty("segment.bytes", "139"); + Map<String, ChangelogInfo> changeLogMap = new HashMap<>(); + changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps)); + + SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap))); + StreamSpec spec = new StreamSpec(STREAM, STREAM, SYSTEM(), PARTITIONS); + admin.createChangelogStream(STREAM, PARTITIONS); + admin.validateStream(spec); + + ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class); + Mockito.verify(admin).createStream(specCaptor.capture()); + + StreamSpec internalSpec = specCaptor.getValue(); + assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor + assertEquals(STREAM, internalSpec.getId()); + assertEquals(SYSTEM(), internalSpec.getSystemName()); + assertEquals(STREAM, internalSpec.getPhysicalName()); + assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor()); + assertEquals(PARTITIONS, internalSpec.getPartitionCount()); + assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties()); + } + + @Test + public void testValidateChangelogStreamDelegatesToValidateStream() { + final String STREAM = "testChangeLogValidate"; + Properties coordProps = new Properties(); + Map<String, ChangelogInfo> changeLogMap = new HashMap<>(); + changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties())); + + KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)); + SystemAdmin admin = Mockito.spy(systemAdmin); + StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12); + + admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount()); + admin.validateStream(spec); + admin.validateChangelogStream(spec.getPhysicalName(), spec.getPartitionCount()); + + Mockito.verify(admin).createStream(Mockito.any()); + Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any()); + } + + @Test + public void testCreateStream() { + SystemAdmin admin = this.basicSystemAdmin; + StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8); + + assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec)); + admin.validateStream(spec); + + assertFalse("createStream should return false if the stream already exists.", admin.createStream(spec)); + } + + @Test(expected = StreamValidationException.class) + public void testValidateStreamDoesNotExist() { + SystemAdmin admin = this.basicSystemAdmin; + + StreamSpec spec = new StreamSpec("testId", "testStreamNameExist", "testSystem", 8); + + admin.validateStream(spec); + } + + @Test(expected = StreamValidationException.class) + public void testValidateStreamWrongPartitionCount() { + SystemAdmin admin = this.basicSystemAdmin; + StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8); + StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4); + + assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec1)); + + admin.validateStream(spec2); + } + + @Test(expected = StreamValidationException.class) + public void testValidateStreamWrongName() { + SystemAdmin admin = this.basicSystemAdmin; + StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8); + StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8); + + assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec1)); + + admin.validateStream(spec2); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8815b039/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index 0e3c9b5..be7db97 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -21,18 +21,16 @@ package org.apache.samza.system.kafka -import java.util -import java.util.Properties +import java.util.{Properties, UUID} import kafka.admin.AdminUtils import kafka.common.{ErrorMapping, LeaderNotAvailableException} import kafka.consumer.{Consumer, ConsumerConfig} -import kafka.server.{KafkaConfig, KafkaServer} -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import kafka.utils.{TestUtils, ZkUtils} import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{TestUtils, ZkUtils} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.security.JaasUtils - import org.apache.samza.Partition import org.apache.samza.config.KafkaProducerConfig import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata @@ -43,7 +41,9 @@ import org.junit._ import scala.collection.JavaConversions._ - +/** + * README: New tests should be added to the Java tests. See TestKafkaSystemAdminJava + */ object TestKafkaSystemAdmin extends KafkaServerTestHarness { val SYSTEM = "kafka" @@ -136,6 +136,14 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { Consumer.create(consumerConfig) } + def createSystemAdmin: KafkaSystemAdmin = { + new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) + } + + def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = { + new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation) + } + } /** @@ -146,7 +154,7 @@ class TestKafkaSystemAdmin { import TestKafkaSystemAdmin._ // Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated - val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) + val systemAdmin = createSystemAdmin @Test def testShouldAssembleMetadata {
