Repository: kafka
Updated Branches:
  refs/heads/trunk a8ccdc615 -> c6b8de4e6


KAFKA-2807: Move ThroughputThrottler back to tools jar to fix upgrade tests.

Author: Ewen Cheslack-Postava <m...@ewencp.org>

Reviewers: Gwen Shapira

Closes #499 from ewencp/kafka-2807-relocate-throughput-throttler


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6b8de4e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6b8de4e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6b8de4e

Branch: refs/heads/trunk
Commit: c6b8de4e6806d8f9f4af57e15f2a7f4170265c42
Parents: a8ccdc6
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Authored: Wed Nov 11 15:55:12 2015 -0800
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Wed Nov 11 15:55:12 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |  45 +++---
 .../kafka/common/utils/ThroughputThrottler.java | 141 -------------------
 .../connect/tools/VerifiableSourceTask.java     |   2 +-
 settings.gradle                                 |   2 +-
 .../apache/kafka/tools/ProducerPerformance.java |   1 -
 .../apache/kafka/tools/ThroughputThrottler.java | 141 +++++++++++++++++++
 .../apache/kafka/tools/VerifiableProducer.java  |   1 -
 7 files changed, 166 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 70fdbcd..0ee6c41 100644
--- a/build.gradle
+++ b/build.gradle
@@ -230,7 +230,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
   }
 }
 
-def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools']
+def connectPkgs = ['connect-api', 'connect-runtime', 'connect-json', 'connect-file', 'connect-tools']
 def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
 
 tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
@@ -321,7 +321,7 @@ project(':core') {
     standardOutput = new File('docs/kafka_config.html').newOutputStream()
   }
 
-  task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', ':connect:runtime:genConnectConfigDocs'], type: Tar) {
+  task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', ':connect-runtime:genConnectConfigDocs'], type: Tar) {
     classifier = 'site-docs'
     compression = Compression.GZIP
     from project.file("../docs")
