This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 15d1cc8  MINOR: Improvements and fixes for Trogdor payload generators. (#10621)
15d1cc8 is described below

commit 15d1cc8b5435eda7c36e42ae0898b0671b70a96f
Author: Scott Hendricks <[email protected]>
AuthorDate: Thu May 6 20:46:01 2021 -0400

    MINOR: Improvements and fixes for Trogdor payload generators. (#10621)
    
    * Changes the new Throughput Generators to track messages per window
    instead of making per-second calculations which can have rounding errors.
    Also, one of these had a calculation error which prompted this change in
    the first place.
    
    * Fixes a couple of typos.
    
    * Fixes an error where certain JSON fields were not exposed, causing the
    workloads to not behave as intended.
    
    * Fixes a bug where wait() was called outside of a loop, so throttle()
    could return before the next window started.
    
    * Adds additional constant payload generators.
    
    * Fixes problems with an example spec.
    
    * Fixes several off-by-one comparisons.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../workload/ConstantThroughputGenerator.java      | 39 ++++++-----
 .../trogdor/workload/GaussianFlushGenerator.java   |  6 +-
 .../workload/GaussianThroughputGenerator.java      | 49 +++++++-------
 ...GaussianTimestampConstantPayloadGenerator.java} | 27 ++++----
 .../GaussianTimestampRandomPayloadGenerator.java   | 11 ++-
 .../kafka/trogdor/workload/PayloadGenerator.java   |  4 +-
 .../TimestampConstantPayloadGenerator.java         | 78 ++++++++++++++++++++++
 .../trogdor/workload/TimestampRecordProcessor.java |  2 +-
 8 files changed, 150 insertions(+), 66 deletions(-)

diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
index 19e5af0..9e5eeb9 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
@@ -26,26 +26,21 @@ import org.apache.kafka.common.utils.Time;
  * The lower the window size, the smoother the traffic will be. Using a 100ms 
window offers no noticeable spikes in
  * traffic while still being long enough to avoid too much overhead.
  *
- * WARNING: Due to binary nature of throughput in terms of messages sent in a 
window, each window will send at least 1
- * message, and each window sends the same number of messages, rounded down. 
For example, 99 messages per second with a
- * 100ms window will only send 90 messages per second, or 9 messages per 
window. Another example, in order to send only
- * 5 messages per second, a window size of 200ms is required. In cases like 
these, both the `messagesPerSecond` and
- * `windowSizeMs` parameters should be adjusted together to achieve more 
accurate throughput.
- *
  * Here is an example spec:
  *
  * {
  *    "type": "constant",
- *    "messagesPerSecond": 500,
+ *    "messagesPerWindow": 50,
  *    "windowSizeMs": 100
  * }
  *
  * This will produce a workload that runs 500 messages per second, with a 
maximum resolution of 50 messages per 100
  * millisecond.
+ *
+ * If `messagesPerWindow` is less than or equal to 0, `throttle` will not 
throttle at all and will return immediately.
  */
 
 public class ConstantThroughputGenerator implements ThroughputGenerator {
-    private final int messagesPerSecond;
     private final int messagesPerWindow;
     private final long windowSizeMs;
 
@@ -53,23 +48,25 @@ public class ConstantThroughputGenerator implements 
ThroughputGenerator {
     private int messageTracker = 0;
 
     @JsonCreator
-    public ConstantThroughputGenerator(@JsonProperty("messagesPerSecond") int 
messagesPerSecond,
+    public ConstantThroughputGenerator(@JsonProperty("messagesPerWindow") int 
messagesPerWindow,
                                        @JsonProperty("windowSizeMs") long 
windowSizeMs) {
-        // Calcualte the default values.
+        // Calculate the default values.
         if (windowSizeMs <= 0) {
             windowSizeMs = 100;
         }
         this.windowSizeMs = windowSizeMs;
-        this.messagesPerSecond = messagesPerSecond;
-
-        // Use the rest of the parameters to calculate window properties.
-        this.messagesPerWindow = (int) ((long) messagesPerSecond / 
windowSizeMs);
+        this.messagesPerWindow = messagesPerWindow;
         calculateNextWindow();
     }
 
     @JsonProperty
-    public int messagesPerSecond() {
-        return messagesPerSecond;
+    public long windowSizeMs() {
+        return windowSizeMs;
+    }
+
+    @JsonProperty
+    public int messagesPerWindow() {
+        return messagesPerWindow;
     }
 
     private void calculateNextWindow() {
@@ -79,7 +76,7 @@ public class ConstantThroughputGenerator implements 
ThroughputGenerator {
         // Calculate the next window start time.
         long now = Time.SYSTEM.milliseconds();
         if (nextWindowStarts > 0) {
-            while (nextWindowStarts < now) {
+            while (nextWindowStarts <= now) {
                 nextWindowStarts += windowSizeMs;
             }
         } else {
@@ -89,8 +86,8 @@ public class ConstantThroughputGenerator implements 
ThroughputGenerator {
 
     @Override
     public synchronized void throttle() throws InterruptedException {
-        // Run unthrottled if messagesPerSecond is negative.
-        if (messagesPerSecond < 0) {
+        // Run unthrottled if messagesPerWindow is not positive.
+        if (messagesPerWindow <= 0) {
             return;
         }
 
@@ -106,7 +103,9 @@ public class ConstantThroughputGenerator implements 
ThroughputGenerator {
         if (messageTracker >= messagesPerWindow) {
 
             // Wait the difference in time between now and when the next 
window starts.
-            wait(nextWindowStarts - Time.SYSTEM.milliseconds());
+            while (nextWindowStarts > Time.SYSTEM.milliseconds()) {
+                wait(nextWindowStarts - Time.SYSTEM.milliseconds());
+            }
         }
     }
 }
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
index a3157db..eb6845e 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
@@ -52,7 +52,7 @@ import java.util.Random;
 
 public class GaussianFlushGenerator implements FlushGenerator {
     private final int messagesPerFlushAverage;
-    private final int messagesPerFlushDeviation;
+    private final double messagesPerFlushDeviation;
 
     private final Random random = new Random();
 
@@ -61,7 +61,7 @@ public class GaussianFlushGenerator implements FlushGenerator 
{
 
     @JsonCreator
     public GaussianFlushGenerator(@JsonProperty("messagesPerFlushAverage") int 
messagesPerFlushAverage,
-                                  @JsonProperty("messagesPerFlushDeviation") 
int messagesPerFlushDeviation) {
+                                  @JsonProperty("messagesPerFlushDeviation") 
double messagesPerFlushDeviation) {
         this.messagesPerFlushAverage = messagesPerFlushAverage;
         this.messagesPerFlushDeviation = messagesPerFlushDeviation;
         calculateFlushSize();
@@ -73,7 +73,7 @@ public class GaussianFlushGenerator implements FlushGenerator 
{
     }
 
     @JsonProperty
-    public long messagesPerFlushDeviation() {
+    public double messagesPerFlushDeviation() {
         return messagesPerFlushDeviation;
     }
 
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
index 6d71ff5..a77298f 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
@@ -25,21 +25,17 @@ import java.util.Random;
 /*
  * This throughput generator configures throughput with a gaussian normal 
distribution on a per-window basis. You can
  * specify how many windows to keep the throughput at the rate before 
changing. All traffic will follow a gaussian
- * distribution centered around `messagesPerSecondAverage` with a deviation of 
`messagesPerSecondDeviation`.
+ * distribution centered around `messagesPerWindowAverage` with a deviation of 
`messagesPerWindowDeviation`.
  *
  * The lower the window size, the smoother the traffic will be. Using a 100ms 
window offers no noticeable spikes in
  * traffic while still being long enough to avoid too much overhead.
  *
- * WARNING: Due to binary nature of throughput in terms of messages sent in a 
window, this does not work well for an
- * average throughput of less than 5 messages per window.  In cases where you 
want lower throughput, please adjust the
- * `windowSizeMs` accordingly.
- *
  * Here is an example spec:
  *
  * {
  *    "type": "gaussian",
- *    "messagesPerSecondAverage": 500,
- *    "messagesPerSecondDeviation": 50,
+ *    "messagesPerWindowAverage": 50,
+ *    "messagesPerWindowDeviation": 5,
  *    "windowsUntilRateChange": 100,
  *    "windowSizeMs": 100
  * }
@@ -56,10 +52,8 @@ import java.util.Random;
  */
 
 public class GaussianThroughputGenerator implements ThroughputGenerator {
-    private final int messagesPerSecondAverage;
-    private final int messagesPerSecondDeviation;
     private final int messagesPerWindowAverage;
-    private final int messagesPerWindowDeviation;
+    private final double messagesPerWindowDeviation;
     private final int windowsUntilRateChange;
     private final long windowSizeMs;
 
@@ -71,35 +65,31 @@ public class GaussianThroughputGenerator implements 
ThroughputGenerator {
     private int throttleMessages = 0;
 
     @JsonCreator
-    public 
GaussianThroughputGenerator(@JsonProperty("messagesPerSecondAverage") int 
messagesPerSecondAverage,
-                                       
@JsonProperty("messagesPerSecondDeviation") int messagesPerSecondDeviation,
+    public 
GaussianThroughputGenerator(@JsonProperty("messagesPerWindowAverage") int 
messagesPerWindowAverage,
+                                       
@JsonProperty("messagesPerWindowDeviation") double messagesPerWindowDeviation,
                                        @JsonProperty("windowsUntilRateChange") 
int windowsUntilRateChange,
                                        @JsonProperty("windowSizeMs") long 
windowSizeMs) {
-        // Calcualte the default values.
+        // Calculate the default values.
         if (windowSizeMs <= 0) {
             windowSizeMs = 100;
         }
         this.windowSizeMs = windowSizeMs;
-        this.messagesPerSecondAverage = messagesPerSecondAverage;
-        this.messagesPerSecondDeviation = messagesPerSecondDeviation;
+        this.messagesPerWindowAverage = messagesPerWindowAverage;
+        this.messagesPerWindowDeviation = messagesPerWindowDeviation;
         this.windowsUntilRateChange = windowsUntilRateChange;
 
-        // Take per-second calculations and convert them to per-window 
calculations.
-        messagesPerWindowAverage = (int) (messagesPerSecondAverage * 
windowSizeMs / 1000);
-        messagesPerWindowDeviation = (int) (messagesPerSecondDeviation * 
windowSizeMs / 1000);
-
-        // Calcualte the first window.
+        // Calculate the first window.
         calculateNextWindow(true);
     }
 
     @JsonProperty
-    public int messagesPerSecondAverage() {
-        return messagesPerSecondAverage;
+    public int messagesPerWindowAverage() {
+        return messagesPerWindowAverage;
     }
 
     @JsonProperty
-    public long messagesPerSecondDeviation() {
-        return messagesPerSecondDeviation;
+    public double messagesPerWindowDeviation() {
+        return messagesPerWindowDeviation;
     }
 
     @JsonProperty
@@ -107,6 +97,11 @@ public class GaussianThroughputGenerator implements 
ThroughputGenerator {
         return windowsUntilRateChange;
     }
 
+    @JsonProperty
+    public long windowSizeMs() {
+        return windowSizeMs;
+    }
+
     private synchronized void calculateNextWindow(boolean force) {
         // Reset the message count.
         messageTracker = 0;
@@ -127,7 +122,7 @@ public class GaussianThroughputGenerator implements 
ThroughputGenerator {
 
             // Calculate the number of messages allowed in this window using a 
normal distribution.
             // The formula is: Messages = Gaussian * Deviation + Average
-            throttleMessages = Math.max((int) (random.nextGaussian() * 
(double) messagesPerWindowDeviation) + messagesPerWindowAverage, 1);
+            throttleMessages = Math.max((int) (random.nextGaussian() * 
messagesPerWindowDeviation) + messagesPerWindowAverage, 1);
         }
         windowTracker += 1;
     }
@@ -146,7 +141,9 @@ public class GaussianThroughputGenerator implements 
ThroughputGenerator {
         if (messageTracker >= throttleMessages) {
 
             // Wait the difference in time between now and when the next 
window starts.
-            wait(nextWindowStarts - Time.SYSTEM.milliseconds());
+            while (nextWindowStarts > Time.SYSTEM.milliseconds()) {
+                wait(nextWindowStarts - Time.SYSTEM.milliseconds());
+            }
         }
     }
 }
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampConstantPayloadGenerator.java
similarity index 79%
copy from 
trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
copy to 
trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampConstantPayloadGenerator.java
index 90fe279..8660ed3 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampConstantPayloadGenerator.java
@@ -26,7 +26,7 @@ import java.nio.ByteOrder;
 import java.util.Random;
 
 /**
- * This class behaves identically to TimestampRandomPayloadGenerator, except 
the message size follows a gaussian
+ * This class behaves identically to TimestampConstantPayloadGenerator, except 
the message size follows a gaussian
  * distribution.
  *
  * This should be used in conjunction with TimestampRecordProcessor in the 
Consumer to measure true end-to-end latency
@@ -35,12 +35,11 @@ import java.util.Random;
  * `messageSizeAverage` - The average size in bytes of each message.
  * `messageSizeDeviation` - The standard deviation to use when calculating 
message size.
  * `messagesUntilSizeChange` - The number of messages to keep at the same size.
- * `seed` - Used to initialize Random() to remove some non-determinism.
  *
  * Here is an example spec:
  *
  * {
- *    "type": "gaussianTimestampRandom",
+ *    "type": "gaussianTimestampConstant",
  *    "messageSizeAverage": 512,
  *    "messageSizeDeviation": 100,
  *    "messagesUntilSizeChange": 100
@@ -56,9 +55,9 @@ import java.util.Random;
  *    ~99% of the messages are between 212 and 812 bytes
  */
 
-public class GaussianTimestampRandomPayloadGenerator implements 
PayloadGenerator {
+public class GaussianTimestampConstantPayloadGenerator implements 
PayloadGenerator {
     private final int messageSizeAverage;
-    private final int messageSizeDeviation;
+    private final double messageSizeDeviation;
     private final int messagesUntilSizeChange;
     private final long seed;
 
@@ -69,10 +68,10 @@ public class GaussianTimestampRandomPayloadGenerator 
implements PayloadGenerator
     private int messageSize = 0;
 
     @JsonCreator
-    public 
GaussianTimestampRandomPayloadGenerator(@JsonProperty("messageSizeAverage") int 
messageSizeAverage,
-                                                   
@JsonProperty("messageSizeDeviation") int messageSizeDeviation,
-                                                   
@JsonProperty("messagesUntilSizeChange") int messagesUntilSizeChange,
-                                                   @JsonProperty("seed") long 
seed) {
+    public 
GaussianTimestampConstantPayloadGenerator(@JsonProperty("messageSizeAverage") 
int messageSizeAverage,
+                                                     
@JsonProperty("messageSizeDeviation") double messageSizeDeviation,
+                                                     
@JsonProperty("messagesUntilSizeChange") int messagesUntilSizeChange,
+                                                     @JsonProperty("seed") 
long seed) {
         this.messageSizeAverage = messageSizeAverage;
         this.messageSizeDeviation = messageSizeDeviation;
         this.seed = seed;
@@ -87,11 +86,16 @@ public class GaussianTimestampRandomPayloadGenerator 
implements PayloadGenerator
     }
 
     @JsonProperty
-    public long messageSizeDeviation() {
+    public double messageSizeDeviation() {
         return messageSizeDeviation;
     }
 
     @JsonProperty
+    public int messagesUntilSizeChange() {
+        return messagesUntilSizeChange;
+    }
+
+    @JsonProperty
     public long seed() {
         return seed;
     }
@@ -108,9 +112,8 @@ public class GaussianTimestampRandomPayloadGenerator 
implements PayloadGenerator
         }
         messageTracker += 1;
 
-        // Generate out of order to prevent inclusion of random number 
generation in latency numbers.
+        // Generate the byte array before the timestamp generation.
         byte[] result = new byte[messageSize];
-        random.nextBytes(result);
 
         // Do the timestamp generation as the very last task.
         buffer.clear();
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
index 90fe279..48261a4 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
@@ -58,7 +58,7 @@ import java.util.Random;
 
 public class GaussianTimestampRandomPayloadGenerator implements 
PayloadGenerator {
     private final int messageSizeAverage;
-    private final int messageSizeDeviation;
+    private final double messageSizeDeviation;
     private final int messagesUntilSizeChange;
     private final long seed;
 
@@ -70,7 +70,7 @@ public class GaussianTimestampRandomPayloadGenerator 
implements PayloadGenerator
 
     @JsonCreator
     public 
GaussianTimestampRandomPayloadGenerator(@JsonProperty("messageSizeAverage") int 
messageSizeAverage,
-                                                   
@JsonProperty("messageSizeDeviation") int messageSizeDeviation,
+                                                   
@JsonProperty("messageSizeDeviation") double messageSizeDeviation,
                                                    
@JsonProperty("messagesUntilSizeChange") int messagesUntilSizeChange,
                                                    @JsonProperty("seed") long 
seed) {
         this.messageSizeAverage = messageSizeAverage;
@@ -87,11 +87,16 @@ public class GaussianTimestampRandomPayloadGenerator 
implements PayloadGenerator
     }
 
     @JsonProperty
-    public long messageSizeDeviation() {
+    public double messageSizeDeviation() {
         return messageSizeDeviation;
     }
 
     @JsonProperty
+    public int messagesUntilSizeChange() {
+        return messagesUntilSizeChange;
+    }
+
+    @JsonProperty
     public long seed() {
         return seed;
     }
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
index 225a663..6d7393d 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
@@ -37,7 +37,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
     @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
     @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name = 
"randomComponent"),
     @JsonSubTypes.Type(value = TimestampRandomPayloadGenerator.class, name = 
"timestampRandom"),
-    @JsonSubTypes.Type(value = GaussianTimestampRandomPayloadGenerator.class, 
name = "gaussianTimestampRandom")
+    @JsonSubTypes.Type(value = TimestampConstantPayloadGenerator.class, name = 
"timestampConstant"),
+    @JsonSubTypes.Type(value = GaussianTimestampRandomPayloadGenerator.class, 
name = "gaussianTimestampRandom"),
+    @JsonSubTypes.Type(value = 
GaussianTimestampConstantPayloadGenerator.class, name = 
"gaussianTimestampConstant")
     })
 public interface PayloadGenerator {
     /**
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampConstantPayloadGenerator.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampConstantPayloadGenerator.java
new file mode 100644
index 0000000..e9c4bc8
--- /dev/null
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampConstantPayloadGenerator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteOrder;
+import java.nio.ByteBuffer;
+
+/**
+ * A PayloadGenerator which generates a timestamped constant payload.
+ *
+ * The timestamp used for this class is in milliseconds since epoch, encoded 
directly to the first several bytes of the
+ * payload.
+ *
+ * This should be used in conjunction with TimestampRecordProcessor in the 
Consumer to measure true end-to-end latency
+ * of a system.
+ *
+ * `size` - The size in bytes of each message.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *    "type": "timestampConstant",
+ *    "size": 512
+ * }
+ *
+ * This will generate a 512-byte message with the first several bytes encoded 
with the timestamp.
+ */
+public class TimestampConstantPayloadGenerator implements PayloadGenerator {
+    private final int size;
+    private final ByteBuffer buffer;
+
+    @JsonCreator
+    public TimestampConstantPayloadGenerator(@JsonProperty("size") int size) {
+        this.size = size;
+        if (size < Long.BYTES) {
+            throw new RuntimeException("The size of the payload must be 
greater than or equal to " + Long.BYTES + ".");
+        }
+        buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.order(ByteOrder.LITTLE_ENDIAN);
+    }
+
+    @JsonProperty
+    public int size() {
+        return size;
+    }
+
+    @Override
+    public synchronized byte[] generate(long position) {
+        // Generate the byte array before the timestamp generation.
+        byte[] result = new byte[size];
+
+        // Do the timestamp generation as the very last task.
+        buffer.clear();
+        buffer.putLong(Time.SYSTEM.milliseconds());
+        buffer.rewind();
+        System.arraycopy(buffer.array(), 0, result, 0, Long.BYTES);
+        return result;
+    }
+}
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
index 658016a..035d459 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
@@ -38,7 +38,7 @@ import java.nio.ByteOrder;
  *
  * Example spec:
  * {
- *    "type": "timestampRandom",
+ *    "type": "timestamp",
  *    "histogramMaxMs": 10000,
  *    "histogramMinMs": 0,
  *    "histogramStepMs": 1

Reply via email to