This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cassandra-easy-stress.git


The following commit(s) were added to refs/heads/main by this push:
     new 8407586  Added a new run flag --parquet which will take all client 
latency events and write to a Apache Parquet file (#44)
8407586 is described below

commit 8407586c90a0b1063416c8e47ce1ba1ad160c369
Author: dcapwell <[email protected]>
AuthorDate: Fri May 23 16:38:08 2025 -0700

    Added a new run flag --parquet which will take all client latency events 
and write to a Apache Parquet file (#44)
---
 build.gradle                                       |  7 ++
 manual/MANUAL.adoc                                 | 87 ++++++++++++++++++-
 manual/examples/cassandra-easy-stress-help.txt     |  5 ++
 .../com/rustyrazorblade/easycassstress/Either.kt   |  7 ++
 .../com/rustyrazorblade/easycassstress/Metrics.kt  |  6 --
 .../easycassstress/OperationCallback.kt            | 35 +++-----
 .../easycassstress/ProfileRunner.kt                |  8 +-
 .../rustyrazorblade/easycassstress/RequestQueue.kt | 22 +----
 .../easycassstress/StressContext.kt                | 44 +++++++++-
 .../easycassstress/collector/AsyncCollector.kt     | 86 +++++++++++++++++++
 .../easycassstress/collector/Collector.kt          | 25 ++++++
 .../easycassstress/collector/CompositeCollector.kt | 32 +++++++
 .../easycassstress/collector/HdrCollector.kt       | 63 ++++++++++++++
 .../easycassstress/collector/ParquetCollector.kt   | 98 ++++++++++++++++++++++
 .../rustyrazorblade/easycassstress/commands/Run.kt | 88 +++++++++++--------
 .../easycassstress/workloads/IStressProfile.kt     | 32 +++----
 .../collector/ParquetCollectorTest.kt              | 65 ++++++++++++++
 17 files changed, 609 insertions(+), 101 deletions(-)

diff --git a/build.gradle b/build.gradle
index 5b1a7b3..801d6ac 100644
--- a/build.gradle
+++ b/build.gradle
@@ -101,6 +101,13 @@ dependencies {
     implementation 'org.apache.commons:commons-math3:3.6.1'
     implementation 'org.hdrhistogram:HdrHistogram:2.1.12'
 
+    implementation("org.agrona:agrona:1.22.0") // can't use the 2.x or 1.23+ 
line as it requires JDK 17
+
+    // for Parquet support
+    implementation("org.apache.parquet:parquet-hadoop:1.15.2")
+    implementation 'org.apache.hadoop:hadoop-common:3.4.1'
+    implementation 'org.apache.hadoop:hadoop-mapreduce-client-common:3.4.1'
+
     // exporting dropwizard metrics
 
     testImplementation group: 'org.junit.jupiter', name: 
'junit-jupiter-engine', version: '5.1.0'
diff --git a/manual/MANUAL.adoc b/manual/MANUAL.adoc
index d2cf85b..7aae32c 100644
--- a/manual/MANUAL.adoc
+++ b/manual/MANUAL.adoc
@@ -53,7 +53,7 @@ $ sudo apt install cassandra-easy-stress
 
 ==== RPM Packages
 
-eYou'll need the bintray repo set up on your machine.  Create this 
`/etc/yum.repos.d/tlp-tools.repo`:
+You'll need the bintray repo set up on your machine.  Create this 
`/etc/yum.repos.d/tlp-tools.repo`:
 
 ```
 [bintraybintray-rustyrazorblade-tlp-tools-rpm]
@@ -315,6 +315,91 @@ The Debian package installs a basic configuration file to 
`/etc/cassandra-easy-s
 
 cassandra-easy-stress automatically runs an HTTP server exporting metrics in 
Prometheus format on port 9501.
 
+=== Capturing Client Latencies to Apache Parquet
+
+You can capture detailed client latency metrics to an Apache Parquet file 
using the `--parquet` flag followed by a path to a file or directory:
+
+```
+$ bin/cassandra-easy-stress run KeyValue --duration 5m --parquet rawlog.parquet
+```
+
+This writes operation metrics including operation type, success/failure 
status, start time, and duration to a Parquet file that can be analyzed later 
with data analysis tools like Pandas, Spark, DuckDB, or visualization tools 
that support the Parquet format.
+
+If a directory is provided instead of a file, cassandra-easy-stress will 
automatically create an appropriately named file in that directory.
+
+==== Analyzing Parquet Files with DuckDB
+
+The Parquet files created by cassandra-easy-stress can be easily analyzed 
using DuckDB, a lightweight analytical database engine. Here are some example 
queries to get you started:
+
+```sql
+-- Show summary statistics for operation latencies for every minute
+SELECT date_trunc('minute', epoch_ms(request_start_time_ms)) as minute,
+       COUNT(*) as count,
+       AVG(request_duration_ns / 1000 / 1000) as avg,
+       MIN(request_duration_ns / 1000 / 1000) as min,
+       MAX(request_duration_ns / 1000 / 1000) as max,
+       APPROX_QUANTILE(request_duration_ns / 1000 / 1000, .5) as p50,
+       APPROX_QUANTILE(request_duration_ns / 1000 / 1000, .9) as p90,
+       APPROX_QUANTILE(request_duration_ns / 1000 / 1000, .99) as p99,
+       COUNT(CASE WHEN failure_reason != '' THEN 1 END) AS errors,
+       COUNT(CASE WHEN failure_reason = 'ReadTimeoutException' THEN 1 END) AS 
read_timeouts,
+       COUNT(CASE WHEN failure_reason = 'WriteTimeoutException' THEN 1 END) AS 
write_timeouts,
+FROM read_parquet('rawlog.parquet')
+GROUP BY minute
+ORDER BY minute;
+
+┌─────────────────────┬────────┬─────────────────────┬──────────┬────────────────────┬─────────────────────┬─────────────────────┬────────────────────┬────────┬───────────────┬────────────────┐
+│       minute        │ count  │         avg         │   min    │        max   
      │         p50         │         p90         │        p99         │ errors 
│ read_timeouts │ write_timeouts │
+│      timestamp      │ int64  │       double        │  double  │       double 
      │       double        │       double        │       double       │ int64  
│     int64     │     int64      │
+├─────────────────────┼────────┼─────────────────────┼──────────┼────────────────────┼─────────────────────┼─────────────────────┼────────────────────┼────────┼───────────────┼────────────────┤
+│ 2025-05-23 22:45:00 │ 141911 │  18.891404855317813 │ 0.088042 │        
1305.993875 │ 0.23617621307864622 │  0.5454138286498701 │   755.160273634975 │  
    0 │             0 │              0 │
+│ 2025-05-23 22:46:00 │ 300081 │ 0.26154326542833034 │ 0.091458 │          
16.620042 │  0.2198726495794396 │ 0.28866495065114356 │ 1.0759477146627234 │    
  0 │             0 │              0 │
+│ 2025-05-23 22:47:00 │ 300075 │ 0.29655502679997087 │ 0.089208 │          
19.247875 │  0.2241928371244093 │  0.3096208807374364 │ 1.8582042492087465 │    
  0 │             0 │              0 │
+│ 2025-05-23 22:48:00 │ 298543 │  0.6524298801211285 │ 0.093666 │ 
198.99904199999997 │ 0.22677466153454573 │ 0.33573102120265713 │  
9.839418314581492 │      0 │             0 │              0 │
+│ 2025-05-23 22:49:00 │ 300053 │ 0.30696147925533085 │ 0.100167 │          
64.216666 │  0.2249157848195121 │  0.3072763658664282 │ 1.6342296967730887 │    
  0 │             0 │              0 │
+│ 2025-05-23 22:50:00 │  24765 │  0.4530204902079576 │  0.12675 │          
39.548167 │  0.2259263537252715 │ 0.30608390597020046 │ 7.7139616210838575 │    
  0 │             0 │              0 │
+└─────────────────────┴────────┴─────────────────────┴──────────┴────────────────────┴─────────────────────┴─────────────────────┴────────────────────┴────────┴───────────────┴────────────────┘
+
+
+-- Show error counts
+SELECT failure_reason, count(*)
+FROM read_parquet('rawlog.parquet')
+WHERE failure_reason != ''
+GROUP BY failure_reason;
+```
+
+You can also use DuckDB through its various clients including Python, R, Java, 
and JDBC.
+
+==== Understanding Request Time vs Service Time
+
+When analyzing latency data from the Parquet files, it's important to 
understand the distinction between two key metrics:
+
+* **Service Time**: This is the actual time it takes for the database to 
process a request and return a response once the request is received by the 
database. It measures only the execution time of the operation.
+
+* **Request Time**: This is the total time from when the client intended to 
make the request until receiving the response. It includes the service time 
plus any queue time or delays that might have occurred before the request was 
actually sent to the database.
+
+The difference between these metrics is critical for understanding coordinated 
omission, a common problem in performance testing where the test client doesn't 
accurately capture the full latency that would be experienced by real users 
when the system is under load.
+
+For example, if your database is overloaded and can only process 100 
operations per second, but your test is trying to send 200 operations per 
second:
+
+* A naïve benchmark would only measure the service time of the operations that 
actually got processed, missing the fact that half the operations were delayed.
+* A properly instrumented benchmark (like cassandra-easy-stress) captures the 
request time, which includes how long operations had to wait in a queue.
+
+When using the Parquet files for analysis, you can examine both metrics to get 
a more complete picture of your system's performance under load:
+
+```sql
+-- Compare average service time vs request time by operation type
+SELECT 
+    operation,
+    AVG(service_duration_ns / 1000 / 1000) as avg_service_time_ms,
+    AVG(request_duration_ns / 1000 / 1000) as avg_request_time_ms,
+    AVG(request_duration_ns - service_duration_ns) / 1000 / 1000 as 
avg_queue_time_ms
+FROM read_parquet('rawlog.parquet')
+GROUP BY operation;
+```
+
+A significant difference between average request time and average service time 
indicates queuing or scheduling delays in your system, which can be an early 
warning sign of performance bottlenecks.
+
 === Workload Restrictions
 
The `BasicTimeSeries` workload only supports Cassandra versions 3.0 and above. 
This is because range deletes are used by this workload during runtime. Range 
deletes are only supported in Cassandra versions 3.0 and above. An exception is 
thrown if this workload is used and a Cassandra version less than 3.0 is 
detected during runtime.
diff --git a/manual/examples/cassandra-easy-stress-help.txt 
b/manual/examples/cassandra-easy-stress-help.txt
index 1b71f3a..a32b22a 100644
--- a/manual/examples/cassandra-easy-stress-help.txt
+++ b/manual/examples/cassandra-easy-stress-help.txt
@@ -97,6 +97,11 @@ Usage: cassandra-easy-stress [options] [command] [command 
options]
             Default: false
           --paging
             Override the driver's default page size.
+          --parquet
+            Capture client latency metrics to a Apache Parquet file at the
+            specified path.  If the file is a directory, the file will be
+            named rawlog.parquet within that directory
+            Default: <empty string>
           --partitiongenerator, --pg
             Method of generating partition keys.  Supports random, normal 
             (gaussian), and sequence.
diff --git a/src/main/kotlin/com/rustyrazorblade/easycassstress/Either.kt 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/Either.kt
new file mode 100644
index 0000000..75135c3
--- /dev/null
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/Either.kt
@@ -0,0 +1,7 @@
+package com.rustyrazorblade.easycassstress
+
+sealed class Either<out A, out B> {
+    class Left<A>(val value: A) : Either<A, Nothing>()
+
+    class Right<B>(val value: B) : Either<Nothing, B>()
+}
diff --git a/src/main/kotlin/com/rustyrazorblade/easycassstress/Metrics.kt 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/Metrics.kt
index 722868c..b5245f3 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/Metrics.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/Metrics.kt
@@ -22,7 +22,6 @@ import com.codahale.metrics.ScheduledReporter
 import io.prometheus.client.CollectorRegistry
 import io.prometheus.client.dropwizard.DropwizardExports
 import io.prometheus.client.exporter.HTTPServer
-import org.HdrHistogram.SynchronizedHistogram
 import java.util.Optional
 import java.util.concurrent.TimeUnit
 
@@ -71,11 +70,6 @@ class Metrics(val metricRegistry: MetricRegistry, val 
reporters: List<ScheduledR
     val deletionThroughputTracker = getTracker { deletions.count }.start()
     val populateThroughputTracker = getTracker { populate.count }.start()
 
-    // Using a synchronized histogram for now, we may need to change this 
later if it's a perf bottleneck
-    val mutationHistogram = SynchronizedHistogram(2)
-    val selectHistogram = SynchronizedHistogram(2)
-    val deleteHistogram = SynchronizedHistogram(2)
-
     /**
      * We track throughput using separate structures than Dropwizard
      */
diff --git 
a/src/main/kotlin/com/rustyrazorblade/easycassstress/OperationCallback.kt 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/OperationCallback.kt
index 3dd8ad9..f4969c9 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/OperationCallback.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/OperationCallback.kt
@@ -18,9 +18,11 @@
 package com.rustyrazorblade.easycassstress
 
 import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.google.common.base.Throwables
 import com.rustyrazorblade.easycassstress.workloads.IStressRunner
 import com.rustyrazorblade.easycassstress.workloads.Operation
 import org.apache.logging.log4j.kotlin.logger
+import java.util.concurrent.TimeUnit
 import java.util.function.BiConsumer
 
 /**
@@ -32,8 +34,9 @@ class OperationCallback(
     val context: StressContext,
     val runner: IStressRunner,
     val op: Operation,
+    val startNanos: Long,
+    val populatePhase: Boolean,
     val paginate: Boolean = false,
-    val writeHdr: Boolean = true,
 ) : BiConsumer<AsyncResultSet?, Throwable?> {
     companion object {
         val log = logger()
@@ -45,6 +48,7 @@ class OperationCallback(
     ) {
         if (t != null) {
             context.metrics.errors.mark()
+            context.collect(op, Either.Right(Throwables.getRootCause(t)), 
startNanos, System.nanoTime())
             log.error { t }
             return
         }
@@ -56,39 +60,26 @@ class OperationCallback(
                 result.fetchNextPage()
             }
         }
+        val endNanos = System.nanoTime()
+        context.timer(op, populatePhase).update(endNanos - op.createdAtNanos, 
TimeUnit.NANOSECONDS)
+        // TODO (visibility): include details about paging?
+        context.collect(op, Either.Left(result!!), startNanos, endNanos)
 
-        val time = op.startTime.stop()
-
-        // we log to the HDR histogram and do the callback for mutations
+        // do the callback for mutations
         // might extend this to select, but I can't see a reason for it now
         when (op) {
             is Operation.Mutation -> {
-                if (writeHdr) {
-                    context.metrics.mutationHistogram.recordValue(time)
-                }
                 runner.onSuccess(op, result)
             }
-
-            is Operation.Deletion -> {
-                if (writeHdr) {
-                    context.metrics.deleteHistogram.recordValue(time)
-                }
-            }
-
-            is Operation.SelectStatement -> {
-                if (writeHdr) {
-                    context.metrics.selectHistogram.recordValue(time)
-                }
-            }
             is Operation.DDL -> {
-                if (writeHdr) {
-                    context.metrics.mutationHistogram.recordValue(time)
-                }
                 runner.onSuccess(op, result)
             }
             is Operation.Stop -> {
                 throw OperationStopException()
             }
+            else -> {
+                // ignore
+            }
         }
     }
 }
diff --git 
a/src/main/kotlin/com/rustyrazorblade/easycassstress/ProfileRunner.kt 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/ProfileRunner.kt
index c94d6e6..fd0478b 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/ProfileRunner.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/ProfileRunner.kt
@@ -142,6 +142,7 @@ class ProfileRunner(
         var paginate = context.mainArguments.paginate
         for (op in queue.getNextOperation()) {
             // In driver v4, async execution returns a CompletionStage
+            val startNanos = System.nanoTime()
             val future =
                 when (op) {
                     is Operation.DDL -> {
@@ -161,8 +162,9 @@ class ProfileRunner(
                     context,
                     runner,
                     op,
+                    startNanos,
+                    queue.populatePhase,
                     paginate = paginate,
-                    writeHdr = context.mainArguments.hdrHistogramPrefix != "",
                 )
 
             future.whenComplete { result, error ->
@@ -200,6 +202,7 @@ class ProfileRunner(
 
         try {
             for (op in queue.getNextOperation()) {
+                val startNanos = System.nanoTime()
                 val future = context.session.executeAsync(op.bound!!)
 
                 // Create callback to handle the result
@@ -208,8 +211,9 @@ class ProfileRunner(
                         context,
                         runner,
                         op,
+                        startNanos,
+                        queue.populatePhase,
                         paginate = false,
-                        writeHdr = context.mainArguments.hdrHistogramPrefix != 
"",
                     )
 
                 future.whenComplete { result, error ->
diff --git a/src/main/kotlin/com/rustyrazorblade/easycassstress/RequestQueue.kt 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/RequestQueue.kt
index b6d65ad..f57e834 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/RequestQueue.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/RequestQueue.kt
@@ -17,7 +17,6 @@
  */
 package com.rustyrazorblade.easycassstress
 
-import com.codahale.metrics.Timer
 import com.rustyrazorblade.easycassstress.workloads.IStressRunner
 import com.rustyrazorblade.easycassstress.workloads.Operation
 import org.apache.logging.log4j.kotlin.logger
@@ -38,7 +37,7 @@ class RequestQueue(
     runner: IStressRunner,
     readRate: Double,
     deleteRate: Double,
-    populatePhase: Boolean = false,
+    val populatePhase: Boolean = false,
 ) {
     val queue = 
ArrayBlockingQueue<Operation>(context.mainArguments.queueDepth.toInt(), true)
     var generatorThread: Thread
@@ -57,23 +56,6 @@ class RequestQueue(
                 var executed = 0L
                 log.info("populate=$populatePhase total values: $totalValues, 
duration: $duration")
 
-                // we're using a separate timer for populate phase
-                // regardless of the operation performed
-                fun getTimer(operation: Operation): Timer {
-                    return if (populatePhase) {
-                        context.metrics.populate
-                    } else {
-                        when (operation) {
-                            is Operation.SelectStatement -> 
context.metrics.selects
-                            is Operation.Mutation -> context.metrics.mutations
-                            is Operation.Deletion -> context.metrics.deletions
-                            is Operation.Stop -> throw OperationStopException()
-                            // maybe this should be under DDL, it's a weird 
case.
-                            is Operation.DDL -> context.metrics.mutations
-                        }
-                    }
-                }
-
                 for (key in partitionKeyGenerator.generateKey(totalValues, 
context.mainArguments.partitionValues)) {
                     if (duration > 0 && 
desiredEndTime.isBefore(LocalDateTime.now())) {
                         log.info("Reached duration, ending")
@@ -113,8 +95,6 @@ class RequestQueue(
                             runner.getNextMutation(key)
                         }
 
-                    op.startTime = getTimer(op).time()
-
                     if (!queue.offer(op)) {
                         context.metrics.errors.mark()
                     }
diff --git 
a/src/main/kotlin/com/rustyrazorblade/easycassstress/StressContext.kt 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/StressContext.kt
index 8a3198d..bc6d95b 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/StressContext.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/StressContext.kt
@@ -17,10 +17,14 @@
  */
 package com.rustyrazorblade.easycassstress
 
+import com.codahale.metrics.Timer
 import com.datastax.oss.driver.api.core.CqlSession
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
 import com.google.common.util.concurrent.RateLimiter
+import com.rustyrazorblade.easycassstress.collector.Collector
 import com.rustyrazorblade.easycassstress.commands.Run
 import com.rustyrazorblade.easycassstress.generators.Registry
+import com.rustyrazorblade.easycassstress.workloads.Operation
 
 data class StressContext(
     val session: CqlSession,
@@ -29,4 +33,42 @@ data class StressContext(
     val metrics: Metrics,
     val registry: Registry,
     val rateLimiter: RateLimiter?,
-)
+    val collector: Collector,
+) {
+    fun collect(
+        op: Operation,
+        result: Either<AsyncResultSet, Throwable>,
+        startNanos: Long,
+        endNanos: Long,
+    ) = collector.collect(this, op, result, startNanos, endNanos)
+
+    // we're using a separate timer for populate phase
+    // regardless of the operation performed
+    fun timer(
+        op: Operation,
+        populatePhase: Boolean,
+    ): Timer =
+        if (populatePhase) {
+            metrics.populate
+        } else {
+            when (op) {
+                is Operation.SelectStatement -> metrics.selects
+                is Operation.Mutation -> metrics.mutations
+                is Operation.Deletion -> metrics.deletions
+                is Operation.Stop -> throw OperationStopException()
+                // maybe this should be under DDL, it's a weird case.
+                is Operation.DDL -> metrics.mutations
+            }
+        }
+}
+
+data class Context(
+    val session: CqlSession,
+    val mainArguments: Run,
+    val metrics: Metrics,
+    val registry: Registry,
+    val rateLimiter: RateLimiter?,
+    val collector: Collector,
+) {
+    fun stress(thread: Int): StressContext = StressContext(session, 
mainArguments, thread, metrics, registry, rateLimiter, collector)
+}
diff --git 
a/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/AsyncCollector.kt
 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/AsyncCollector.kt
new file mode 100644
index 0000000..2c2e861
--- /dev/null
+++ 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/AsyncCollector.kt
@@ -0,0 +1,86 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.rustyrazorblade.easycassstress.Context
+import com.rustyrazorblade.easycassstress.Either
+import com.rustyrazorblade.easycassstress.StressContext
+import com.rustyrazorblade.easycassstress.workloads.Operation
+import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue
+import org.agrona.concurrent.BackoffIdleStrategy
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * Base type for collectors that have "expensive" work that needs to happen 
off-thread.  Every call to collect will
+ * generate an object, push it to a queue, and have it processed in another worker thread.
+ *
+ * Implementations must define the writer interface which will be called to do 
the "real" work for the collector.
+ */
+abstract class AsyncCollector(
+    fileOrDirectory: File,
+) : Collector {
+    data class Event(
+        val op: Operation,
+        val result: Either<AsyncResultSet, Throwable>,
+        val startNanos: Long,
+        val endNanos: Long,
+    )
+
+    interface Writer : Closeable {
+        fun write(event: Event)
+    }
+
+    private val queue = 
MpscArrayQueue<Event>(Integer.getInteger("cassandra-easy-stress.event_csv_queue_size",
 4096))
+    private val writer = createWriter(fileOrDirectory)
+
+    @Volatile
+    private var running = true
+    private val thread = Thread(this::run)
+    private val idleStrategy = BackoffIdleStrategy()
+
+    init {
+        thread.isDaemon = true
+        thread.name = "cassandra-easy-stress event raw log collector"
+        thread.start()
+    }
+
+    val dropped = AtomicInteger()
+    val counter = AtomicInteger()
+
+    abstract fun createWriter(fileOrDirectory: File): Writer
+
+    override fun collect(
+        ctx: StressContext,
+        op: Operation,
+        result: Either<AsyncResultSet, Throwable>,
+        startNanos: Long,
+        endNanos: Long,
+    ) {
+        if (!queue.offer(Event(op, result, startNanos, endNanos))) {
+            dropped.incrementAndGet()
+        }
+    }
+
+    private fun run() {
+        while (running) {
+            try {
+                val processed = queue.drain(writer::write)
+                counter.addAndGet(processed)
+                idleStrategy.idle(processed)
+            } catch (t: Throwable) {
+                System.err.println("Exception while writing raw logs")
+                t.printStackTrace()
+                running = false
+                return
+            }
+        }
+    }
+
+    override fun close(context: Context) {
+        running = false
+        thread.join()
+        writer.close()
+        println("Wrote ${counter.get()} events; Dropped ${dropped.get()} 
events")
+    }
+}
diff --git 
a/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/Collector.kt 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/Collector.kt
new file mode 100644
index 0000000..8005305
--- /dev/null
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/Collector.kt
@@ -0,0 +1,25 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.rustyrazorblade.easycassstress.Context
+import com.rustyrazorblade.easycassstress.Either
+import com.rustyrazorblade.easycassstress.StressContext
+import com.rustyrazorblade.easycassstress.workloads.Operation
+
+/**
+ * When an operation completes (success or failure) this interface is called 
with the state at that moment.  This
+ * interface is part of the "hot" path and as such implementations should 
respect that and should push expensive work
+ * outside the thread calling this.
+ */
+interface Collector {
+    fun collect(
+        ctx: StressContext,
+        op: Operation,
+        result: Either<AsyncResultSet, Throwable>,
+        startNanos: Long,
+        endNanos: Long,
+    )
+
+    fun close(context: Context) {
+    }
+}
diff --git 
a/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/CompositeCollector.kt
 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/CompositeCollector.kt
new file mode 100644
index 0000000..5d0b212
--- /dev/null
+++ 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/CompositeCollector.kt
@@ -0,0 +1,32 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.rustyrazorblade.easycassstress.Context
+import com.rustyrazorblade.easycassstress.Either
+import com.rustyrazorblade.easycassstress.StressContext
+import com.rustyrazorblade.easycassstress.workloads.Operation
+
+/**
+ * Groups a list of collectors together and will call each in the same order.
+ */
+class CompositeCollector(
+    private vararg val collectors: Collector,
+) : Collector {
+    override fun collect(
+        ctx: StressContext,
+        op: Operation,
+        result: Either<AsyncResultSet, Throwable>,
+        startNanos: Long,
+        endNanos: Long,
+    ) {
+        for (c in collectors) {
+            c.collect(ctx, op, result, startNanos, endNanos)
+        }
+    }
+
+    override fun close(context: Context) {
+        for (c in collectors) {
+            c.close(context)
+        }
+    }
+}
diff --git 
a/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/HdrCollector.kt 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/HdrCollector.kt
new file mode 100644
index 0000000..11d422b
--- /dev/null
+++ 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/HdrCollector.kt
@@ -0,0 +1,63 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.rustyrazorblade.easycassstress.Context
+import com.rustyrazorblade.easycassstress.Either
+import com.rustyrazorblade.easycassstress.StressContext
+import com.rustyrazorblade.easycassstress.workloads.Operation
+import org.HdrHistogram.SynchronizedHistogram
+import java.io.File
+import java.io.PrintStream
+
+/**
+ * Stores all events into an HdrHistogram and publishes to 3 files at the end of the run
+ */
+class HdrCollector(
+    val hdrHistogramPrefix: String,
+) : Collector {
+    // Using a synchronized histogram for now, we may need to change this 
later if it's a perf bottleneck
+    val mutationHistogram = SynchronizedHistogram(2)
+    val selectHistogram = SynchronizedHistogram(2)
+    val deleteHistogram = SynchronizedHistogram(2)
+
+    override fun collect(
+        ctx: StressContext,
+        op: Operation,
+        result: Either<AsyncResultSet, Throwable>,
+        startNanos: Long,
+        endNanos: Long,
+    ) {
+        if (result is Either.Right) return // only success is tracked
+
+        // we log to the HDR histogram and do the callback for mutations
+        // might extend this to select, but I can't see a reason for it now
+        when (op) {
+            is Operation.Mutation, is Operation.DDL -> {
+                mutationHistogram.recordValue(endNanos - op.createdAtNanos)
+            }
+            is Operation.Deletion -> {
+                deleteHistogram.recordValue(endNanos - op.createdAtNanos)
+            }
+            is Operation.SelectStatement -> {
+                selectHistogram.recordValue(endNanos - op.createdAtNanos)
+            }
+            else -> {
+                // ignore
+            }
+        }
+    }
+
+    override fun close(ctx: Context) {
+        // print out the hdr histograms if requested to 3 separate files
+        val pairs =
+            listOf(
+                Pair(mutationHistogram, "mutations"),
+                Pair(selectHistogram, "reads"),
+                Pair(deleteHistogram, "deletes"),
+            )
+        for (entry in pairs) {
+            val fp = File(hdrHistogramPrefix + "-" + entry.second + ".txt")
+            entry.first.outputPercentileDistribution(PrintStream(fp), 
1_000_000.0)
+        }
+    }
+}
diff --git 
a/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollector.kt
 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollector.kt
new file mode 100644
index 0000000..07732e8
--- /dev/null
+++ 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollector.kt
@@ -0,0 +1,98 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.google.common.base.Throwables
+import com.rustyrazorblade.easycassstress.Either
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.example.data.Group
+import org.apache.parquet.example.data.simple.SimpleGroupFactory
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.example.ExampleParquetWriter
+import org.apache.parquet.schema.MessageTypeParser
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+/**
+ * Publishes all the requests to an Apache Parquet file to allow analytical tools to process the raw data rather than
+ * having to rely solely on metrics (which are sampled).
+ */
+class ParquetCollector(
+    fileOrDirectory: File,
+) : AsyncCollector(fileOrDirectory) {
+    override fun createWriter(fileOrDirectory: File): Writer =
+        ParquetTableWriter(if (fileOrDirectory.isDirectory) 
File(fileOrDirectory, "rawlog.parquet") else fileOrDirectory)
+
+    class ParquetTableWriter(
+        file: File,
+    ) : Writer {
+        private val writer: ParquetWriter<Group>
+        private val groupFactory: SimpleGroupFactory
+
+        init {
+            file.delete()
+            val path = Path("file://${file.absolutePath}")
+            println("Parquet Log path $path")
+            val schema =
+                MessageTypeParser.parseMessageType(
+                    "message example {\n" +
+                        "  required binary operation (UTF8);\n" +
+                        "  required boolean success;\n" +
+                        "  required binary failure_reason (UTF8);\n" +
+                        "  required binary failure_stacktrace (UTF8);\n" +
+                        "  required int64 service_start_time_ms;\n" +
+                        "  required int64 service_duration_ns;\n" +
+                        "  required int64 request_start_time_ms;\n" +
+                        "  required int64 request_duration_ns;\n" +
+                        "}",
+                )
+            groupFactory = SimpleGroupFactory(schema)
+            writer = 
ExampleParquetWriter.builder(path).withType(schema).build()
+        }
+
+        override fun write(event: Event) {
+            val requestStartNanos = event.op.createdAtNanos
+            val requestStartMillis = event.op.createdAtMillis
+            val requestDurationNanos = event.endNanos - requestStartNanos
+
+            val serviceStartNanos = event.startNanos
+            val serviceStartMillis = requestStartMillis + 
TimeUnit.NANOSECONDS.toMillis(serviceStartNanos - requestStartNanos)
+            val serviceDurationNanos = event.endNanos - serviceStartNanos
+            val group =
+                groupFactory
+                    .newGroup()
+                    .append("operation", op(event))
+                    .append("success", event.result is Either.Left)
+                    .append("failure_reason", reasonName(event.result))
+                    .append("failure_stacktrace", 
reasonStackTrace(event.result))
+                    .append("service_start_time_ms", serviceStartMillis)
+                    .append("service_duration_ns", serviceDurationNanos)
+                    .append("request_start_time_ms", requestStartMillis)
+                    .append("request_duration_ns", requestDurationNanos)
+            writer.write(group)
+        }
+
+        override fun close() {
+            writer.close()
+        }
+    }
+
+    companion object {
+        private fun op(event: Event) =
+            event.op.javaClass.simpleName
+                .replace("Statement", "")
+
+        private fun reasonName(result: Either<AsyncResultSet, Throwable>) =
+            if (result is Either.Right) {
+                result.value.javaClass.simpleName
+            } else {
+                ""
+            }
+
+        private fun reasonStackTrace(result: Either<AsyncResultSet, 
Throwable>) =
+            if (result is Either.Right) {
+                Throwables.getStackTraceAsString(result.value)
+            } else {
+                ""
+            }
+    }
+}
diff --git a/src/main/kotlin/com/rustyrazorblade/easycassstress/commands/Run.kt 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/commands/Run.kt
index 35c6fd1..d49594b 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/commands/Run.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/commands/Run.kt
@@ -29,6 +29,7 @@ import 
com.datastax.oss.driver.api.core.config.DefaultDriverOption
 import com.datastax.oss.driver.api.core.config.DriverConfigLoader
 import com.google.common.base.Preconditions
 import com.google.common.util.concurrent.RateLimiter
+import com.rustyrazorblade.easycassstress.Context
 import com.rustyrazorblade.easycassstress.FileReporter
 import com.rustyrazorblade.easycassstress.Metrics
 import com.rustyrazorblade.easycassstress.Plugin
@@ -37,7 +38,10 @@ import com.rustyrazorblade.easycassstress.ProfileRunner
 import com.rustyrazorblade.easycassstress.RateLimiterOptimizer
 import com.rustyrazorblade.easycassstress.SchemaBuilder
 import com.rustyrazorblade.easycassstress.SingleLineConsoleReporter
-import com.rustyrazorblade.easycassstress.StressContext
+import com.rustyrazorblade.easycassstress.collector.Collector
+import com.rustyrazorblade.easycassstress.collector.CompositeCollector
+import com.rustyrazorblade.easycassstress.collector.HdrCollector
+import com.rustyrazorblade.easycassstress.collector.ParquetCollector
 import com.rustyrazorblade.easycassstress.converters.ConsistencyLevelConverter
 import com.rustyrazorblade.easycassstress.converters.HumanReadableConverter
 import com.rustyrazorblade.easycassstress.converters.HumanReadableTimeConverter
@@ -47,7 +51,6 @@ import me.tongfei.progressbar.ProgressBar
 import me.tongfei.progressbar.ProgressBarStyle
 import org.apache.logging.log4j.kotlin.logger
 import java.io.File
-import java.io.PrintStream
 import java.util.Timer
 import kotlin.concurrent.fixedRateTimer
 import kotlin.concurrent.schedule
@@ -56,9 +59,7 @@ import kotlin.concurrent.thread
 val DEFAULT_ITERATIONS: Long = 1000000
 
 class NoSplitter : IParameterSplitter {
-    override fun split(value: String?): MutableList<String> {
-        return mutableListOf(value!!)
-    }
+    override fun split(value: String?): MutableList<String> = 
mutableListOf(value!!)
 }
 
 /**
@@ -66,7 +67,9 @@ class NoSplitter : IParameterSplitter {
  * It's used solely for logging and reporting purposes.
  */
 @Parameters(commandDescription = "Run a cassandra-easy-stress profile")
-class Run(val command: String) : IStressCommand {
+class Run(
+    val command: String,
+) : IStressCommand {
     @Parameter(names = ["--host"])
     var host = System.getenv("CASSANDRA_EASY_STRESS_CASSANDRA_HOST") ?: 
"127.0.0.1"
 
@@ -224,6 +227,14 @@ class Run(val command: String) : IStressCommand {
     @Parameter(names = ["--csv"], description = "Write metrics to this file in 
CSV format.")
     var csvFile = ""
 
+    @Parameter(
+        names = ["--parquet"],
+        description =
+            "Capture client latency metrics to a Apache Parquet file at the 
specified path.  " +
+                "If the file is a directory, the file will be named 
rawlog.parquet within that directory",
+    )
+    var parquetFile = ""
+
     @Parameter(names = ["--paging"], description = "Override the driver's 
default page size.")
     var paging: Int? = null
 
@@ -278,7 +289,8 @@ class Run(val command: String) : IStressCommand {
     val session by lazy {
         // Build a programmatic config
         var configLoaderBuilder =
-            DriverConfigLoader.programmaticBuilder()
+            DriverConfigLoader
+                .programmaticBuilder()
                 // Default consistency levels
                 .withString(DefaultDriverOption.REQUEST_CONSISTENCY, 
consistencyLevel.toString())
                 .withString(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY, 
serialConsistencyLevel.toString())
@@ -309,14 +321,18 @@ class Run(val command: String) : IStressCommand {
 
         // Build the CqlSession
         val sessionBuilder =
-            CqlSession.builder()
+            CqlSession
+                .builder()
                 .addContactPoint(java.net.InetSocketAddress(host, cqlPort))
                 .withAuthCredentials(username, password)
                 .withConfigLoader(configLoaderBuilder.build())
 
         // Add SSL if needed
         if (ssl) {
-            
sessionBuilder.withSslContext(javax.net.ssl.SSLContext.getDefault())
+            sessionBuilder.withSslContext(
+                javax.net.ssl.SSLContext
+                    .getDefault(),
+            )
         }
 
         // Show settings about to be used
@@ -405,9 +421,12 @@ class Run(val command: String) : IStressCommand {
 
         var runnersExecuted = 0L
 
+        val collector = createCollector()
+        val context = Context(session, this, metrics, fieldRegistry, 
rateLimiter, collector)
+
         try {
             // run the prepare for each
-            val runners = createRunners(plugin, metrics, fieldRegistry, 
rateLimiter)
+            val runners = createRunners(plugin, context)
 
             populateData(plugin, runners, metrics)
             metrics.resetErrors()
@@ -443,20 +462,6 @@ class Run(val command: String) : IStressCommand {
             for (reporter in metrics.reporters) {
                 reporter.report()
             }
-
-            // print out the hdr histograms if requested to 3 separate files
-            if (hdrHistogramPrefix != "") {
-                val pairs =
-                    listOf(
-                        Pair(metrics.mutationHistogram, "mutations"),
-                        Pair(metrics.selectHistogram, "reads"),
-                        Pair(metrics.deleteHistogram, "deletes"),
-                    )
-                for (entry in pairs) {
-                    val fp = File(hdrHistogramPrefix + "-" + entry.second + 
".txt")
-                    entry.first.outputPercentileDistribution(PrintStream(fp), 
1_000_000.0)
-                }
-            }
         } catch (e: Exception) {
             println(
                 "There was an error with cassandra-easy-stress.  Please file a 
bug at " +
@@ -467,12 +472,28 @@ class Run(val command: String) : IStressCommand {
             // we need to be able to run multiple tests in the same JVM
             // without this cleanup we could have the metrics runner still 
running and it will cause subsequent tests to fail
             metrics.shutdown()
+            collector.close(context)
             Thread.sleep(1000)
 
             println("Stress complete, $runnersExecuted.")
         }
     }
 
+    private fun createCollector(): Collector {
+        val collectors = ArrayList<Collector>()
+
+        if (hdrHistogramPrefix != "") {
+            collectors.add(HdrCollector(hdrHistogramPrefix))
+        }
+        if (parquetFile != "") {
+            collectors.add(ParquetCollector(File(parquetFile)))
+        }
+        if (collectors.size == 1) {
+            return collectors[0]
+        }
+        return CompositeCollector(*collectors.toTypedArray())
+    }
+
     private fun getRateLimiter(): RateLimiter {
         val tmp = RateLimiter.create(rate.toDouble())
         tmp.acquire(rate.toInt())
@@ -548,23 +569,23 @@ class Run(val command: String) : IStressCommand {
 
     private fun createRunners(
         plugin: Plugin,
-        metrics: Metrics,
-        fieldRegistry: Registry,
-        rateLimiter: RateLimiter?,
+        sharedContext: Context,
     ): List<ProfileRunner> {
         val runners =
             IntRange(0, threads - 1).map {
                 // println("Connecting")
                 println("Connecting to Cassandra cluster ...")
-                val context = StressContext(session, this, it, metrics, 
fieldRegistry, rateLimiter)
+                val context = sharedContext.stress(it)
                 ProfileRunner.create(context, plugin.instance)
             }
 
         val executed =
-            runners.parallelStream().map {
-                println("Preparing statements.")
-                it.prepare()
-            }.count()
+            runners
+                .parallelStream()
+                .map {
+                    println("Preparing statements.")
+                    it.prepare()
+                }.count()
 
         println("$executed threads prepared.")
         return runners
@@ -646,7 +667,8 @@ class Run(val command: String) : IStressCommand {
 
         for (statement in plugin.instance.schema()) {
             val s =
-                SchemaBuilder.create(statement)
+                SchemaBuilder
+                    .create(statement)
                     .withCompaction(compaction)
                     .withCompression(compression)
                     .withRowCache(rowCache)
diff --git 
a/src/main/kotlin/com/rustyrazorblade/easycassstress/workloads/IStressProfile.kt
 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/workloads/IStressProfile.kt
index e1328e3..039bc25 100644
--- 
a/src/main/kotlin/com/rustyrazorblade/easycassstress/workloads/IStressProfile.kt
+++ 
b/src/main/kotlin/com/rustyrazorblade/easycassstress/workloads/IStressProfile.kt
@@ -17,7 +17,6 @@
  */
 package com.rustyrazorblade.easycassstress.workloads
 
-import com.codahale.metrics.Timer.Context
 import com.datastax.oss.driver.api.core.CqlSession
 import com.datastax.oss.driver.api.core.cql.AsyncResultSet
 import com.datastax.oss.driver.api.core.cql.BoundStatement
@@ -42,9 +41,7 @@ interface IStressRunner {
      * However, certain workloads may need custom setup.
      * @see Locking
      **/
-    fun getNextPopulate(partitionKey: PartitionKey): Operation {
-        return getNextMutation(partitionKey)
-    }
+    fun getNextPopulate(partitionKey: PartitionKey): Operation = 
getNextMutation(partitionKey)
 
     /**
      * Callback after a query executes successfully.
@@ -113,9 +110,7 @@ interface IStressProfile {
      */
     fun getFieldGenerators(): Map<Field, FieldGenerator> = mapOf()
 
-    fun getDefaultReadRate(): Double {
-        return .01
-    }
+    fun getDefaultReadRate(): Double = .01
 
     fun getPopulateOption(args: Run): PopulateOption = 
PopulateOption.Standard()
 
@@ -126,18 +121,25 @@ sealed class Operation(
     val bound: BoundStatement? = null,
     val statement: String? = null,
 ) {
-    // we're going to track metrics on the mutations differently
-    // inserts will also carry data that might be saved for later validation
-    // clustering keys won't be realistic to compute in the framework
-    lateinit var startTime: Context
+    val createdAtNanos = System.nanoTime()
+    val createdAtMillis = System.currentTimeMillis()
 
-    class Mutation(bound: BoundStatement, val callbackPayload: Any? = null) : 
Operation(bound)
+    class Mutation(
+        bound: BoundStatement,
+        val callbackPayload: Any? = null,
+    ) : Operation(bound)
 
-    class SelectStatement(bound: BoundStatement) : Operation(bound)
+    class SelectStatement(
+        bound: BoundStatement,
+    ) : Operation(bound)
 
-    class Deletion(bound: BoundStatement) : Operation(bound)
+    class Deletion(
+        bound: BoundStatement,
+    ) : Operation(bound)
 
     class Stop : Operation(null)
 
-    class DDL(statement: String) : Operation(null, statement = statement)
+    class DDL(
+        statement: String,
+    ) : Operation(null, statement = statement)
 }
diff --git 
a/src/test/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollectorTest.kt
 
b/src/test/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollectorTest.kt
new file mode 100644
index 0000000..0801d81
--- /dev/null
+++ 
b/src/test/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollectorTest.kt
@@ -0,0 +1,65 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.CqlSession
+import com.google.common.io.Files
+import com.rustyrazorblade.easycassstress.Context
+import com.rustyrazorblade.easycassstress.Metrics
+import com.rustyrazorblade.easycassstress.commands.Run
+import com.rustyrazorblade.easycassstress.generators.Registry
+import io.mockk.mockk
+import org.assertj.core.api.Assertions
+import org.assertj.core.api.Condition
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.io.TempDir
+import java.io.File
+import java.nio.file.Path
+
+internal class ParquetCollectorTest {
+    @TempDir
+    var tempDir: Path? = null
+
+    @Test
+    fun nonExistingDirectory() {
+        val fileOrDirectory = 
tempDir!!.resolve("doesnotexist").resolve("noreallydoesnotexist").toFile()
+        val c = ParquetCollector(fileOrDirectory)
+        c.close(ctx)
+
+        Assertions.assertThat(fileOrDirectory).exists().isFile
+    }
+
+    @Test
+    fun existingDirectory() {
+        val expected = tempDir!!.resolve("rawlog.parquet").toFile()
+        val c = ParquetCollector(tempDir!!.toFile())
+        c.close(ctx)
+
+        Assertions.assertThat(expected).exists().isFile
+    }
+
+    @Test
+    fun existingFile() {
+        val expected = tempDir!!.resolve("rawlog.parquet").toFile()
+        val unexpectedBytes = "some text".toByteArray()
+        Files.write(unexpectedBytes, expected)
+        val c = ParquetCollector(expected)
+        c.close(ctx)
+
+        Assertions.assertThat(expected).exists().isFile.doesNotHave(
+            object : Condition<File>() {
+                override fun matches(value: File?): Boolean = 
Files.toByteArray(value).equals(unexpectedBytes)
+            },
+        )
+    }
+
+    companion object {
+        val ctx =
+            Context(
+                mockk<CqlSession>(),
+                mockk<Run>(),
+                mockk<Metrics>(),
+                mockk<Registry>(),
+                null,
+                mockk<Collector>(),
+            )
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to