Repository: incubator-samza Updated Branches: refs/heads/master f5741125a -> 5ae028595
SAMZA-168; add exponential backoff to kafka system admin Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5ae02859 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5ae02859 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5ae02859 Branch: refs/heads/master Commit: 5ae02859552778193e4247daa65a00414a8708c1 Parents: f574112 Author: Martin Kleppmann <[email protected]> Authored: Wed Mar 5 11:04:27 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Wed Mar 5 11:04:27 2014 -0800 ---------------------------------------------------------------------- .../samza/util/ExponentialSleepStrategy.scala | 45 ++++++++++++++ .../util/TestExponentialSleepStrategy.scala | 57 +++++++++++++++++ .../apache/samza/system/kafka/BrokerProxy.scala | 4 +- .../samza/system/kafka/KafkaSystemAdmin.scala | 3 + .../util/ExponentialThreadSleepStrategy.scala | 65 -------------------- .../TestExponentialThreadSleepStrategy.scala | 47 -------------- 6 files changed, 107 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala new file mode 100644 index 0000000..b3c9263 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.util + +class ExponentialSleepStrategy( + backOffMultiplier: Double = 2.0, + initialDelayMs: Long = 100, + maximumDelayMs: Long = 10000) { + + require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1") + require(initialDelayMs > 0, "initialDelayMs must be positive") + require(maximumDelayMs >= initialDelayMs, "maximumDelayMs must be >= initialDelayMs") + + var previousDelay = 0L + + def sleep() = { + val nextDelay = getNextDelay(previousDelay) + Thread.sleep(nextDelay) + previousDelay = nextDelay + } + + def getNextDelay(previousDelay: Long): Long = { + val nextDelay = (previousDelay * backOffMultiplier).asInstanceOf[Long] + math.min(math.max(initialDelayMs, nextDelay), maximumDelayMs) + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala new file mode 100644 index 0000000..962ca40 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.util + +import org.junit.Assert._ +import org.junit.Test +import org.apache.samza.util.ExponentialSleepStrategy + +class TestExponentialSleepStrategy { + + @Test def testGetNextDelayReturnsIncrementalDelay() = { + val st = new ExponentialSleepStrategy + var nextDelay = st.getNextDelay(0L) + assertEquals(nextDelay, 100L) + nextDelay = st.getNextDelay(nextDelay) + assertEquals(nextDelay, 200L) + nextDelay = st.getNextDelay(nextDelay) + assertEquals(nextDelay, 400L) + } + + @Test def testGetNextDelayReturnsMaximumDelayWhenDelayCapReached() = { + val st = new ExponentialSleepStrategy + var nextDelay = st.getNextDelay(6400L) + assertEquals(nextDelay, 10000L) + nextDelay = st.getNextDelay(nextDelay) + assertEquals(nextDelay, 10000L) + } + + @Test def testSleepStrategyIsConfigurable() = { + val st = new ExponentialSleepStrategy(backOffMultiplier = 3.0, initialDelayMs = 10) + var nextDelay = st.getNextDelay(0L) + assertEquals(nextDelay, 10L) + nextDelay = st.getNextDelay(nextDelay) + assertEquals(nextDelay, 30L) + nextDelay = st.getNextDelay(nextDelay) + assertEquals(nextDelay, 90L) + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 0d71582..10855dc 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -32,7 +32,7 @@ import java.util.Map.Entry import scala.collection.mutable import kafka.consumer.ConsumerConfig import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX -import org.apache.samza.util.ExponentialThreadSleepStrategy +import org.apache.samza.util.ExponentialSleepStrategy /** * Companion object for class JvmMetrics encapsulating various constants @@ -125,7 +125,7 @@ class BrokerProxy( val thread: Thread = new Thread(new Runnable() { def run() { info("Initialising sleep strategy"); - val sleepStrategy = new ExponentialThreadSleepStrategy + val sleepStrategy = new ExponentialSleepStrategy info("Starting thread for BrokerProxy") while (!Thread.currentThread.isInterrupted) { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 30e9fc3..ad5f2fa 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -25,6 +25,7 @@ import org.apache.samza.system.SystemAdmin import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamPartition import org.apache.samza.util.ClientUtilTopicMetadataStore +import org.apache.samza.util.ExponentialSleepStrategy import kafka.api._ import kafka.consumer.SimpleConsumer import kafka.utils.Utils @@ -122,6 +123,7 @@ class KafkaSystemAdmin( var upcomingOffsets = Map[SystemStreamPartition, String]() var done = false var consumer: SimpleConsumer = null + val retryBackoff = new ExponentialSleepStrategy(initialDelayMs = 500) debug("Fetching offsets for: %s" format streams) @@ -176,6 +178,7 @@ class KafkaSystemAdmin( // Retry. warn("Unable to fetch last offsets for streams due to: %s, %s. Retrying. Turn on debugging to get a full stack trace." format (e.getMessage, streams)) debug(e) + retryBackoff.sleep } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala b/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala deleted file mode 100644 index fde3cf3..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.samza.util; - -import ExponentialThreadSleepStrategy._ - -/** - * Companion object for class ExponentialThreadSleep encapsulating various constants - */ -object ExponentialThreadSleepStrategy { - val SLEEP_BACK_OFF_MULTIPLIER = 2.0 - val SLEEP_INITIAL_BACK_OFF_DELAY_MS = 100 - val SLEEP_CAP_DELAY_MS = 10000 -} - -class ExponentialThreadSleepStrategy { - - /** - * Sleep counter variable used to - */ - var sleepBackOffMS = 0l - - val sleepMSBackOffDelay = SLEEP_INITIAL_BACK_OFF_DELAY_MS - - val sleepMSCapOnFetchMessagesError = SLEEP_CAP_DELAY_MS - - def sleep() = { - val nextDelay = getNextDelay(sleepBackOffMS) - Thread.sleep(nextDelay) - sleepBackOffMS = nextDelay - } - - def getNextDelay(previousDelay: Long) = { - var nextDelay = 0l - if (previousDelay == 0) { - nextDelay = sleepMSBackOffDelay - } else if (SLEEP_BACK_OFF_MULTIPLIER > 1) { - nextDelay = (previousDelay * SLEEP_BACK_OFF_MULTIPLIER).asInstanceOf[Long] - if (sleepMSCapOnFetchMessagesError != -1 && nextDelay > sleepMSCapOnFetchMessagesError) { - nextDelay = scala.math.max(sleepMSCapOnFetchMessagesError, sleepMSBackOffDelay) - } - } - nextDelay - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala b/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala deleted file mode 100644 index f5e3e4c..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -import org.apache.samza.util.ExponentialThreadSleepStrategy -import org.junit.Assert._ -import org.junit.Test - -class TestExponentialThreadSleepStrategy { - - @Test def testGetNextRedeliveryDelayCorrectlyReturnsMaximumDelayWhenDelayCapReached() = { - val st = new ExponentialThreadSleepStrategy - var nextDelay = st.getNextDelay(ExponentialThreadSleepStrategy.SLEEP_CAP_DELAY_MS) - assertEquals(nextDelay, 10000l) - nextDelay = st.getNextDelay(ExponentialThreadSleepStrategy.SLEEP_CAP_DELAY_MS) - assertEquals(nextDelay, 10000l) - } - - @Test def testGetNextRedeliveryDelayCorrectlyReturnsIncrementalDelay() = { - val st = new ExponentialThreadSleepStrategy - var nextDelay = st.getNextDelay(0l) - assertEquals(nextDelay, 100l) - nextDelay = st.getNextDelay(nextDelay) - assertEquals(nextDelay, 200l) - nextDelay = st.getNextDelay(nextDelay) - assertEquals(nextDelay, 400l) - } -} - -
