SAMZA-1415: Add clearStream API in SystemAdmin and remove deprecated APIs. The patch does the following:
1) Add a clearStream() API in SystemAdmin. Currently it is only supported in Kafka, and only when the broker is configured with delete.topic.enable=true. 2) Remove the deprecated APIs, including createChangelogStream(), validateChangelogStream() and createCoordinatorStream(). Author: Xinyu Liu <[email protected]> Reviewers: Jake Maes <[email protected]> Closes #292 from xinyuiscool/SAMZA-1415 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/79200c73 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/79200c73 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/79200c73 Branch: refs/heads/master Commit: 79200c73509727abba2d6eb68cfc45ac1d842d35 Parents: 23bfaa8 Author: Xinyu Liu <[email protected]> Authored: Thu Sep 7 16:49:20 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Thu Sep 7 16:49:20 2017 -0700 ---------------------------------------------------------------------- .../org/apache/samza/system/StreamSpec.java | 22 +++ .../org/apache/samza/system/SystemAdmin.java | 42 ++---- ...inglePartitionWithoutOffsetsSystemAdmin.java | 16 -- .../samza/coordinator/JobModelManager.scala | 8 +- .../scala/org/apache/samza/job/JobRunner.scala | 9 +- .../samza/storage/TaskStorageManager.scala | 12 +- .../MockCoordinatorStreamSystemFactory.java | 13 +- .../samza/execution/TestExecutionPlanner.java | 15 -- .../samza/checkpoint/TestOffsetManager.scala | 12 -- .../samza/container/TestTaskInstance.scala | 3 - .../samza/coordinator/TestJobCoordinator.scala | 12 -- .../samza/storage/TestTaskStorageManager.scala | 6 +- .../elasticsearch/ElasticsearchSystemAdmin.java | 15 -- .../samza/system/hdfs/HdfsSystemAdmin.java | 15 -- .../samza/system/kafka/KafkaSystemAdmin.scala | 93 +++++++----- .../system/kafka/TestKafkaSystemAdminJava.java | 151 ++++++++++--------- .../system/kafka/TestKafkaSystemAdmin.scala | 5 +- .../samza/system/mock/MockSystemAdmin.java | 15 -- .../samza/test/util/SimpleSystemAdmin.java | 15 -- 
.../apache/samza/job/yarn/MockSystemAdmin.scala | 12 -- 20 files changed, 192 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java index 49531dd..384fecc 100644 --- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java +++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java @@ -37,6 +37,12 @@ public class StreamSpec { private static final int DEFAULT_PARTITION_COUNT = 1; + // Internal changelog stream id. It is used for creating changelog StreamSpec. + private static final String CHANGELOG_STREAM_ID = "samza-internal-changelog-stream-id"; + + // Internal coordinator stream id. It is used for creating coordinator StreamSpec. + private static final String COORDINATOR_STREAM_ID = "samza-internal-coordinator-stream-id"; + /** * Unique identifier for the stream in a Samza application. * This identifier is used as a key for stream properties in the @@ -200,6 +206,14 @@ public class StreamSpec { return new SystemStream(systemName, physicalName); } + public boolean isChangeLogStream() { + return id.equals(CHANGELOG_STREAM_ID); + } + + public boolean isCoordinatorStream() { + return id.equals(COORDINATOR_STREAM_ID); + } + private void validateLogicalIdentifier(String identifierName, String identifierValue) { if (identifierValue == null || !identifierValue.matches("[A-Za-z0-9_-]+")) { throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. 
It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue)); @@ -220,4 +234,12 @@ public class StreamSpec { public int hashCode() { return id.hashCode(); } + + public static StreamSpec createChangeLogStreamSpec(String physicalName, String systemName, int partitionCount) { + return new StreamSpec(CHANGELOG_STREAM_ID, physicalName, systemName, partitionCount); + } + + public static StreamSpec createCoordinatorStreamSpec(String physicalName, String systemName) { + return new StreamSpec(COORDINATOR_STREAM_ID, physicalName, systemName, 1); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index b180712..e765540 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -50,38 +50,6 @@ public interface SystemAdmin { Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames); /** - * An API to create a change log stream - * - * @param streamName - * The name of the stream to be created in the underlying stream - * @param numOfPartitions - * The number of partitions in the changelog stream - * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)} - */ - void createChangelogStream(String streamName, int numOfPartitions); - - /** - * Validates change log stream - * - * @param streamName - * The name of the stream to be created in the underlying stream - * @param numOfPartitions - * The number of partitions in the changelog stream - * @deprecated since 0.12.1, use {@link #validateStream(StreamSpec)} - */ - void validateChangelogStream(String streamName, int numOfPartitions); - - /** - * Create a stream for the job coordinator. 
If the stream already exists, this - * call should simply return. - * - * @param streamName - * The name of the coordinator stream to create. - * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)} - */ - void createCoordinatorStream(String streamName); - - /** * Compare the two offsets. -1, 0, +1 means offset1 < offset2, * offset1 == offset2 and offset1 > offset2 respectively. Return * null if those two offsets are not comparable @@ -114,4 +82,14 @@ public interface SystemAdmin { default void validateStream(StreamSpec streamSpec) throws StreamValidationException { throw new UnsupportedOperationException(); } + + /** + * Clear the stream described by the spec. + * @param streamSpec The spec for the physical stream on the system. + * @return {@code true} if the stream was successfully cleared. + * {@code false} if clearing stream failed. + */ + default boolean clearStream(StreamSpec streamSpec) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java index 2157e69..49f7da0 100644 --- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Set; import org.apache.samza.Partition; -import org.apache.samza.SamzaException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata; @@ -56,16 +55,6 @@ public class 
SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin { } @Override - public void createChangelogStream(String streamName, int numOfPartitions) { - throw new SamzaException("Method not implemented"); - } - - @Override - public void validateChangelogStream(String streamName, int numOfPartitions) { - throw new SamzaException("Method not implemented"); - } - - @Override public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) { Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>(); @@ -77,11 +66,6 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin { } @Override - public void createCoordinatorStream(String streamName) { - throw new UnsupportedOperationException("Single partition admin can't create coordinator streams."); - } - - @Override public Integer offsetComparator(String offset1, String offset2) { return null; } http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 6319173..42bedec 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -304,7 +304,13 @@ object JobModelManager extends Logging { .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." 
format systemStream.getSystem)) ).getAdmin(systemStream.getSystem, config) - systemAdmin.createChangelogStream(systemStream.getStream, changeLogPartitions) + val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogPartitions) + if (systemAdmin.createStream(changelogSpec)) { + info("Created changelog stream %s." format systemStream.getStream) + } else { + info("Changelog stream %s already exists." format systemStream.getStream) + } + systemAdmin.validateStream(changelogSpec) } } http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index f34db99..0e973e9 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -29,6 +29,7 @@ import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish} import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine import org.apache.samza.runtime.ApplicationRunnerOperation +import org.apache.samza.system.StreamSpec import org.apache.samza.util.{Logging, Util} import scala.collection.JavaConverters._ @@ -85,7 +86,13 @@ class JobRunner(config: Config) extends Logging { info("Creating coordinator stream") val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config) val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config) - systemAdmin.createCoordinatorStream(coordinatorSystemStream.getStream) + val streamName = coordinatorSystemStream.getStream + val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem) + if 
(systemAdmin.createStream(coordinatorSpec)) { + info("Created coordinator stream %s." format streamName) + } else { + info("Coordinator stream %s already exists." format streamName) + } if (resetJobConfig) { info("Storing config in coordinator stream.") http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index 977ac5b..0879e9a 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -25,14 +25,7 @@ import java.util import org.apache.samza.config.StorageConfig import org.apache.samza.{Partition, SamzaException} import org.apache.samza.container.TaskName -import org.apache.samza.system.StreamMetadataCache -import org.apache.samza.system.SystemAdmin -import org.apache.samza.system.SystemConsumer -import org.apache.samza.system.SystemStream -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.system.SystemStreamPartitionIterator -import org.apache.samza.system.ExtendedSystemAdmin -import org.apache.samza.system.SystemStreamMetadata +import org.apache.samza.system._ import org.apache.samza.util.Logging import org.apache.samza.util.Util import org.apache.samza.util.Clock @@ -218,7 +211,8 @@ class TaskStorageManager( val systemAdmin = systemAdmins .getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream)) - systemAdmin.validateChangelogStream(systemStream.getStream, changeLogStreamPartitions) + val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions) + 
systemAdmin.validateStream(changelogSpec) } val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet) http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java index 662c737..6413413 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java @@ -25,14 +25,7 @@ import org.apache.samza.config.ConfigException; import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.serializers.JsonSerde; -import org.apache.samza.system.SystemAdmin; -import org.apache.samza.system.SystemConsumer; -import org.apache.samza.system.SystemFactory; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.system.SystemProducer; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.*; import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin; import org.apache.samza.util.Util; @@ -208,8 +201,10 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { } public static final class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin { - public void createCoordinatorStream(String streamName) { + @Override + public boolean createStream(StreamSpec streamSpec) { // Do nothing. 
+ return true; } } http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index 2c8f682..c4fd8f7 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -93,21 +93,6 @@ public class TestExecutionPlanner { } @Override - public void createChangelogStream(String streamName, int numOfPartitions) { - - } - - @Override - public void validateChangelogStream(String streamName, int numOfPartitions) { - - } - - @Override - public void createCoordinatorStream(String streamName) { - - } - - @Override public Integer offsetComparator(String offset1, String offset2) { return null; } http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index abfc63f..48504a9 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -340,18 +340,6 @@ class TestOffsetManager { def getSystemStreamMetadata(streamNames: java.util.Set[String]) = Map[String, SystemStreamMetadata]().asJava - override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) { - new UnsupportedOperationException("Method not implemented.") - } - - override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: 
Int) { - new UnsupportedOperationException("Method not implemented.") - } - - override def createCoordinatorStream(streamName: String) { - new UnsupportedOperationException("Method not implemented.") - } - override def offsetComparator(offset1: String, offset2: String) = null } } http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 9025077..dcb06d3 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -398,9 +398,6 @@ class TestTaskInstance { class MockSystemAdmin extends SystemAdmin { override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = { offsets } override def getSystemStreamMetadata(streamNames: java.util.Set[String]) = null - override def createCoordinatorStream(stream: String) = {} - override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {} - override def validateChangelogStream(topicName: String, numOfPartitions: Int) = {} override def offsetComparator(offset1: String, offset2: String) = { offset1.toLong compare offset2.toLong http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index 0b6dd8b..e6b148b 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -318,18 +318,6 @@ class MockSystemAdmin extends ExtendedSystemAdmin { Map(streamNames.asScala.toList.head -> new SystemStreamMetadata("foo", partitionMetadata.asJava)).asJava } - override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) { - new UnsupportedOperationException("Method not implemented.") - } - - override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) { - new UnsupportedOperationException("Method not implemented.") - } - - override def createCoordinatorStream(streamName: String) { - new UnsupportedOperationException("Method not implemented.") - } - override def offsetComparator(offset1: String, offset2: String) = null override def getSystemStreamPartitionCounts(streamNames: util.Set[String], http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala index 2495baf..ea4d37b 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala @@ -95,7 +95,8 @@ class TestTaskStorageManager extends MockitoSugar { val mockStreamMetadataCache = mock[StreamMetadataCache] val mockSystemConsumer = mock[SystemConsumer] val mockSystemAdmin = mock[SystemAdmin] - doNothing().when(mockSystemAdmin).validateChangelogStream("testStream", 1) + val changelogSpec = StreamSpec.createChangeLogStreamSpec("testStream", "kafka", 1) + doNothing().when(mockSystemAdmin).validateStream(changelogSpec) var registerOffset = "0" when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] { override def 
answer(invocation: InvocationOnMock): Unit = { @@ -204,7 +205,8 @@ class TestTaskStorageManager extends MockitoSugar { // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin val mockStreamMetadataCache = mock[StreamMetadataCache] val mockSystemAdmin = mock[SystemAdmin] - doNothing().when(mockSystemAdmin).validateChangelogStream("testStream", 1) + val changelogSpec = StreamSpec.createChangeLogStreamSpec("testStream", "kafka", 1) + doNothing().when(mockSystemAdmin).validateStream(changelogSpec) val mockSystemConsumer = mock[SystemConsumer] when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] { http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java index 446534a..3cadce0 100644 --- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java +++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java @@ -54,21 +54,6 @@ public class ElasticsearchSystemAdmin implements SystemAdmin { } @Override - public void createChangelogStream(String stream, int foo) { - throw new UnsupportedOperationException(); - } - - @Override - public void createCoordinatorStream(String streamName) { - throw new UnsupportedOperationException(); - } - - @Override - public void validateChangelogStream(String streamName, int numOfPartitions) { - throw new UnsupportedOperationException(); - } - - @Override public Integer offsetComparator(String offset1, String offset2) { throw new UnsupportedOperationException(); } 
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java index f5b05fb..9251db0 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java @@ -201,21 +201,6 @@ public class HdfsSystemAdmin implements SystemAdmin { return systemStreamMetadataMap; } - @Override - public void createChangelogStream(String streamName, int numOfPartitions) { - throw new UnsupportedOperationException("HDFS doesn't support change log stream."); - } - - @Override - public void validateChangelogStream(String streamName, int numOfPartitions) { - throw new UnsupportedOperationException("HDFS doesn't support change log stream."); - } - - @Override - public void createCoordinatorStream(String streamName) { - throw new UnsupportedOperationException("HDFS doesn't support coordinator stream."); - } - /** * Compare two multi-file style offset. A multi-file style offset consist of both * the file index as well as the offset within that file. 
And the format of it is: http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index af77d5b..6e582e9 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -37,9 +37,8 @@ import scala.collection.JavaConverters._ object KafkaSystemAdmin extends Logging { - // Use a dummy string for the stream id. The physical name and partition count are all that matter for changelog creation, so the dummy string should not be used. - // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317 - val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id" + + val CLEAR_STREAM_RETRIES = 3 /** * A helper method that takes oldest, newest, and upcoming offsets for each @@ -328,23 +327,11 @@ class KafkaSystemAdmin( offset } - override def createCoordinatorStream(streamName: String) { - info("Attempting to create coordinator stream %s." format streamName) - - val streamSpec = new KafkaStreamSpec(streamName, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties) - - if (createStream(streamSpec)) { - info("Created coordinator stream %s." format streamName) - } else { - info("Coordinator stream %s already exists." format streamName) - } - } - /** * Helper method to use topic metadata cache when fetching metadata, so we * don't hammer Kafka more than we need to. 
*/ - protected def getTopicMetadata(topics: Set[String]) = { + def getTopicMetadata(topics: Set[String]) = { new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout) .getTopicInfo(topics) } @@ -415,7 +402,7 @@ class KafkaSystemAdmin( * @inheritdoc */ override def createStream(spec: StreamSpec): Boolean = { - val kSpec = KafkaStreamSpec.fromSpec(spec); + val kSpec = toKafkaSpec(spec) var streamCreated = false new ExponentialSleepStrategy(initialDelayMs = 500).run( @@ -451,6 +438,23 @@ class KafkaSystemAdmin( } /** + * Converts a StreamSpec into a KafakStreamSpec. Special handling for coordinator and changelog stream. + * @param spec a StreamSpec object + * @return KafkaStreamSpec object + */ + def toKafkaSpec(spec: StreamSpec): KafkaStreamSpec = { + if (spec.isChangeLogStream) { + val topicName = spec.getPhysicalName + val topicMeta = topicMetaInformation.getOrElse(topicName, throw new StreamValidationException("Unable to find topic information for topic " + topicName)) + new KafkaStreamSpec(spec.getId, topicName, systemName, spec.getPartitionCount, topicMeta.replicationFactor, topicMeta.kafkaProps) + } else if (spec.isCoordinatorStream){ + new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties) + } else { + KafkaStreamSpec.fromSpec(spec) + } + } + + /** * @inheritdoc * * Validates a stream in Kafka. 
Should not be called before createStream(), @@ -491,32 +495,41 @@ class KafkaSystemAdmin( } /** - * Exception to be thrown when the change log stream creation or validation has failed - */ - class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) { - def this(s: String) = this(s, null) - } - - override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = { - val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName)) - val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps) + * @inheritdoc + * + * Delete a stream in Kafka. Deleting topics works only when the broker is configured with "delete.topic.enable=true". + * Otherwise it's a no-op. + */ + override def clearStream(spec: StreamSpec): Boolean = { + val kSpec = KafkaStreamSpec.fromSpec(spec) + var retries = CLEAR_STREAM_RETRIES + new ExponentialSleepStrategy().run( + loop => { + val zkClient = connectZk() + try { + AdminUtils.deleteTopic( + zkClient, + kSpec.getPhysicalName) + } finally { + zkClient.close + } - if (createStream(spec)) { - info("Created changelog stream %s." format topicName) - } else { - info("Changelog stream %s already exists." format topicName) - } + loop.done + }, - validateStream(spec) - } + (exception, loop) => { + if (retries > 0) { + warn("Exception while trying to delete topic %s: %s. Retrying." format (spec.getPhysicalName, exception)) + retries -= 1 + } else { + warn("Fail to delete topic %s: %s" format (spec.getPhysicalName, exception)) + loop.done + throw exception + } + }) - /** - * Validates a stream in Kafka. Should not be called before createStream(), - * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, is not read-only and - * will auto-create a new topic. 
- */ - override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = { - validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions)) + val topicMetadata = getTopicMetadata(Set(kSpec.getPhysicalName)).get(kSpec.getPhysicalName).get + topicMetadata.partitionsMetadata.isEmpty } /** http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java index ce59b40..51af518 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java @@ -19,15 +19,16 @@ package org.apache.samza.system.kafka; +import java.util.*; import java.util.HashMap; import java.util.Map; -import java.util.Properties; + +import kafka.api.TopicMetadata; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.StreamValidationException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.util.Util; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static org.junit.Assert.*; @@ -39,53 +40,48 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { @Test - public void testCreateCoordinatorStreamDelegatesToCreateStream() { + public void testCreateCoordinatorStream() { KafkaSystemAdmin systemAdmin = createSystemAdmin();//coordProps, 3, new scala.collection.immutable.HashMap<>(), 1000); SystemAdmin admin = Mockito.spy(systemAdmin); - StreamSpec spec = new StreamSpec("testId", "testCoordinatorStream", "testSystem"); + StreamSpec spec = 
StreamSpec.createCoordinatorStreamSpec("testCoordinatorStream", "testSystem"); - admin.createCoordinatorStream(spec.getPhysicalName()); + admin.createStream(spec); admin.validateStream(spec); Mockito.verify(admin).createStream(Mockito.any()); } @Test - public void testCreateChangelogStreamDelegatesToCreateStream() { - final String STREAM = "testChangeLogStream"; - final int PARTITIONS = 12; - final int REP_FACTOR = 3; + public void testCreateCoordinatorStreamWithSpecialCharsInTopicName() { + final String STREAM = "test.coordinator_test.Stream"; Properties coordProps = new Properties(); - Properties changeLogProps = new Properties(); - changeLogProps.setProperty("cleanup.policy", "compact"); - changeLogProps.setProperty("segment.bytes", "139"); Map<String, ChangelogInfo> changeLogMap = new HashMap<>(); - changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps)); - SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap))); - StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM(), PARTITIONS); - admin.createChangelogStream(STREAM, PARTITIONS); + KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap))); + StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(STREAM, SYSTEM()); + + Mockito.doAnswer(invocationOnMock -> { + StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod(); + assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor + assertTrue(internalSpec.isCoordinatorStream()); + assertEquals(SYSTEM(), internalSpec.getSystemName()); + assertEquals(STREAM, internalSpec.getPhysicalName()); + assertEquals(1, internalSpec.getPartitionCount()); + + return internalSpec; + }).when(admin).toKafkaSpec(Mockito.any()); + + admin.createStream(spec); admin.validateStream(spec); - ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class); - 
Mockito.verify(admin).createStream(specCaptor.capture()); - - StreamSpec internalSpec = specCaptor.getValue(); - assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor - assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId()); - assertEquals(SYSTEM(), internalSpec.getSystemName()); - assertEquals(STREAM, internalSpec.getPhysicalName()); - assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor()); - assertEquals(PARTITIONS, internalSpec.getPartitionCount()); - assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties()); } @Test - public void testCreateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() { - final String STREAM = "test.Change_Log.Stream"; + public void testCreateChangelogStream() { + final String STREAM = "testChangeLogStream"; final int PARTITIONS = 12; - final int REP_FACTOR = 3; + final int REP_FACTOR = 1; Properties coordProps = new Properties(); Properties changeLogProps = new Properties(); @@ -94,60 +90,56 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { Map<String, ChangelogInfo> changeLogMap = new HashMap<>(); changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps)); - SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap))); - StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM(), PARTITIONS); - admin.createChangelogStream(STREAM, PARTITIONS); - admin.validateStream(spec); + KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap))); + StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS); - ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class); - Mockito.verify(admin).createStream(specCaptor.capture()); - - StreamSpec internalSpec = specCaptor.getValue(); - assertTrue(internalSpec instanceof 
KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor - assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId()); - assertEquals(SYSTEM(), internalSpec.getSystemName()); - assertEquals(STREAM, internalSpec.getPhysicalName()); - assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor()); - assertEquals(PARTITIONS, internalSpec.getPartitionCount()); - assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties()); - } + Mockito.doAnswer(invocationOnMock -> { + StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod(); + assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor + assertTrue(internalSpec.isChangeLogStream()); + assertEquals(SYSTEM(), internalSpec.getSystemName()); + assertEquals(STREAM, internalSpec.getPhysicalName()); + assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor()); + assertEquals(PARTITIONS, internalSpec.getPartitionCount()); + assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties()); - @Test - public void testValidateChangelogStreamDelegatesToValidateStream() { - final String STREAM = "testChangeLogValidate"; - Properties coordProps = new Properties(); - Map<String, ChangelogInfo> changeLogMap = new HashMap<>(); - changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties())); - - KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)); - SystemAdmin admin = Mockito.spy(systemAdmin); - StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12); + return internalSpec; + }).when(admin).toKafkaSpec(Mockito.any()); - admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount()); + admin.createStream(spec); admin.validateStream(spec); - admin.validateChangelogStream(spec.getPhysicalName(), spec.getPartitionCount()); - - Mockito.verify(admin).createStream(Mockito.any()); - 
Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any()); } @Test - public void testValidateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() { - final String STREAM = "test.Change_Log.Validate"; + public void testCreateChangelogStreamWithSpecialCharsInTopicName() { + final String STREAM = "test.Change_Log.Stream"; + final int PARTITIONS = 12; + final int REP_FACTOR = 1; + Properties coordProps = new Properties(); + Properties changeLogProps = new Properties(); + changeLogProps.setProperty("cleanup.policy", "compact"); + changeLogProps.setProperty("segment.bytes", "139"); Map<String, ChangelogInfo> changeLogMap = new HashMap<>(); - changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties())); - - KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)); - SystemAdmin admin = Mockito.spy(systemAdmin); - StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12); + changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps)); - admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount()); + KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap))); + StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS); + Mockito.doAnswer(invocationOnMock -> { + StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod(); + assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor + assertTrue(internalSpec.isChangeLogStream()); + assertEquals(SYSTEM(), internalSpec.getSystemName()); + assertEquals(STREAM, internalSpec.getPhysicalName()); + assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor()); + assertEquals(PARTITIONS, internalSpec.getPartitionCount()); + assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties()); + + return internalSpec; + 
}).when(admin).toKafkaSpec(Mockito.any()); + + admin.createStream(spec); admin.validateStream(spec); - admin.validateChangelogStream(STREAM, spec.getPartitionCount()); // Should not throw - - Mockito.verify(admin).createStream(Mockito.any()); - Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any()); } @Test @@ -191,4 +183,17 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { admin.validateStream(spec2); } + + @Test + public void testClearStream() { + KafkaSystemAdmin admin = this.basicSystemAdmin; + StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8); + + assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec)); + assertTrue(admin.clearStream(spec)); + + scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName()); + scala.collection.immutable.Map<String, TopicMetadata> metadata = admin.getTopicMetadata(topic); + assertTrue(metadata.get(spec.getPhysicalName()).get().partitionsMetadata().isEmpty()); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index 19f3903..6fb03a1 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.security.JaasUtils import org.apache.samza.Partition import org.apache.samza.config.KafkaProducerConfig import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import org.apache.samza.system.{SystemStreamMetadata, 
SystemStreamPartition} +import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition} import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, TopicMetadataStore} import org.junit.Assert._ import org.junit._ @@ -283,7 +283,8 @@ class TestKafkaSystemAdmin { val topic = "test-coordinator-stream" val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3) - systemAdmin.createCoordinatorStream(topic) + val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka") + systemAdmin.createStream(spec) validateTopic(topic, 1) val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo) assertTrue(topicMetadataMap.contains(topic)) http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java index a05f89a..322b367 100644 --- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java +++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java @@ -66,21 +66,6 @@ public class MockSystemAdmin implements SystemAdmin { } @Override - public void createChangelogStream(String streamName, int numOfPartitions) { - throw new UnsupportedOperationException("Method not implemented"); - } - - @Override - public void validateChangelogStream(String streamName, int numOfPartitions) { - throw new UnsupportedOperationException("Method not implemented"); - } - - @Override - public void createCoordinatorStream(String streamName) { - throw new UnsupportedOperationException("Method not implemented."); - } - - @Override public Integer offsetComparator(String 
offset1, String offset2) { return null; } http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java index 41f01c5..8890a2f 100644 --- a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java +++ b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java @@ -63,21 +63,6 @@ public class SimpleSystemAdmin implements SystemAdmin { } @Override - public void createChangelogStream(String streamName, int numOfPartitions) { - throw new UnsupportedOperationException(); - } - - @Override - public void validateChangelogStream(String streamName, int numOfPartitions) { - throw new UnsupportedOperationException(); - } - - @Override - public void createCoordinatorStream(String streamName) { - throw new UnsupportedOperationException(); - } - - @Override public Integer offsetComparator(String offset1, String offset2) { if (offset1 == null) { return offset2 == null ? 
0 : -1; http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala index c320a97..5650d4b 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala @@ -38,17 +38,5 @@ class MockSystemAdmin(numTasks: Int) extends SystemAdmin { }).toMap.asJava } - override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) { - new UnsupportedOperationException("Method not implemented.") - } - - override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) { - new UnsupportedOperationException("Method not implemented.") - } - - override def createCoordinatorStream(streamName: String) { - new UnsupportedOperationException("Method not implemented.") - } - override def offsetComparator(offset1: String, offset2: String) = null }