@@ -342,16 +342,16 @@ project(':core') {
     from(project.siteDocsTar) { into("site-docs/") }
     from(project(':tools').jar) { into("libs/") }
     from(project(':tools').configurations.runtime) { into("libs/") }
-    from(project(':connect:api').jar) { into("libs/") }
-    from(project(':connect:api').configurations.runtime) { into("libs/") }
-    from(project(':connect:runtime').jar) { into("libs/") }
-    from(project(':connect:runtime').configurations.runtime) { into("libs/") }
-    from(project(':connect:json').jar) { into("libs/") }
-    from(project(':connect:json').configurations.runtime) { into("libs/") }
-    from(project(':connect:file').jar) { into("libs/") }
-    from(project(':connect:file').configurations.runtime) { into("libs/") }
-    from(project(':connect:tools').jar) { into("libs/") }
-    from(project(':connect:tools').configurations.runtime) { into("libs/") }
+    from(project(':connect-api').jar) { into("libs/") }
+    from(project(':connect-api').configurations.runtime) { into("libs/") }
+    from(project(':connect-runtime').jar) { into("libs/") }
+    from(project(':connect-runtime').configurations.runtime) { into("libs/") }
+    from(project(':connect-json').jar) { into("libs/") }
+    from(project(':connect-json').configurations.runtime) { into("libs/") }
+    from(project(':connect-file').jar) { into("libs/") }
+    from(project(':connect-file').configurations.runtime) { into("libs/") }
+    from(project(':connect-tools').jar) { into("libs/") }
+    from(project(':connect-tools').configurations.runtime) { into("libs/") }
   }
 
   jar {
@@ -638,7 +638,7 @@ project(':log4j-appender') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':connect:api') {
+project(':connect-api') {
   apply plugin: 'checkstyle'
   archivesBaseName = "connect-api"
 
@@ -695,12 +695,12 @@ project(':connect:api') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':connect:json') {
+project(':connect-json') {
   apply plugin: 'checkstyle'
   archivesBaseName = "connect-json"
 
   dependencies {
-    compile project(':connect:api')
+    compile project(':connect-api')
     compile "$slf4japi"
     compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
 
@@ -756,12 +756,12 @@ project(':connect:json') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':connect:runtime') {
+project(':connect-runtime') {
   apply plugin: 'checkstyle'
   archivesBaseName = "connect-runtime"
 
   dependencies {
-    compile project(':connect:api')
+    compile project(':connect-api')
     compile project(':clients')
     compile "$slf4japi"
 
@@ -776,7 +776,7 @@ project(':connect:runtime') {
     testCompile "$powermock_easymock"
     testCompile project(':clients').sourceSets.test.output
     testRuntime "$slf4jlog4j"
-    testRuntime project(":connect:json")
+    testRuntime project(":connect-json")
   }
 
   task testJar(type: Jar) {
@@ -830,12 +830,12 @@ project(':connect:runtime') {
   }
 }
 
-project(':connect:file') {
+project(':connect-file') {
   apply plugin: 'checkstyle'
   archivesBaseName = "connect-file"
 
   dependencies {
-    compile project(':connect:api')
+    compile project(':connect-api')
     compile "$slf4japi"
 
     testCompile "$junit"
@@ -890,12 +890,13 @@ project(':connect:file') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':connect:tools') {
+project(':connect-tools') {
   apply plugin: 'checkstyle'
   archivesBaseName = "connect-tools"
 
   dependencies {
-    compile project(':connect:api')
+    compile project(':connect-api')
+    compile project(':tools')
     compile "$slf4japi"
     compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
deleted file mode 100644
index 1c63ffb..0000000
--- a/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.utils;
-
-
-/**
- * This class helps producers throttle throughput.
- *
- * If targetThroughput >= 0, the resulting average throughput will be approximately
- * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
- * no throttling will occur.
- *
- * To use, do this between successive send attempts:
- * <pre>
- *     {@code
- *      if (throttler.shouldThrottle(...)) {
- *          throttler.throttle();
- *      }
- *     }
- * </pre>
- *
- * Note that this can be used to throttle message throughput or data throughput.
- */
-public class ThroughputThrottler {
-
-    private static final long NS_PER_MS = 1000000L;
-    private static final long NS_PER_SEC = 1000 * NS_PER_MS;
-    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
-
-    long sleepTimeNs;
-    long sleepDeficitNs = 0;
-    long targetThroughput = -1;
-    long startMs;
-    private boolean wakeup = false;
-
-    /**
-     * @param targetThroughput Can be messages/sec or bytes/sec
-     * @param startMs          When the very first message is sent
-     */
-    public ThroughputThrottler(long targetThroughput, long startMs) {
-        this.startMs = startMs;
-        this.targetThroughput = targetThroughput;
-        this.sleepTimeNs = targetThroughput > 0 ?
-                           NS_PER_SEC / targetThroughput :
-                           Long.MAX_VALUE;
-    }
-
-    /**
-     * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
-     *                    messages produced so far if you want to throttle message throughput.
-     * @param sendStartMs timestamp of the most recently sent message
-     * @return
-     */
-    public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
-        if (this.targetThroughput < 0) {
-            // No throttling in this case
-            return false;
-        }
-
-        float elapsedMs = (sendStartMs - startMs) / 1000.f;
-        return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
-    }
-
-    /**
-     * Occasionally blocks for small amounts of time to achieve targetThroughput.
-     *
-     * Note that if targetThroughput is 0, this will block extremely aggressively.
-     */
-    public void throttle() {
-        if (targetThroughput == 0) {
-            try {
-                synchronized (this) {
-                    while (!wakeup) {
-                        this.wait();
-                    }
-                }
-            } catch (InterruptedException e) {
-                // do nothing
-            }
-            return;
-        }
-
-        // throttle throughput by sleeping, on average,
-        // (1 / this.throughput) seconds between "things sent"
-        sleepDeficitNs += sleepTimeNs;
-
-        // If enough sleep deficit has accumulated, sleep a little
-        if (sleepDeficitNs >= MIN_SLEEP_NS) {
-            long sleepStartNs = System.nanoTime();
-            long currentTimeNs = sleepStartNs;
-            try {
-                synchronized (this) {
-                    long elapsed = currentTimeNs - sleepStartNs;
-                    long remaining = sleepDeficitNs - elapsed;
-                    while (!wakeup && remaining > 0) {
-                        long sleepMs = remaining / 1000000;
-                        long sleepNs = remaining - sleepMs * 1000000;
-                        this.wait(sleepMs, (int) sleepNs);
-                        elapsed = System.nanoTime() - sleepStartNs;
-                        remaining = sleepDeficitNs - elapsed;
-                    }
-                    wakeup = false;
-                }
-                sleepDeficitNs = 0;
-            } catch (InterruptedException e) {
-                // If sleep is cut short, reduce deficit by the amount of
-                // time we actually spent sleeping
-                long sleepElapsedNs = System.nanoTime() - sleepStartNs;
-                if (sleepElapsedNs <= sleepDeficitNs) {
-                    sleepDeficitNs -= sleepElapsedNs;
-                }
-            }
-        }
-    }
-
-    /**
-     * Wakeup the throttler if its sleeping.
-     */
-    public void wakeup() {
-        synchronized (this) {
-            wakeup = true;
-            this.notifyAll();
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
index 6fee2c4..a85a0e9 100644
--- a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
+++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -19,11 +19,11 @@ package org.apache.kafka.connect.tools;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.utils.ThroughputThrottler;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.tools.ThroughputThrottler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 2728b5b..d1543c3 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,4 +15,4 @@
 
 apply from: file('scala.gradle')
 include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
-        'connect:api', 'connect:runtime', 'connect:json', 'connect:file', 
'connect:tools'
+        'connect-api', 'connect-runtime', 'connect-json', 'connect-file', 
'connect-tools'

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 2a7f7b1..3a06862 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -24,7 +24,6 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
 
 import org.apache.kafka.clients.producer.*;
-import org.apache.kafka.common.utils.ThroughputThrottler;
 
 public class ProducerPerformance {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
new file mode 100644
index 0000000..a3bcd2f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.tools;
+
+
+/**
+ * This class helps producers throttle throughput.
+ *
+ * If targetThroughput >= 0, the resulting average throughput will be approximately
+ * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
+ * no throttling will occur.
+ *
+ * To use, do this between successive send attempts:
+ * <pre>
+ *     {@code
+ *      if (throttler.shouldThrottle(...)) {
+ *          throttler.throttle();
+ *      }
+ *     }
+ * </pre>
+ *
+ * Note that this can be used to throttle message throughput or data throughput.
+ */
+public class ThroughputThrottler {
+
+    private static final long NS_PER_MS = 1000000L;
+    private static final long NS_PER_SEC = 1000 * NS_PER_MS;
+    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
+
+    long sleepTimeNs;
+    long sleepDeficitNs = 0;
+    long targetThroughput = -1;
+    long startMs;
+    private boolean wakeup = false;
+
+    /**
+     * @param targetThroughput Can be messages/sec or bytes/sec
+     * @param startMs          When the very first message is sent
+     */
+    public ThroughputThrottler(long targetThroughput, long startMs) {
+        this.startMs = startMs;
+        this.targetThroughput = targetThroughput;
+        this.sleepTimeNs = targetThroughput > 0 ?
+                           NS_PER_SEC / targetThroughput :
+                           Long.MAX_VALUE;
+    }
+
+    /**
+     * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
+     *                    messages produced so far if you want to throttle message throughput.
+     * @param sendStartMs timestamp of the most recently sent message
+     * @return
+     */
+    public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
+        if (this.targetThroughput < 0) {
+            // No throttling in this case
+            return false;
+        }
+
+        float elapsedMs = (sendStartMs - startMs) / 1000.f;
+        return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
+    }
+
+    /**
+     * Occasionally blocks for small amounts of time to achieve targetThroughput.
+     *
+     * Note that if targetThroughput is 0, this will block extremely aggressively.
+     */
+    public void throttle() {
+        if (targetThroughput == 0) {
+            try {
+                synchronized (this) {
+                    while (!wakeup) {
+                        this.wait();
+                    }
+                }
+            } catch (InterruptedException e) {
+                // do nothing
+            }
+            return;
+        }
+
+        // throttle throughput by sleeping, on average,
+        // (1 / this.throughput) seconds between "things sent"
+        sleepDeficitNs += sleepTimeNs;
+
+        // If enough sleep deficit has accumulated, sleep a little
+        if (sleepDeficitNs >= MIN_SLEEP_NS) {
+            long sleepStartNs = System.nanoTime();
+            long currentTimeNs = sleepStartNs;
+            try {
+                synchronized (this) {
+                    long elapsed = currentTimeNs - sleepStartNs;
+                    long remaining = sleepDeficitNs - elapsed;
+                    while (!wakeup && remaining > 0) {
+                        long sleepMs = remaining / 1000000;
+                        long sleepNs = remaining - sleepMs * 1000000;
+                        this.wait(sleepMs, (int) sleepNs);
+                        elapsed = System.nanoTime() - sleepStartNs;
+                        remaining = sleepDeficitNs - elapsed;
+                    }
+                    wakeup = false;
+                }
+                sleepDeficitNs = 0;
+            } catch (InterruptedException e) {
+                // If sleep is cut short, reduce deficit by the amount of
+                // time we actually spent sleeping
+                long sleepElapsedNs = System.nanoTime() - sleepStartNs;
+                if (sleepElapsedNs <= sleepDeficitNs) {
+                    sleepDeficitNs -= sleepElapsedNs;
+                }
+            }
+        }
+    }
+
+    /**
+     * Wakeup the throttler if its sleeping.
+     */
+    public void wakeup() {
+        synchronized (this) {
+            wakeup = true;
+            this.notifyAll();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index e8bd330..0cd90c0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -41,7 +41,6 @@ import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.kafka.common.utils.ThroughputThrottler;
 
 /**
  * Primarily intended for use with system testing, this producer prints metadata

Reply via email to