Repository: samza Updated Branches: refs/heads/master 668a952ac -> 63ccf5eb1
SAMZA-944 - Broadcast stream is not added properly in the prioritized tiers in the DefaultChooser Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/63ccf5eb Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/63ccf5eb Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/63ccf5eb Branch: refs/heads/master Commit: 63ccf5eb14f07840a61fdb8b44d2750019e56181 Parents: 668a952 Author: Jacob Maes <[email protected]> Authored: Thu May 5 10:30:33 2016 -0700 Committer: Navina Ramesh <[email protected]> Committed: Thu May 5 10:30:33 2016 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 2 + .../samza/config/DefaultChooserConfig.java | 85 ++++++++++++++++++++ .../org/apache/samza/config/TaskConfigJava.java | 34 +++++++- .../samza/config/DefaultChooserConfig.scala | 50 ------------ .../samza/system/chooser/DefaultChooser.scala | 36 +++------ .../system/chooser/TestDefaultChooser.scala | 57 ++++++++++--- 6 files changed, 177 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 9afab88..8d77486 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -31,6 +31,8 @@ <allow pkg="org.apache.log4j" /> <allow pkg="org.apache.kafka" /> <allow pkg="org.apache.commons" /> + <allow class="scala.collection.JavaConversions" /> + <allow class="scala.collection.JavaConverters" /> <subpackage name="config"> <allow class="org.apache.samza.SamzaException" /> http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java new file mode 100644 index 0000000..d242d14 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.config; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.samza.system.SystemStream; + + +/** + * A convenience class for fetching configs related to the {@link org.apache.samza.system.chooser.DefaultChooser} + */ +public class DefaultChooserConfig extends MapConfig { + public static final String BOOTSTRAP = StreamConfig.STREAM_PREFIX() + "samza.bootstrap"; + public static final String PRIORITY = StreamConfig.STREAM_PREFIX() + "samza.priority"; + public static final String BATCH_SIZE = "task.consumer.batch.size"; + + private final TaskConfigJava taskConfigJava; + + public DefaultChooserConfig(Config config) { + super(config); + taskConfigJava = new TaskConfigJava(config); + } + + /** + * @return the configured batch size, or 0 if it was not configured. + */ + public int getChooserBatchSize() { + return getInt(BATCH_SIZE, 0); + } + + /** + * @return the set of SystemStreams which were configured as bootstrap streams. + */ + public Set<SystemStream> getBootstrapStreams() { + Set<SystemStream> bootstrapInputs = new HashSet<>(); + Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams(); + for (SystemStream systemStream : allInputs) { + if (getBoolean(String.format(BOOTSTRAP, systemStream.getSystem(), systemStream.getStream()), false)) { + bootstrapInputs.add(systemStream); + } + } + return Collections.unmodifiableSet(bootstrapInputs); + } + + /** + * Gets the priority of every SystemStream for which the priority + * was explicitly configured with a value >=0. + * + * @return the explicitly-configured stream priorities as a map from + * SystemStream to the configured priority value. Streams that + * were not explicitly configured with a priority are not returned. + */ + public Map<SystemStream, Integer> getPriorityStreams() { + Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams(); + + Map<SystemStream, Integer> priorityStreams = new HashMap<>(); + for (SystemStream systemStream : allInputs) { + int priority = getInt(String.format(PRIORITY, systemStream.getSystem(), systemStream.getStream()), -1); + if (priority >= 0) { + priorityStreams.put(systemStream, priority); + } + } + return Collections.unmodifiableMap(priorityStreams); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java index 8acb6ca..021d42a 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java @@ -19,6 +19,7 @@ package org.apache.samza.config; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -30,6 +31,8 @@ import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.collection.JavaConversions; + public class TaskConfigJava extends MapConfig { // broadcast streams consumed by all tasks. e.g. kafka.foo#1 @@ -52,7 +55,7 @@ public class TaskConfigJava extends MapConfig { */ public Set<SystemStreamPartition> getBroadcastSystemStreamPartitions() { HashSet<SystemStreamPartition> systemStreamPartitionSet = new HashSet<SystemStreamPartition>(); - List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS); + List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS, Collections.<String>emptyList()); for (String systemStreamPartition : systemStreamPartitions) { int hashPosition = systemStreamPartition.indexOf("#"); @@ -85,4 +88,33 @@ public class TaskConfigJava extends MapConfig { } return systemStreamPartitionSet; } + + /** + * Get the SystemStreams for the configured broadcast streams. + * + * @return the set of SystemStreams for which there are broadcast stream SSPs configured. + */ + public Set<SystemStream> getBroadcastSystemStreams() { + Set<SystemStream> broadcastSS = new HashSet<>(); + Set<SystemStreamPartition> broadcastSSPs = getBroadcastSystemStreamPartitions(); + for (SystemStreamPartition bssp : broadcastSSPs) { + broadcastSS.add(bssp.getSystemStream()); + } + return Collections.unmodifiableSet(broadcastSS); + } + + /** + * Get the SystemStreams for the configured input and broadcast streams. + * + * @return the set of SystemStreams for both standard inputs and broadcast stream inputs. + */ + public Set<SystemStream> getAllInputStreams() { + Set<SystemStream> allInputSS = new HashSet<>(); + + TaskConfig taskConfig = TaskConfig.Config2Task(this); + allInputSS.addAll(JavaConversions.asJavaSet(taskConfig.getInputStreams())); + allInputSS.addAll(getBroadcastSystemStreams()); + + return Collections.unmodifiableSet(allInputSS); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala deleted file mode 100644 index 4224393..0000000 --- a/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.config - -import org.apache.samza.system.SystemStream -import TaskConfig._ - -object DefaultChooserConfig { - val BOOTSTRAP = StreamConfig.STREAM_PREFIX + "samza.bootstrap" - val PRIORITY = StreamConfig.STREAM_PREFIX + "samza.priority" - val BATCH_SIZE = "task.consumer.batch.size" - - implicit def Config2DefaultChooser(config: Config) = new DefaultChooserConfig(config) -} - -class DefaultChooserConfig(config: Config) extends ScalaMapConfig(config) { - import DefaultChooserConfig._ - - def getChooserBatchSize = getOption(BATCH_SIZE) - - def getBootstrapStreams = config - .getInputStreams - .map(systemStream => (systemStream, getOrElse(BOOTSTRAP format (systemStream.getSystem, systemStream.getStream), "false").equals("true"))) - .filter(_._2.equals(true)) - .map(_._1) - - def getPriorityStreams = config - .getInputStreams - .map(systemStream => (systemStream, getOrElse(PRIORITY format (systemStream.getSystem, systemStream.getStream), "-1").toInt)) - .filter(_._2 >= 0) - .toMap -} - http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala index 95bd188..b433713 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala @@ -19,50 +19,40 @@ package org.apache.samza.system.chooser -import scala.collection.JavaConversions._ import org.apache.samza.SamzaException -import org.apache.samza.config.Config -import org.apache.samza.config.SystemConfig._ -import org.apache.samza.config.DefaultChooserConfig._ -import org.apache.samza.config.TaskConfig._ -import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.system.SystemFactory -import org.apache.samza.system.SystemStream -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util.Util -import org.apache.samza.system.SystemAdmin -import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.system.SystemStreamMetadata +import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfigJava} +import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap} +import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition} import org.apache.samza.util.Logging +import scala.collection.JavaConverters._ + + object DefaultChooser extends Logging { def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry) = { - val batchSize = config.getChooserBatchSize match { - case Some(batchSize) => Some(batchSize.toInt) - case _ => None - } + val chooserConfig = new DefaultChooserConfig(config) + val batchSize = if (chooserConfig.getChooserBatchSize > 0) Some(chooserConfig.getChooserBatchSize) else None debug("Got batch size: %s" format batchSize) // Normal streams default to priority 0. - val defaultPrioritizedStreams = config - .getInputStreams + val defaultPrioritizedStreams = new TaskConfigJava(config) + .getAllInputStreams.asScala .map((_, 0)) .toMap debug("Got default priority streams: %s" format defaultPrioritizedStreams) // Bootstrap streams default to Int.MaxValue priority. - val prioritizedBootstrapStreams = config - .getBootstrapStreams + val prioritizedBootstrapStreams = chooserConfig + .getBootstrapStreams.asScala .map((_, Int.MaxValue)) .toMap debug("Got bootstrap priority streams: %s" format prioritizedBootstrapStreams) // Explicitly prioritized streams are set to whatever they were configured to. - val prioritizedStreams = config.getPriorityStreams + val prioritizedStreams = chooserConfig.getPriorityStreams.asScala.mapValues(_.asInstanceOf[Int]) debug("Got prioritized streams: %s" format prioritizedStreams) http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala index 0909956..7fb70b2 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala @@ -19,17 +19,13 @@ package org.apache.samza.system.chooser -import org.junit.Assert._ -import org.junit.Test -import org.apache.samza.util.BlockingEnvelopeMap -import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.system.SystemStreamPartition import org.apache.samza.Partition -import org.apache.samza.config.MapConfig -import org.apache.samza.config.DefaultChooserConfig -import org.apache.samza.system.SystemStream -import org.apache.samza.system.SystemStreamMetadata +import org.apache.samza.config.{DefaultChooserConfig, MapConfig} import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition} +import org.apache.samza.util.BlockingEnvelopeMap +import org.junit.Assert._ +import org.junit.Test import scala.collection.JavaConversions._ @@ -141,11 +137,10 @@ class TestDefaultChooser { @Test def testBootstrapConfig { - import DefaultChooserConfig.Config2DefaultChooser val configMap = Map( "task.inputs" -> "kafka.foo,kafka.bar-baz", "systems.kafka.streams.bar-baz.samza.bootstrap" -> "true") - val config = new MapConfig(configMap) + val config = new DefaultChooserConfig(new MapConfig(configMap)) val bootstrapStreams = config.getBootstrapStreams assertEquals(1, bootstrapStreams.size) assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "bar-baz"))) @@ -153,15 +148,51 @@ class TestDefaultChooser { @Test def testPriorityConfig { - import DefaultChooserConfig.Config2DefaultChooser val configMap = Map( "task.inputs" -> "kafka.foo,kafka.bar-baz", "systems.kafka.streams.bar-baz.samza.priority" -> "3") - val config = new MapConfig(configMap) + val config = new DefaultChooserConfig(new MapConfig(configMap)) val priorityStreams = config.getPriorityStreams assertEquals(1, priorityStreams.size) assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz"))) } + + @Test + def testBroadcastOnlyConfig { + val configMap = Map( + "task.broadcast.inputs" -> "kafka.foo#[1-2],kafka.bar-baz#5,kafka.fizz#0", + "systems.kafka.streams.bar-baz.samza.priority" -> "3", + "systems.kafka.streams.fizz.samza.bootstrap" -> "true") + val config = new DefaultChooserConfig(new MapConfig(configMap)) + val priorityStreams = config.getPriorityStreams + assertEquals(1, priorityStreams.size) + assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz"))) + + val bootstrapStreams = config.getBootstrapStreams + assertEquals(1, bootstrapStreams.size()) + assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "fizz"))) + } + + @Test + def testBroadcastAndStandardInputConfig { + val configMap = Map( + "task.broadcast.inputs" -> "kafka.foo#[1-2],kafka.bar-baz#5,kafka.fizz#0", + "task.inputs" -> "kafka.bootstrapTopic,kafka.priorityTopic,kafka.normalTopic", + "systems.kafka.streams.priorityTopic.samza.priority" -> "2", + "systems.kafka.streams.bar-baz.samza.priority" -> "3", + "systems.kafka.streams.bootstrapTopic.samza.bootstrap" -> "true", + "systems.kafka.streams.fizz.samza.bootstrap" -> "true") + val config = new DefaultChooserConfig(new MapConfig(configMap)) + val priorityStreams = config.getPriorityStreams + assertEquals(2, priorityStreams.size) + assertEquals(2, priorityStreams(new SystemStream("kafka", "priorityTopic"))) + assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz"))) + + val bootstrapStreams = config.getBootstrapStreams + assertEquals(2, bootstrapStreams.size()) + assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "bootstrapTopic"))) + assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "fizz"))) + } } class MockBlockingEnvelopeMap extends BlockingEnvelopeMap {
