Repository: samza Updated Branches: refs/heads/master 9f60e96b1 -> 8723c3f79
SAMZA-1943 Remove ExtendedSystemAdmin and deprecated getNewestOffsets method. Author: Boris S <[email protected]> Author: Boris S <[email protected]> Author: Boris Shkolnik <[email protected]> Reviewers: Bharath Kumarasubramanian <[email protected]> Closes #782 from sborya/removeExtendedSystemAdmin Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8723c3f7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8723c3f7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8723c3f7 Branch: refs/heads/master Commit: 8723c3f79f751870ebcece9b5618db009029d691 Parents: 9f60e96 Author: Boris S <[email protected]> Authored: Wed Oct 31 13:45:28 2018 -0700 Committer: Boris S <[email protected]> Committed: Wed Oct 31 13:45:28 2018 -0700 ---------------------------------------------------------------------- .../samza/system/ExtendedSystemAdmin.java | 37 ----------- .../org/apache/samza/system/SystemAdmin.java | 12 ++++ .../apache/samza/system/TestSystemAdmin.java | 2 +- .../samza/system/StreamMetadataCache.scala | 4 +- .../apache/samza/system/MockSystemFactory.java | 9 +-- .../samza/checkpoint/TestCheckpointTool.scala | 2 +- .../samza/system/kafka/KafkaSystemAdmin.java | 39 +----------- .../kafka_deprecated/KafkaSystemAdmin.scala | 66 +------------------- .../system/kafka/TestKafkaSystemAdmin.scala | 19 +++--- .../kafka_deprecated/TestKafkaSystemAdmin.scala | 41 ------------ 10 files changed, 32 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java deleted file mode 100644 index ba239dc..0000000 --- 
a/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.system; - -import java.util.Map; -import java.util.Set; - -/** - * Interface extends the more generic SystemAdmin interface - * TODO: Merge this interface method with SystemAdmin when we upgrade to JDK 1.8 - */ -public interface ExtendedSystemAdmin extends SystemAdmin { - Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL); - - /** - * Deprecated: Use {@link SystemAdmin#getSSPMetadata}, ideally combined with caching (i.e. SSPMetadataCache). 
- * Makes fewer offset requests than getSystemStreamMetadata - */ - @Deprecated - String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries); -} http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index 16f90e9..6ee7df2 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -144,4 +144,16 @@ public interface SystemAdmin { } + /** + * Get partition counts only. Should be more efficient than getSystemStreamMetadata, but if not implemented + * revert to getSystemStreamMetadata. + * @param streamNames set of streams to query. + * @param cacheTTL cacheTTL to use if caching the values. + * @return A map from stream name to SystemStreamMetadata for each stream + * requested in the parameter set. 
+ */ + default Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) { + return getSystemStreamMetadata(streamNames); + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-api/src/test/java/org/apache/samza/system/TestSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/system/TestSystemAdmin.java b/samza-api/src/test/java/org/apache/samza/system/TestSystemAdmin.java index 85245e3..a725ce1 100644 --- a/samza-api/src/test/java/org/apache/samza/system/TestSystemAdmin.java +++ b/samza-api/src/test/java/org/apache/samza/system/TestSystemAdmin.java @@ -112,5 +112,5 @@ public class TestSystemAdmin { * Looks like Mockito 1.x does not support using thenCallRealMethod with default methods for interfaces, but it works * to use this placeholder abstract class. */ - private abstract class MySystemAdmin implements ExtendedSystemAdmin { } + private abstract class MySystemAdmin implements SystemAdmin { } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala index 637858b..edffac7 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala @@ -62,8 +62,8 @@ class StreamMetadataCache ( .flatMap { case (systemName, systemStreams) => val systemAdmin = systemAdmins.getSystemAdmin(systemName) - val streamToMetadata = if (partitionsMetadataOnly && systemAdmin.isInstanceOf[ExtendedSystemAdmin]) { - 
systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream).asJava, cacheTTLms) + val streamToMetadata = if (partitionsMetadataOnly) { + systemAdmin.getSystemStreamPartitionCounts(systemStreams.map(_.getStream).asJava, cacheTTLms) } else { systemAdmin.getSystemStreamMetadata(systemStreams.map(_.getStream).asJava) } http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java index a9c57da..e3030ab 100644 --- a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java +++ b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java @@ -111,7 +111,7 @@ public class MockSystemFactory implements SystemFactory { } public SystemAdmin getAdmin(String systemName, Config config) { - return new ExtendedSystemAdmin() { + return new SystemAdmin() { @Override public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) { @@ -161,12 +161,7 @@ public class MockSystemFactory implements SystemFactory { public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) { return getSystemStreamMetadata(streamNames); } - - @Override - public String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries) { - return null; - } - + @Override public boolean createStream(StreamSpec streamSpec) { return true; http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala index bed013c..4bb7adf 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala @@ -82,7 +82,7 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar { ).asJava) TestCheckpointTool.checkpointManager = mock[CheckpointManager] TestCheckpointTool.systemAdmin = mock[SystemAdmin] - when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo").asJava)) + when(TestCheckpointTool.systemAdmin.getSystemStreamPartitionCounts(Set("foo").asJava, 0)) .thenReturn(Map("foo" -> metadata).asJava) when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn0)) .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234").asJava)) http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java index d2ceafb..16a2e67 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java @@ -43,9 +43,9 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.KafkaConfig; import org.apache.samza.config.SystemConfig; -import org.apache.samza.system.ExtendedSystemAdmin; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.StreamValidationException; +import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.ExponentialSleepStrategy; 
@@ -64,7 +64,7 @@ import scala.runtime.BoxedUnit; import static org.apache.samza.config.KafkaConsumerConfig.*; -public class KafkaSystemAdmin implements ExtendedSystemAdmin { +public class KafkaSystemAdmin implements SystemAdmin { private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemAdmin.class); // Default exponential sleep strategy values @@ -354,41 +354,6 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin { return result; } - @Override - public String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries) { - LOG.info("Fetching newest offset for: {}", ssp); - - ExponentialSleepStrategy strategy = new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER, - DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS); - - Function1<ExponentialSleepStrategy.RetryLoop, String> fetchNewestOffset = - new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, String>() { - @Override - public String apply(ExponentialSleepStrategy.RetryLoop loop) { - String result = fetchNewestOffset(ssp); - loop.done(); - return result; - } - }; - - String offset = strategy.run(fetchNewestOffset, - new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() { - @Override - public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) { - if (loop.sleepCount() < maxRetries) { - LOG.warn(String.format("Fetching newest offset for: %s threw an exception. 
Retrying.", ssp), exception); - } else { - LOG.error(String.format("Fetching newest offset for: %s threw an exception.", ssp), exception); - loop.done(); - throw new SamzaException("Exception while trying to get newest offset", exception); - } - return null; - } - }).get(); - - return offset; - } - /** * Convert TopicPartition to SystemStreamPartition * @param topicPartition the topic partition to be created http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala index e7ff749..f16e507 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala @@ -154,7 +154,7 @@ class KafkaSystemAdmin( /** * Whether deleteMessages() API can be used */ - deleteCommittedMessages: Boolean = false) extends ExtendedSystemAdmin with Logging { + deleteCommittedMessages: Boolean = false) extends SystemAdmin with Logging { import KafkaSystemAdmin._ @@ -303,70 +303,6 @@ class KafkaSystemAdmin( } /** - * Returns the newest offset for the specified SSP. - * This method is fast and targeted. It minimizes the number of kafka requests. - * It does not retry indefinitely if there is any failure. - * It returns null if the topic is empty. 
To get the offsets for *all* - * partitions, it would be more efficient to call getSystemStreamMetadata - */ - override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = { - debug("Fetching newest offset for: %s" format ssp) - var offset: String = null - var metadataTTL = Long.MaxValue // Trust the cache until we get an exception - var retries = maxRetries - new ExponentialSleepStrategy().run( - loop => { - val metadata = TopicMetadataCache.getTopicMetadata( - Set(ssp.getStream), - systemName, - getTopicMetadata, - metadataTTL) - debug("Got metadata for streams: %s" format metadata) - - val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata) - val topicAndPartition = new TopicAndPartition(ssp.getStream, ssp.getPartition.getPartitionId) - val broker = brokersToTopicPartitions.filter((e) => e._2.contains(topicAndPartition)).head._1 - - // Get oldest, newest, and upcoming offsets for each topic and partition. - debug("Fetching offset for %s:%s: %s" format (broker.host, broker.port, topicAndPartition)) - val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId) - try { - offset = getOffsets(consumer, Set(topicAndPartition), OffsetRequest.LatestTime).head._2 - - // Kafka's "latest" offset is always last message in stream's offset + - // 1, so get newest message in stream by subtracting one. this is safe - // even for key-deduplicated streams, since the last message will - // never be deduplicated. - if (offset.toLong <= 0) { - debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition) - offset = null - } else { - offset = (offset.toLong - 1).toString - } - } finally { - consumer.close - } - - debug("Got offset %s for %s." format(offset, ssp)) - loop.done - }, - - (exception, loop) => { - if (retries > 0) { - warn("Exception while trying to get offset for %s: %s. Retrying." 
format(ssp, exception)) - metadataTTL = 0L // Force metadata refresh - retries -= 1 - } else { - warn("Exception while trying to get offset for %s" format(ssp), exception) - loop.done - throw exception - } - }) - - offset - } - - /** * Helper method to use topic metadata cache when fetching metadata, so we * don't hammer Kafka more than we need to. */ http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index 1570363..095a1b0 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -21,6 +21,7 @@ package org.apache.samza.system.kafka +import com.google.common.collect.ImmutableSet import kafka.admin.AdminUtils import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector} import kafka.integration.KafkaServerTestHarness @@ -316,23 +317,25 @@ class TestKafkaSystemAdmin { val sspUnderTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4)) val otherSsp = new SystemStreamPartition("kafka", TOPIC2, new Partition(13)) - assertNull(systemAdmin.getNewestOffset(sspUnderTest, 3)) - assertNull(systemAdmin.getNewestOffset(otherSsp, 3)) + assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset) + assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset) // Add a new message to one of the partitions, and verify that it works as expected. 
assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 4, "key1".getBytes, "val1".getBytes)).get().offset().toString) - assertEquals("0", systemAdmin.getNewestOffset(sspUnderTest, 3)) - assertNull(systemAdmin.getNewestOffset(otherSsp, 3)) + + assertEquals("0", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset) + assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset) // Again assertEquals("1", producer.send(new ProducerRecord(TOPIC2, 4, "key2".getBytes, "val2".getBytes)).get().offset().toString) - assertEquals("1", systemAdmin.getNewestOffset(sspUnderTest, 3)) - assertNull(systemAdmin.getNewestOffset(otherSsp, 3)) + assertEquals("1", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset) + assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset) // Add a message to both partitions assertEquals("2", producer.send(new ProducerRecord(TOPIC2, 4, "key3".getBytes, "val3".getBytes)).get().offset().toString) assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 13, "key4".getBytes, "val4".getBytes)).get().offset().toString) - assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0)) - assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0)) + assertEquals("2", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset) + assertEquals("0", systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset) + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala index bf64c03..93dad09 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala @@ -307,45 +307,4 @@ class TestKafkaSystemAdmin { case e: ExponentialSleepStrategy.CallLimitReached => () } } - - @Test - def testGetNewestOffset { - createTopic(TOPIC2, 16) - validateTopic(TOPIC2, 16) - - val sspUnderTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4)) - val otherSsp = new SystemStreamPartition("kafka", TOPIC2, new Partition(13)) - - assertNull(systemAdmin.getNewestOffset(sspUnderTest, 3)) - assertNull(systemAdmin.getNewestOffset(otherSsp, 3)) - - // Add a new message to one of the partitions, and verify that it works as expected. - assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 4, "key1".getBytes, "val1".getBytes)).get().offset().toString) - assertEquals("0", systemAdmin.getNewestOffset(sspUnderTest, 3)) - assertNull(systemAdmin.getNewestOffset(otherSsp, 3)) - - // Again - assertEquals("1", producer.send(new ProducerRecord(TOPIC2, 4, "key2".getBytes, "val2".getBytes)).get().offset().toString) - assertEquals("1", systemAdmin.getNewestOffset(sspUnderTest, 3)) - assertNull(systemAdmin.getNewestOffset(otherSsp, 3)) - - // Add a message to both partitions - assertEquals("2", producer.send(new ProducerRecord(TOPIC2, 4, "key3".getBytes, "val3".getBytes)).get().offset().toString) - assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 13, "key4".getBytes, "val4".getBytes)).get().offset().toString) - assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0)) - assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0)) - } - - @Test (expected = classOf[LeaderNotAvailableException]) - def testGetNewestOffsetMaxRetry { - val expectedRetryCount = 3 - val systemAdmin = new 
KafkaSystemAdminWithTopicMetadataError - try { - systemAdmin.getNewestOffset(new SystemStreamPartition(SYSTEM, "quux", new Partition(0)), 3) - } catch { - case e: Exception => - assertEquals(expectedRetryCount + 1, systemAdmin.metadataCallCount) - throw e - } - } }
