Repository: samza Updated Branches: refs/heads/master 8f7f56744 -> 06702af8f
SAMZA-1387: Unable to Start Samza App Because Regex Check Author: Jacob Maes <[email protected]> Reviewers: Fred Ji <[email protected]> Closes #266 from jmakes/samza-1387 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/06702af8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/06702af8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/06702af8 Branch: refs/heads/master Commit: 06702af8fda1d016ae55461c404b55b84b20ffd2 Parents: 8f7f567 Author: Jacob Maes <[email protected]> Authored: Fri Aug 11 09:28:20 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Fri Aug 11 09:28:20 2017 -0700 ---------------------------------------------------------------------- .../samza/system/kafka/KafkaSystemAdmin.scala | 7 ++++--- .../system/kafka/TestKafkaSystemAdminJava.java | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/06702af8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index af77d5b..1e59b61 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -38,8 +38,9 @@ import scala.collection.JavaConverters._ object KafkaSystemAdmin extends Logging { // Use a dummy string for the stream id. The physical name and partition count are all that matter for changelog creation, so the dummy string should not be used. - // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317 + // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317 and 1387 val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id" + val COORDINATOR_STREAMID = "unused-temp-coordinator-stream-id" /** * A helper method that takes oldest, newest, and upcoming offsets for each @@ -331,7 +332,7 @@ class KafkaSystemAdmin( override def createCoordinatorStream(streamName: String) { info("Attempting to create coordinator stream %s." format streamName) - val streamSpec = new KafkaStreamSpec(streamName, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties) + val streamSpec = new KafkaStreamSpec(COORDINATOR_STREAMID, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties) if (createStream(streamSpec)) { info("Created coordinator stream %s." format streamName) @@ -496,7 +497,7 @@ class KafkaSystemAdmin( class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) { def this(s: String) = this(s, null) } - + override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = { val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName)) val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps) http://git-wip-us.apache.org/repos/asf/samza/blob/06702af8/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java index ce59b40..33c4017 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java @@ -51,6 +51,25 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { } @Test + public void testCreateCoordinatorStreamDelegatesToCreateStream_specialCharsInTopicName() { + final String STREAM = "test.Coord_inator.Stream"; + + SystemAdmin admin = Mockito.spy(createSystemAdmin()); + StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM()); + admin.createCoordinatorStream(STREAM); + admin.validateStream(spec); + + ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class); + Mockito.verify(admin).createStream(specCaptor.capture()); + + StreamSpec internalSpec = specCaptor.getValue(); + assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor + assertEquals(KafkaSystemAdmin.COORDINATOR_STREAMID(), internalSpec.getId()); + assertEquals(SYSTEM(), internalSpec.getSystemName()); + assertEquals(STREAM, internalSpec.getPhysicalName()); + } + + @Test public void testCreateChangelogStreamDelegatesToCreateStream() { final String STREAM = "testChangeLogStream"; final int PARTITIONS = 12;
