This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 15d1cc8 MINOR: Improvements and fixes for Trogdor payload generators.
(#10621)
15d1cc8 is described below
commit 15d1cc8b5435eda7c36e42ae0898b0671b70a96f
Author: Scott Hendricks <[email protected]>
AuthorDate: Thu May 6 20:46:01 2021 -0400
MINOR: Improvements and fixes for Trogdor payload generators. (#10621)
* Changes the new Throughput Generators to track messages per window
instead of making per-second calculations which can have rounding errors.
Also, one of these had a calculation error which prompted this change in
the first place.
* Fixes a couple of typos.
* Fixes an error where certain JSON fields were not exposed, causing the
workloads to not behave as intended.
* Fixes a bug where wait() was called outside of a loop, so throttling
could return before the next window started.
* Adds additional constant payload generators.
* Fixes problems with an example spec.
* Fixes several off-by-one comparisons.
Reviewers: Colin P. McCabe <[email protected]>
---
.../workload/ConstantThroughputGenerator.java | 39 ++++++-----
.../trogdor/workload/GaussianFlushGenerator.java | 6 +-
.../workload/GaussianThroughputGenerator.java | 49 +++++++-------
...GaussianTimestampConstantPayloadGenerator.java} | 27 ++++----
.../GaussianTimestampRandomPayloadGenerator.java | 11 ++-
.../kafka/trogdor/workload/PayloadGenerator.java | 4 +-
.../TimestampConstantPayloadGenerator.java | 78 ++++++++++++++++++++++
.../trogdor/workload/TimestampRecordProcessor.java | 2 +-
8 files changed, 150 insertions(+), 66 deletions(-)
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
index 19e5af0..9e5eeb9 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
@@ -26,26 +26,21 @@ import org.apache.kafka.common.utils.Time;
* The lower the window size, the smoother the traffic will be. Using a 100ms
window offers no noticeable spikes in
* traffic while still being long enough to avoid too much overhead.
*
- * WARNING: Due to binary nature of throughput in terms of messages sent in a
window, each window will send at least 1
- * message, and each window sends the same number of messages, rounded down.
For example, 99 messages per second with a
- * 100ms window will only send 90 messages per second, or 9 messages per
window. Another example, in order to send only
- * 5 messages per second, a window size of 200ms is required. In cases like
these, both the `messagesPerSecond` and
- * `windowSizeMs` parameters should be adjusted together to achieve more
accurate throughput.
- *
* Here is an example spec:
*
* {
* "type": "constant",
- * "messagesPerSecond": 500,
+ * "messagesPerWindow": 50,
* "windowSizeMs": 100
* }
*
* This will produce a workload that runs 500 messages per second, with a
maximum resolution of 50 messages per 100
* millisecond.
+ *
+ * If `messagesPerWindow` is less than or equal to 0, `throttle` will not
throttle at all and will return immediately.
*/
public class ConstantThroughputGenerator implements ThroughputGenerator {
- private final int messagesPerSecond;
private final int messagesPerWindow;
private final long windowSizeMs;
@@ -53,23 +48,25 @@ public class ConstantThroughputGenerator implements
ThroughputGenerator {
private int messageTracker = 0;
@JsonCreator
- public ConstantThroughputGenerator(@JsonProperty("messagesPerSecond") int
messagesPerSecond,
+ public ConstantThroughputGenerator(@JsonProperty("messagesPerWindow") int
messagesPerWindow,
@JsonProperty("windowSizeMs") long
windowSizeMs) {
- // Calcualte the default values.
+ // Calculate the default values.
if (windowSizeMs <= 0) {
windowSizeMs = 100;
}
this.windowSizeMs = windowSizeMs;
- this.messagesPerSecond = messagesPerSecond;
-
- // Use the rest of the parameters to calculate window properties.
- this.messagesPerWindow = (int) ((long) messagesPerSecond /
windowSizeMs);
+ this.messagesPerWindow = messagesPerWindow;
calculateNextWindow();
}
@JsonProperty
- public int messagesPerSecond() {
- return messagesPerSecond;
+ public long windowSizeMs() {
+ return windowSizeMs;
+ }
+
+ @JsonProperty
+ public int messagesPerWindow() {
+ return messagesPerWindow;
}
private void calculateNextWindow() {
@@ -79,7 +76,7 @@ public class ConstantThroughputGenerator implements
ThroughputGenerator {
// Calculate the next window start time.
long now = Time.SYSTEM.milliseconds();
if (nextWindowStarts > 0) {
- while (nextWindowStarts < now) {
+ while (nextWindowStarts <= now) {
nextWindowStarts += windowSizeMs;
}
} else {
@@ -89,8 +86,8 @@ public class ConstantThroughputGenerator implements
ThroughputGenerator {
@Override
public synchronized void throttle() throws InterruptedException {
- // Run unthrottled if messagesPerSecond is negative.
- if (messagesPerSecond < 0) {
+ // Run unthrottled if messagesPerWindow is not positive.
+ if (messagesPerWindow <= 0) {
return;
}
@@ -106,7 +103,9 @@ public class ConstantThroughputGenerator implements
ThroughputGenerator {
if (messageTracker >= messagesPerWindow) {
// Wait the difference in time between now and when the next
window starts.
- wait(nextWindowStarts - Time.SYSTEM.milliseconds());
+ while (nextWindowStarts > Time.SYSTEM.milliseconds()) {
+ wait(nextWindowStarts - Time.SYSTEM.milliseconds());
+ }
}
}
}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
index a3157db..eb6845e 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
@@ -52,7 +52,7 @@ import java.util.Random;
public class GaussianFlushGenerator implements FlushGenerator {
private final int messagesPerFlushAverage;
- private final int messagesPerFlushDeviation;
+ private final double messagesPerFlushDeviation;
private final Random random = new Random();
@@ -61,7 +61,7 @@ public class GaussianFlushGenerator implements FlushGenerator
{
@JsonCreator
public GaussianFlushGenerator(@JsonProperty("messagesPerFlushAverage") int
messagesPerFlushAverage,
- @JsonProperty("messagesPerFlushDeviation")
int messagesPerFlushDeviation) {
+ @JsonProperty("messagesPerFlushDeviation")
double messagesPerFlushDeviation) {
this.messagesPerFlushAverage = messagesPerFlushAverage;
this.messagesPerFlushDeviation = messagesPerFlushDeviation;
calculateFlushSize();
@@ -73,7 +73,7 @@ public class GaussianFlushGenerator implements FlushGenerator
{
}
@JsonProperty
- public long messagesPerFlushDeviation() {
+ public double messagesPerFlushDeviation() {
return messagesPerFlushDeviation;
}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
index 6d71ff5..a77298f 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
@@ -25,21 +25,17 @@ import java.util.Random;
/*
* This throughput generator configures throughput with a gaussian normal
distribution on a per-window basis. You can
* specify how many windows to keep the throughput at the rate before
changing. All traffic will follow a gaussian
- * distribution centered around `messagesPerSecondAverage` with a deviation of
`messagesPerSecondDeviation`.
+ * distribution centered around `messagesPerWindowAverage` with a deviation of
`messagesPerWindowDeviation`.
*
* The lower the window size, the smoother the traffic will be. Using a 100ms
window offers no noticeable spikes in
* traffic while still being long enough to avoid too much overhead.
*
- * WARNING: Due to binary nature of throughput in terms of messages sent in a
window, this does not work well for an
- * average throughput of less than 5 messages per window. In cases where you
want lower throughput, please adjust the
- * `windowSizeMs` accordingly.
- *
* Here is an example spec:
*
* {
* "type": "gaussian",
- * "messagesPerSecondAverage": 500,
- * "messagesPerSecondDeviation": 50,
+ * "messagesPerWindowAverage": 50,
+ * "messagesPerWindowDeviation": 5,
* "windowsUntilRateChange": 100,
* "windowSizeMs": 100
* }
@@ -56,10 +52,8 @@ import java.util.Random;
*/
public class GaussianThroughputGenerator implements ThroughputGenerator {
- private final int messagesPerSecondAverage;
- private final int messagesPerSecondDeviation;
private final int messagesPerWindowAverage;
- private final int messagesPerWindowDeviation;
+ private final double messagesPerWindowDeviation;
private final int windowsUntilRateChange;
private final long windowSizeMs;
@@ -71,35 +65,31 @@ public class GaussianThroughputGenerator implements
ThroughputGenerator {
private int throttleMessages = 0;
@JsonCreator
- public
GaussianThroughputGenerator(@JsonProperty("messagesPerSecondAverage") int
messagesPerSecondAverage,
-
@JsonProperty("messagesPerSecondDeviation") int messagesPerSecondDeviation,
+ public
GaussianThroughputGenerator(@JsonProperty("messagesPerWindowAverage") int
messagesPerWindowAverage,
+
@JsonProperty("messagesPerWindowDeviation") double messagesPerWindowDeviation,
@JsonProperty("windowsUntilRateChange")
int windowsUntilRateChange,
@JsonProperty("windowSizeMs") long
windowSizeMs) {
- // Calcualte the default values.
+ // Calculate the default values.
if (windowSizeMs <= 0) {
windowSizeMs = 100;
}
this.windowSizeMs = windowSizeMs;
- this.messagesPerSecondAverage = messagesPerSecondAverage;
- this.messagesPerSecondDeviation = messagesPerSecondDeviation;
+ this.messagesPerWindowAverage = messagesPerWindowAverage;
+ this.messagesPerWindowDeviation = messagesPerWindowDeviation;
this.windowsUntilRateChange = windowsUntilRateChange;
- // Take per-second calculations and convert them to per-window
calculations.
- messagesPerWindowAverage = (int) (messagesPerSecondAverage *
windowSizeMs / 1000);
- messagesPerWindowDeviation = (int) (messagesPerSecondDeviation *
windowSizeMs / 1000);
-
- // Calcualte the first window.
+ // Calculate the first window.
calculateNextWindow(true);
}
@JsonProperty
- public int messagesPerSecondAverage() {
- return messagesPerSecondAverage;
+ public int messagesPerWindowAverage() {
+ return messagesPerWindowAverage;
}
@JsonProperty
- public long messagesPerSecondDeviation() {
- return messagesPerSecondDeviation;
+ public double messagesPerWindowDeviation() {
+ return messagesPerWindowDeviation;
}
@JsonProperty
@@ -107,6 +97,11 @@ public class GaussianThroughputGenerator implements
ThroughputGenerator {
return windowsUntilRateChange;
}
+ @JsonProperty
+ public long windowSizeMs() {
+ return windowSizeMs;
+ }
+
private synchronized void calculateNextWindow(boolean force) {
// Reset the message count.
messageTracker = 0;
@@ -127,7 +122,7 @@ public class GaussianThroughputGenerator implements
ThroughputGenerator {
// Calculate the number of messages allowed in this window using a
normal distribution.
// The formula is: Messages = Gaussian * Deviation + Average
- throttleMessages = Math.max((int) (random.nextGaussian() *
(double) messagesPerWindowDeviation) + messagesPerWindowAverage, 1);
+ throttleMessages = Math.max((int) (random.nextGaussian() *
messagesPerWindowDeviation) + messagesPerWindowAverage, 1);
}
windowTracker += 1;
}
@@ -146,7 +141,9 @@ public class GaussianThroughputGenerator implements
ThroughputGenerator {
if (messageTracker >= throttleMessages) {
// Wait the difference in time between now and when the next
window starts.
- wait(nextWindowStarts - Time.SYSTEM.milliseconds());
+ while (nextWindowStarts > Time.SYSTEM.milliseconds()) {
+ wait(nextWindowStarts - Time.SYSTEM.milliseconds());
+ }
}
}
}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampConstantPayloadGenerator.java
similarity index 79%
copy from
trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
copy to
trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampConstantPayloadGenerator.java
index 90fe279..8660ed3 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampConstantPayloadGenerator.java
@@ -26,7 +26,7 @@ import java.nio.ByteOrder;
import java.util.Random;
/**
- * This class behaves identically to TimestampRandomPayloadGenerator, except
the message size follows a gaussian
+ * This class behaves identically to TimestampConstantPayloadGenerator, except
the message size follows a gaussian
* distribution.
*
* This should be used in conjunction with TimestampRecordProcessor in the
Consumer to measure true end-to-end latency
@@ -35,12 +35,11 @@ import java.util.Random;
* `messageSizeAverage` - The average size in bytes of each message.
* `messageSizeDeviation` - The standard deviation to use when calculating
message size.
* `messagesUntilSizeChange` - The number of messages to keep at the same size.
- * `seed` - Used to initialize Random() to remove some non-determinism.
*
* Here is an example spec:
*
* {
- * "type": "gaussianTimestampRandom",
+ * "type": "gaussianTimestampConstant",
* "messageSizeAverage": 512,
* "messageSizeDeviation": 100,
* "messagesUntilSizeChange": 100
@@ -56,9 +55,9 @@ import java.util.Random;
* ~99% of the messages are between 212 and 812 bytes
*/
-public class GaussianTimestampRandomPayloadGenerator implements
PayloadGenerator {
+public class GaussianTimestampConstantPayloadGenerator implements
PayloadGenerator {
private final int messageSizeAverage;
- private final int messageSizeDeviation;
+ private final double messageSizeDeviation;
private final int messagesUntilSizeChange;
private final long seed;
@@ -69,10 +68,10 @@ public class GaussianTimestampRandomPayloadGenerator
implements PayloadGenerator
private int messageSize = 0;
@JsonCreator
- public
GaussianTimestampRandomPayloadGenerator(@JsonProperty("messageSizeAverage") int
messageSizeAverage,
-
@JsonProperty("messageSizeDeviation") int messageSizeDeviation,
-
@JsonProperty("messagesUntilSizeChange") int messagesUntilSizeChange,
- @JsonProperty("seed") long
seed) {
+ public
GaussianTimestampConstantPayloadGenerator(@JsonProperty("messageSizeAverage")
int messageSizeAverage,
+
@JsonProperty("messageSizeDeviation") double messageSizeDeviation,
+
@JsonProperty("messagesUntilSizeChange") int messagesUntilSizeChange,
+ @JsonProperty("seed")
long seed) {
this.messageSizeAverage = messageSizeAverage;
this.messageSizeDeviation = messageSizeDeviation;
this.seed = seed;
@@ -87,11 +86,16 @@ public class GaussianTimestampRandomPayloadGenerator
implements PayloadGenerator
}
@JsonProperty
- public long messageSizeDeviation() {
+ public double messageSizeDeviation() {
return messageSizeDeviation;
}
@JsonProperty
+ public int messagesUntilSizeChange() {
+ return messagesUntilSizeChange;
+ }
+
+ @JsonProperty
public long seed() {
return seed;
}
@@ -108,9 +112,8 @@ public class GaussianTimestampRandomPayloadGenerator
implements PayloadGenerator
}
messageTracker += 1;
- // Generate out of order to prevent inclusion of random number
generation in latency numbers.
+ // Generate the byte array before the timestamp generation.
byte[] result = new byte[messageSize];
- random.nextBytes(result);
// Do the timestamp generation as the very last task.
buffer.clear();
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
index 90fe279..48261a4 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
@@ -58,7 +58,7 @@ import java.util.Random;
public class GaussianTimestampRandomPayloadGenerator implements
PayloadGenerator {
private final int messageSizeAverage;
- private final int messageSizeDeviation;
+ private final double messageSizeDeviation;
private final int messagesUntilSizeChange;
private final long seed;
@@ -70,7 +70,7 @@ public class GaussianTimestampRandomPayloadGenerator
implements PayloadGenerator
@JsonCreator
public
GaussianTimestampRandomPayloadGenerator(@JsonProperty("messageSizeAverage") int
messageSizeAverage,
-
@JsonProperty("messageSizeDeviation") int messageSizeDeviation,
+
@JsonProperty("messageSizeDeviation") double messageSizeDeviation,
@JsonProperty("messagesUntilSizeChange") int messagesUntilSizeChange,
@JsonProperty("seed") long
seed) {
this.messageSizeAverage = messageSizeAverage;
@@ -87,11 +87,16 @@ public class GaussianTimestampRandomPayloadGenerator
implements PayloadGenerator
}
@JsonProperty
- public long messageSizeDeviation() {
+ public double messageSizeDeviation() {
return messageSizeDeviation;
}
@JsonProperty
+ public int messagesUntilSizeChange() {
+ return messagesUntilSizeChange;
+ }
+
+ @JsonProperty
public long seed() {
return seed;
}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
index 225a663..6d7393d 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
@@ -37,7 +37,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
@JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name =
"randomComponent"),
@JsonSubTypes.Type(value = TimestampRandomPayloadGenerator.class, name =
"timestampRandom"),
- @JsonSubTypes.Type(value = GaussianTimestampRandomPayloadGenerator.class,
name = "gaussianTimestampRandom")
+ @JsonSubTypes.Type(value = TimestampConstantPayloadGenerator.class, name =
"timestampConstant"),
+ @JsonSubTypes.Type(value = GaussianTimestampRandomPayloadGenerator.class,
name = "gaussianTimestampRandom"),
+ @JsonSubTypes.Type(value =
GaussianTimestampConstantPayloadGenerator.class, name =
"gaussianTimestampConstant")
})
public interface PayloadGenerator {
/**
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampConstantPayloadGenerator.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampConstantPayloadGenerator.java
new file mode 100644
index 0000000..e9c4bc8
--- /dev/null
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampConstantPayloadGenerator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteOrder;
+import java.nio.ByteBuffer;
+
+/**
+ * A PayloadGenerator which generates a timestamped constant payload.
+ *
+ * The timestamp used for this class is in milliseconds since epoch, encoded
directly to the first several bytes of the
+ * payload.
+ *
+ * This should be used in conjunction with TimestampRecordProcessor in the
Consumer to measure true end-to-end latency
+ * of a system.
+ *
+ * `size` - The size in bytes of each message.
+ *
+ * Here is an example spec:
+ *
+ * {
+ * "type": "timestampConstant",
+ * "size": 512
+ * }
+ *
+ * This will generate a 512-byte message with the first several bytes encoded
with the timestamp.
+ */
+public class TimestampConstantPayloadGenerator implements PayloadGenerator {
+ private final int size;
+ private final ByteBuffer buffer;
+
+ @JsonCreator
+ public TimestampConstantPayloadGenerator(@JsonProperty("size") int size) {
+ this.size = size;
+ if (size < Long.BYTES) {
+ throw new RuntimeException("The size of the payload must be
greater than or equal to " + Long.BYTES + ".");
+ }
+ buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+ }
+
+ @JsonProperty
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public synchronized byte[] generate(long position) {
+ // Generate the byte array before the timestamp generation.
+ byte[] result = new byte[size];
+
+ // Do the timestamp generation as the very last task.
+ buffer.clear();
+ buffer.putLong(Time.SYSTEM.milliseconds());
+ buffer.rewind();
+ System.arraycopy(buffer.array(), 0, result, 0, Long.BYTES);
+ return result;
+ }
+}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
index 658016a..035d459 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
@@ -38,7 +38,7 @@ import java.nio.ByteOrder;
*
* Example spec:
* {
- * "type": "timestampRandom",
+ * "type": "timestamp",
* "histogramMaxMs": 10000,
* "histogramMinMs": 0,
* "histogramStepMs": 1