This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 490286c  Remove notion of Modification
490286c is described below

commit 490286ce44f2494cf8c38751c16768c952440d8b
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Wed Nov 29 17:54:10 2023 +0100

    Remove notion of Modification
    
    Patch for CASSANDRA-19117 by Alex Petrov; reviewed by Abe Ratnofsky
---
 README.md                                          |  16 ++-
 conf/default.yaml                                  |   5 +-
 conf/example.yaml                                  |   5 +-
 conf/external.yaml                                 |   5 +-
 .../src/harry/concurrent/Uninterruptibles.java     |  31 ++++++
 harry-core/src/harry/core/Configuration.java       |  39 +++-----
 harry-core/src/harry/dsl/HistoryBuilder.java       |  60 ++++--------
 .../src/harry/model/DescriptorSelectorBuilder.java | 104 --------------------
 harry-core/src/harry/model/OpSelectors.java        |  57 +++--------
 .../sut/injvm/InJVMTokenAwareVisitExecutor.java    |  65 ++++++------
 harry-core/src/harry/reconciler/Reconciler.java    |  11 +--
 .../src/harry/visitors/GeneratingVisitor.java      |  20 +---
 harry-core/src/harry/visitors/LoggingVisitor.java  |   8 +-
 harry-core/src/harry/visitors/LtsVisitor.java      |  16 +--
 harry-core/src/harry/visitors/MutatingVisitor.java | 109 ++++++---------------
 .../src/harry/visitors/ReplayingVisitor.java       |  36 +------
 harry-core/src/harry/visitors/VisitExecutor.java   |   6 +-
 harry-core/test/harry/model/OpSelectorsTest.java   |  31 +++---
 harry-core/test/harry/operations/RelationTest.java |  24 +----
 .../generators/DataGeneratorsIntegrationTest.java  |   1 -
 .../harry/model/HistoryBuilderIntegrationTest.java |   3 +-
 .../test/harry/model/HistoryBuilderTest.java       |   7 +-
 .../test/harry/model/IntegrationTestBase.java      |   5 +-
 .../harry/model/QuerySelectorNegativeTest.java     |   3 +-
 .../model/QuiescentCheckerIntegrationTest.java     |   3 +-
 .../test/harry/op/RowVisitorTest.java              |   3 +-
 .../test/resources/single_partition_test.yml       |   5 +-
 27 files changed, 195 insertions(+), 483 deletions(-)

diff --git a/README.md b/README.md
index 2daaa37..1cf3a6f 100644
--- a/README.md
+++ b/README.md
@@ -396,7 +396,7 @@ combo: visitor knows about internals of the cluster it is 
dealing with.
 
 Generally, visitor has to follow the rules specified by DescriptorSelector and 
PdSelector: (it can only visit issue
 mutations against the partition that PdSelector has picked for this LTS), and 
DescriptorSelector (it can visit exactly
-DescriptorSelector#numberOfModifications rows within this partition, 
operations have to have a type specified by
+DescriptorSelector#operationsPerLts rows within this partition, operations 
have to have a type specified by
 #operationKind, clustering and value descriptors have to be in accordance with 
DescriptorSelector#cd and
 DescriptorSelector#vds). The reason for these limitations is because model has 
to be able to reproduce the exact
 sequence of events that was applied to system under test.
@@ -436,7 +436,6 @@ by `-1` can be done in 64 steps.
 Let's introduce some definitions:
 
 * `lts` is a *logical timestamp*, an entity (number in our case), given by the 
clock, on which some action occurs
-* `m` is a *modification id*, a sequential number of the modification that 
occurs on `lts`
 * `rts` is an approximate real-time as of clock for this run
 * `pid` is a partition position, a number between `0` and N, for `N` unique 
generated partitions
 * `pd` is a partition descriptor, a unique descriptor identifying the partition
@@ -447,12 +446,11 @@ entities. Hierarchically, the generation process looks as 
follows:
 
 * `lts` is an entry point, from which the decision process starts
 * `pd` is picked from `lts`, and determines which partition is going to be 
visited
-* for `(pd, lts)` combination, `#mods` (the number of modification batches) 
and `#rows` (the number of rows per
-  modification batch) is determined. `m` is an index of the modification 
batch, and `i` is an index of the operation in
-  the modification batch.
-* `cd` is picked based on `(pd, lts)`, and `n`, a sequential number of the 
operation among all modification batches
+* for `(pd, lts)` combination, `#rows` (the number of rows per modification 
batch) is determined. There can be at most
+one modification batch per LTS.
+* `cd` is picked based on `(pd, lts)`, and `n`, a sequential number of the 
operation in the batch
 * operation type (whether we're going to perform a write, delete, range 
delete, etc), columns involved in this
-  operation, and values for the modification are picked depending on the `pd`, 
`lts`, `m`, and `i`
+  operation, and values for the modification are picked depending on the `pd`, 
`lts`, and `i`
 
 Most of this formalization is implemented in `OpSelectors`, and is relied upon 
in`PartitionVisitor` and any
 implementation of a `Model`.
@@ -561,8 +559,8 @@ void validatePartitionState(long validationLts, 
List<ResultSetRow> rows) {
              if (row.lts[col] != rowLts)
                continue;
 
-             long m = descriptorSelector.modificationId(pd, row.cd, rowLts, 
row.vds[col], col);
-             long vd = descriptorSelector.vd(pd, row.cd, rowLts, m, col);
+             long rid = descriptorSelector.rowId(pd, row.cd, rowLts, 
row.vds[col], col);
+             long vd = descriptorSelector.vd(pd, row.cd, rowLts, rid, col);
 
             // If the value model predicts doesn't match the one received from 
the database, throw an exception
              if (vd != row.vds[col])
diff --git a/conf/default.yaml b/conf/default.yaml
index 4a20782..769cb5d 100644
--- a/conf/default.yaml
+++ b/conf/default.yaml
@@ -58,10 +58,7 @@ partition_descriptor_selector:
 # each kind of operation is going to occur.
 clustering_descriptor_selector:
   default:
-    modifications_per_lts:
-      type: "constant"
-      constant: 4
-    rows_per_modification:
+    operations_per_lts:
       type: "constant"
       constant: 2
     operation_kind_weights:
diff --git a/conf/example.yaml b/conf/example.yaml
index 727109e..18dbfae 100644
--- a/conf/example.yaml
+++ b/conf/example.yaml
@@ -76,10 +76,7 @@ partition_descriptor_selector:
 # each kind of operation is going to occur.
 clustering_descriptor_selector:
   default:
-    modifications_per_lts:
-      type: "constant"
-      constant: 2
-    rows_per_modification:
+    operations_per_lts;:
       type: "constant"
       constant: 2
     operation_kind_weights:
diff --git a/conf/external.yaml b/conf/external.yaml
index 8c48d61..fad7438 100644
--- a/conf/external.yaml
+++ b/conf/external.yaml
@@ -79,10 +79,7 @@ partition_descriptor_selector:
 # each kind of operation is going to occur.
 clustering_descriptor_selector:
   default:
-    modifications_per_lts:
-      type: "constant"
-      constant: 2
-    rows_per_modification:
+    operations_per_lts:
       type: "constant"
       constant: 2
     operation_kind_weights:
diff --git a/harry-core/src/harry/concurrent/Uninterruptibles.java 
b/harry-core/src/harry/concurrent/Uninterruptibles.java
new file mode 100644
index 0000000..a310382
--- /dev/null
+++ b/harry-core/src/harry/concurrent/Uninterruptibles.java
@@ -0,0 +1,31 @@
+package harry.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+public class Uninterruptibles
+{
+    public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
+        boolean interrupted = false;
+
+        try {
+            long remainingNanos = unit.toNanos(sleepFor);
+            long end = System.nanoTime() + remainingNanos;
+
+            while(true) {
+                try {
+                    TimeUnit.NANOSECONDS.sleep(remainingNanos);
+                    return;
+                } catch (InterruptedException t) {
+                    interrupted = true;
+                    remainingNanos = end - System.nanoTime();
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+
+        }
+    }
+
+}
diff --git a/harry-core/src/harry/core/Configuration.java 
b/harry-core/src/harry/core/Configuration.java
index fd22982..9a0c7b7 100644
--- a/harry-core/src/harry/core/Configuration.java
+++ b/harry-core/src/harry/core/Configuration.java
@@ -810,8 +810,7 @@ public class Configuration
     // TODO: configure fractions/fractional builder
     public static class CDSelectorConfigurationBuilder
     {
-        private DistributionConfig modifications_per_lts = new 
ConstantDistributionConfig(10);
-        private DistributionConfig rows_per_modification = new 
ConstantDistributionConfig(10);
+        private DistributionConfig operations_per_lts = new 
ConstantDistributionConfig(10);
         private int max_partition_size = 100;
         private Map<OpSelectors.OperationKind, Integer> operation_kind_weights 
= new OperationKindSelectorBuilder()
                                                                                
  .addWeight(OpSelectors.OperationKind.DELETE_ROW, 1)
@@ -821,15 +820,9 @@ public class Configuration
         private Map<OpSelectors.OperationKind, long[]> column_mask_bitsets;
         private int[] fractions;
 
-        public CDSelectorConfigurationBuilder 
setNumberOfModificationsDistribution(DistributionConfig modifications_per_lts)
+        public CDSelectorConfigurationBuilder 
setOperationsPerLtsDistribution(DistributionConfig operations_per_lts)
         {
-            this.modifications_per_lts = modifications_per_lts;
-            return this;
-        }
-
-        public CDSelectorConfigurationBuilder 
setRowsPerModificationDistribution(DistributionConfig rows_per_modification)
-        {
-            this.rows_per_modification = rows_per_modification;
+            this.operations_per_lts = operations_per_lts;
             return this;
         }
 
@@ -863,16 +856,14 @@ public class Configuration
         {
             if (fractions == null)
             {
-                return new 
DefaultCDSelectorConfiguration(modifications_per_lts,
-                                                          
rows_per_modification,
+                return new DefaultCDSelectorConfiguration(operations_per_lts,
                                                           max_partition_size,
                                                           
operation_kind_weights,
                                                           column_mask_bitsets);
             }
             else
             {
-                return new 
HierarchicalCDSelectorConfiguration(modifications_per_lts,
-                                                               
rows_per_modification,
+                return new 
HierarchicalCDSelectorConfiguration(operations_per_lts,
                                                                
max_partition_size,
                                                                
operation_kind_weights,
                                                                
column_mask_bitsets,
@@ -884,21 +875,18 @@ public class Configuration
     @JsonTypeName("default")
     public static class DefaultCDSelectorConfiguration implements 
CDSelectorConfiguration
     {
-        public final DistributionConfig modifications_per_lts;
-        public final DistributionConfig rows_per_modification;
+        public final DistributionConfig operations_per_lts;
         public final int max_partition_size;
         public final Map<OpSelectors.OperationKind, Integer> 
operation_kind_weights;
         public final Map<OpSelectors.OperationKind, long[]> 
column_mask_bitsets;
 
         @JsonCreator
-        public 
DefaultCDSelectorConfiguration(@JsonProperty("modifications_per_lts") 
DistributionConfig modifications_per_lts,
-                                              
@JsonProperty("rows_per_modification") DistributionConfig rows_per_modification,
+        public 
DefaultCDSelectorConfiguration(@JsonProperty("operations_per_lts") 
DistributionConfig operations_per_lts,
                                               @JsonProperty(value = 
"window_size", defaultValue = "100") int max_partition_size,
                                               
@JsonProperty("operation_kind_weights") Map<OpSelectors.OperationKind, Integer> 
operation_kind_weights,
                                               
@JsonProperty("column_mask_bitsets") Map<OpSelectors.OperationKind, long[]> 
column_mask_bitsets)
         {
-            this.modifications_per_lts = modifications_per_lts;
-            this.rows_per_modification = rows_per_modification;
+            this.operations_per_lts = operations_per_lts;
             this.max_partition_size = max_partition_size;
             this.operation_kind_weights = operation_kind_weights;
             this.column_mask_bitsets = column_mask_bitsets;
@@ -935,8 +923,7 @@ public class Configuration
             return new OpSelectors.DefaultDescriptorSelector(rng,
                                                              
columnSelector(schemaSpec),
                                                              
OpSelectors.OperationSelector.weighted(operation_kind_weights),
-                                                             
modifications_per_lts.make(),
-                                                             
rows_per_modification.make(),
+                                                             
operations_per_lts.make(),
                                                              
max_partition_size);
         }
     }
@@ -945,14 +932,13 @@ public class Configuration
     {
         private final int[] fractions;
 
-        public HierarchicalCDSelectorConfiguration(DistributionConfig 
modifications_per_lts,
-                                                   DistributionConfig 
rows_per_modification,
+        public HierarchicalCDSelectorConfiguration(DistributionConfig 
operations_per_lts,
                                                    int max_partition_size,
                                                    
Map<OpSelectors.OperationKind, Integer> operation_kind_weights,
                                                    
Map<OpSelectors.OperationKind, long[]> column_mask_bitsets,
                                                    int[] fractions)
         {
-            super(modifications_per_lts, rows_per_modification, 
max_partition_size, operation_kind_weights, column_mask_bitsets);
+            super(operations_per_lts, max_partition_size, 
operation_kind_weights, column_mask_bitsets);
             this.fractions = fractions;
         }
 
@@ -962,8 +948,7 @@ public class Configuration
                                                                   fractions,
                                                                   
columnSelector(schemaSpec),
                                                                   
OpSelectors.OperationSelector.weighted(operation_kind_weights),
-                                                                  
modifications_per_lts.make(),
-                                                                  
rows_per_modification.make(),
+                                                                  
operations_per_lts.make(),
                                                                   
max_partition_size);
         }
     }
diff --git a/harry-core/src/harry/dsl/HistoryBuilder.java 
b/harry-core/src/harry/dsl/HistoryBuilder.java
index 06e09de..f7f48c5 100644
--- a/harry-core/src/harry/dsl/HistoryBuilder.java
+++ b/harry-core/src/harry/dsl/HistoryBuilder.java
@@ -18,20 +18,10 @@
 
 package harry.dsl;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 
-import harry.core.Configuration;
 import harry.core.Run;
 import harry.model.OpSelectors;
 import harry.visitors.MutatingRowVisitor;
@@ -124,7 +114,7 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>
 
     private static abstract class Step
     {
-        public abstract ReplayingVisitor.Batch toBatch(long pd, long lts, long 
m, LongSupplier opIdSupplier);
+        public abstract List<ReplayingVisitor.Operation> build(long pd, long 
lts, LongSupplier opIdSupplier);
     }
 
     private class BatchStep extends Step
@@ -136,7 +126,7 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>
             this.steps = steps;
         }
 
-        public ReplayingVisitor.Batch toBatch(long pd, long lts, long m, 
LongSupplier opIdSupplier)
+        public List<ReplayingVisitor.Operation> build(long pd, long lts, 
LongSupplier opIdSupplier)
         {
             ReplayingVisitor.Operation[] ops = new 
ReplayingVisitor.Operation[steps.size()];
             for (int i = 0; i < ops.length; i++)
@@ -147,7 +137,7 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>
                 ops[i] = op(cd, opId, opStep.opType);
             }
 
-            return HistoryBuilder.batch(m, ops);
+            return Arrays.asList(ops);
         }
     }
 
@@ -160,12 +150,11 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>
             this.opType = opType;
         }
 
-        public ReplayingVisitor.Batch toBatch(long pd, long lts, long m, 
LongSupplier opIdSupplier)
+        public List<ReplayingVisitor.Operation> build(long pd, long lts, 
LongSupplier opIdSupplier)
         {
             long opId = opIdSupplier.getAsLong();
             long cd = HistoryBuilder.this.cd(pd, lts, opId);
-            return HistoryBuilder.batch(m,
-                                        HistoryBuilder.op(cd, 
opIdSupplier.getAsLong(), opType));
+            return Arrays.asList(HistoryBuilder.op(cd, 
opIdSupplier.getAsLong(), opType));
         }
     }
 
@@ -215,7 +204,7 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>
         }
 
         /**
-         * Execute operations listed by users of this PartitionBuilder with 
same logical timestamp.
+         * Execute operations listed by users of this PartitionBuilder with 
same logical timestamp. Namely, as a bach.
          */
         public PartitionBuilder simultaneously()
         {
@@ -361,32 +350,22 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>
 
         void addSteps(List<Step> steps)
         {
-            List<ReplayingVisitor.Batch> batches = new ArrayList<>();
-            Counter m = new Counter();
+            List<ReplayingVisitor.Operation> operations = new ArrayList<>();
             Counter opId = new Counter();
             for (Step step : steps)
             {
-                batches.add(step.toBatch(pd, lts, m.get(), 
opId::getAndIncrement));
-
-                if (sequentially)
-                {
-                    assert lts == log.size();
-                    addToLog(pd, batches);
-                    m.reset();
-                }
-                else
-                {
-                    m.increment();
-                }
+                operations.addAll(step.build(pd, lts, opId::getAndIncrement));
 
+                assert lts == log.size();
+                addToLog(pd, operations);
                 opId.reset();
             }
 
             // If we were generating steps for the partition with same LTS, 
add remaining steps
-            if (!batches.isEmpty())
+            if (!operations.isEmpty())
             {
                 assert !sequentially;
-                addToLog(pd, batches);
+                addToLog(pd, operations);
             }
         }
 
@@ -397,7 +376,7 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>
         }
     }
 
-    private void addToLog(long pd, List<ReplayingVisitor.Batch> batches)
+    private void addToLog(long pd, List<ReplayingVisitor.Operation> operations)
     {
         pdToLtsMap.compute(pd, (ignore, ltss) -> {
             if (null == ltss)
@@ -406,8 +385,8 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>
             return ltss;
         });
 
-        log.add(visit(lts++, pd, batches.toArray(new 
ReplayingVisitor.Batch[0])));
-        batches.clear();
+        log.add(visit(lts++, pd, operations.toArray(new 
ReplayingVisitor.Operation[0])));
+        operations.clear();
     }
 
     private static class Counter
@@ -591,16 +570,11 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>
         PartitionBuilder partitionBuilder();
     }
 
-    public static ReplayingVisitor.Visit visit(long lts, long pd, 
ReplayingVisitor.Batch... ops)
+    public static ReplayingVisitor.Visit visit(long lts, long pd, 
ReplayingVisitor.Operation... ops)
     {
         return new ReplayingVisitor.Visit(lts, pd, ops);
     }
 
-    public static ReplayingVisitor.Batch batch(long m, 
ReplayingVisitor.Operation... ops)
-    {
-        return new ReplayingVisitor.Batch(m, ops);
-    }
-
     public static ReplayingVisitor.Operation op(long cd, long opId, 
OpSelectors.OperationKind opType)
     {
         return new ReplayingVisitor.Operation(cd, opId, opType);
diff --git a/harry-core/src/harry/model/DescriptorSelectorBuilder.java 
b/harry-core/src/harry/model/DescriptorSelectorBuilder.java
deleted file mode 100644
index ea2ffd8..0000000
--- a/harry-core/src/harry/model/DescriptorSelectorBuilder.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package harry.model;
-
-import java.util.Objects;
-import java.util.function.Function;
-
-import harry.core.Configuration;
-import harry.ddl.SchemaSpec;
-import harry.generators.Surjections;
-import harry.generators.distribution.Distribution;
-import harry.util.BitSet;
-
-import static 
harry.model.OpSelectors.DefaultDescriptorSelector.DEFAULT_OP_SELECTOR;
-
-public class DescriptorSelectorBuilder implements 
Configuration.CDSelectorConfiguration
-{
-    private Function<SchemaSpec, OpSelectors.ColumnSelector> 
columnSelectorFactory;
-    private OpSelectors.OperationSelector operationSelector = 
DEFAULT_OP_SELECTOR;
-    private Distribution numberOfRowsDistribution = new 
Distribution.ScaledDistribution(2, 30);
-    private Distribution numberOfModificationsDistribution = new 
Distribution.ScaledDistribution(1, 3);
-    private int maxPartitionSize = Integer.MAX_VALUE;
-    private Function<SchemaSpec, int[]> fractionsSupplier = null;
-
-    public DescriptorSelectorBuilder setFractions(int[] fractions)
-    {
-        this.fractionsSupplier = (schema) -> fractions;
-        return this;
-    }
-
-    public DescriptorSelectorBuilder setFractions(Function<SchemaSpec, int[]> 
fractions)
-    {
-        this.fractionsSupplier = fractions;
-        return this;
-    }
-
-    public DescriptorSelectorBuilder 
setColumnSelector(Surjections.Surjection<BitSet> selector)
-    {
-        this.columnSelectorFactory = (schemaSpec) -> new 
OpSelectors.ColumnSelectorBuilder().forAll(schemaSpec, selector).build();
-        return this;
-    }
-
-    public DescriptorSelectorBuilder 
setColumnSelectorFactory(Function<SchemaSpec, OpSelectors.ColumnSelector> 
columnMaskSelector)
-    {
-        this.columnSelectorFactory = 
Objects.requireNonNull(columnMaskSelector, "mask");
-        return this;
-    }
-
-    public DescriptorSelectorBuilder 
setOperationSelector(OpSelectors.OperationSelector operationSelector)
-    {
-        this.operationSelector = Objects.requireNonNull(operationSelector, 
"type");
-        return this;
-    }
-
-    /**
-     * In a given modification, we are only able to visit as many rows as 
there are rows in the partition, so
-     * we'll always be limited by this.
-     **/
-    public DescriptorSelectorBuilder 
setRowsPerModificationDistribution(Distribution numberOfRowsDistribution)
-    {
-        this.numberOfRowsDistribution = 
Objects.requireNonNull(numberOfRowsDistribution, "distribution");
-        return this;
-    }
-
-    public DescriptorSelectorBuilder 
setNumberOfModificationsDistribution(Distribution 
numberOfModificationsDistribution)
-    {
-        this.numberOfModificationsDistribution = 
Objects.requireNonNull(numberOfModificationsDistribution, "distribution");
-        return this;
-    }
-
-    public DescriptorSelectorBuilder setMaxPartitionSize(int maxPartitionSize)
-    {
-        if (maxPartitionSize <= 0)
-            throw new IllegalArgumentException("Max partition size should be 
positive");
-        this.maxPartitionSize = maxPartitionSize;
-        return this;
-    }
-
-    public OpSelectors.DescriptorSelector make(OpSelectors.Rng rng, SchemaSpec 
schemaSpec)
-    {
-        return new OpSelectors.DefaultDescriptorSelector(rng,
-                                                         
columnSelectorFactory.apply(schemaSpec),
-                                                         operationSelector,
-                                                         
numberOfModificationsDistribution,
-                                                         
numberOfRowsDistribution,
-                                                         maxPartitionSize);
-    }
-}
diff --git a/harry-core/src/harry/model/OpSelectors.java 
b/harry-core/src/harry/model/OpSelectors.java
index c36e248..d9714b1 100644
--- a/harry-core/src/harry/model/OpSelectors.java
+++ b/harry-core/src/harry/model/OpSelectors.java
@@ -22,8 +22,6 @@ import java.util.EnumMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 import harry.core.Configuration;
 import harry.core.VisibleForTesting;
@@ -144,9 +142,7 @@ public interface OpSelectors
      */
     public abstract class DescriptorSelector
     {
-        public abstract int numberOfModifications(long lts);
-
-        public abstract int opsPerModification(long lts);
+        public abstract int operationsPerLts(long lts);
 
         public abstract int maxPartitionSize();
 
@@ -192,8 +188,8 @@ public interface OpSelectors
 
         public long[] descriptors(long pd, long cd, long lts, long opId, 
List<ColumnSpec<?>> columns, BitSet mask, BitSet setColumns, int offset)
         {
-            assert opId < opsPerModification(lts) * numberOfModifications(lts) 
: String.format("Operation id %d exceeds the maximum expected number of 
operations %d (%d * %d)",
-                                                                               
                opId, opsPerModification(lts) * numberOfModifications(lts), 
opsPerModification(lts), numberOfModifications(lts));
+            assert opId < operationsPerLts(lts) : String.format("Operation id 
%d exceeds the maximum expected number of operations per lts %d",
+                                                                opId, 
operationsPerLts(lts));
             long[] descriptors = new long[columns.size()];
 
             for (int i = 0; i < descriptors.length; i++)
@@ -220,10 +216,8 @@ public interface OpSelectors
 
         public abstract BitSet columnMask(long pd, long lts, long opId, 
OperationKind opType);
 
-        // TODO: why is this one unused?
-        public abstract long rowId(long pd, long lts, long cd);
+        public abstract long opId(long pd, long lts, long cd);
 
-        public abstract long modificationId(long pd, long cd, long lts, long 
vd, int col);
     }
 
     public static class PCGFast implements OpSelectors.Rng
@@ -587,15 +581,13 @@ public interface OpSelectors
                                               int[] fractions,
                                               ColumnSelector columnSelector,
                                               OperationSelector 
operationSelector,
-                                              Distribution 
modificationsPerLtsDistribution,
-                                              Distribution 
rowsPerModificationsDistribution,
+                                              Distribution 
operationsPerLtsDistribution,
                                               int maxPartitionSize)
         {
             super(rng,
                   columnSelector,
                   operationSelector,
-                  modificationsPerLtsDistribution,
-                  rowsPerModificationsDistribution,
+                  operationsPerLtsDistribution,
                   maxPartitionSize);
             this.fractions = fractions;
         }
@@ -644,8 +636,7 @@ public interface OpSelectors
     public static class DefaultDescriptorSelector extends DescriptorSelector
     {
         protected final static long ROW_ID_STREAM = 0x726F4772069640AL;
-        protected final static long NUMBER_OF_MODIFICATIONS_STREAM = 
0xf490c5272baL;
-        protected final static long ROWS_PER_OPERATION_STREAM = 0x5e03812e293L;
+        protected final static long OPERATIONS_PER_LTS_STREAM = 0x5e03812e293L;
         protected final static long BITSET_IDX_STREAM = 0x92eb607bef1L;
 
         public static OperationSelector DEFAULT_OP_SELECTOR = 
OperationSelector.weighted(Surjections.weights(45, 45, 3, 2, 2, 1, 1, 1),
@@ -661,35 +652,26 @@ public interface OpSelectors
         protected final OpSelectors.Rng rng;
         protected final OperationSelector operationSelector;
         protected final ColumnSelector columnSelector;
-        protected final Distribution modificationsPerLtsDistribution;
-        protected final Distribution rowsPerModificationsDistribution;
+        protected final Distribution operationsPerLtsDistribution;
         protected final int maxPartitionSize;
 
         public DefaultDescriptorSelector(OpSelectors.Rng rng,
                                          ColumnSelector columnMaskSelector,
                                          OperationSelector operationSelector,
-                                         Distribution 
modificationsPerLtsDistribution,
-                                         Distribution 
rowsPerModificationsDistribution,
+                                         Distribution 
operationsPerLtsDistribution,
                                          int maxPartitionSize)
         {
             this.rng = rng;
 
             this.operationSelector = operationSelector;
             this.columnSelector = columnMaskSelector;
-
-            this.modificationsPerLtsDistribution = 
modificationsPerLtsDistribution;
-            this.rowsPerModificationsDistribution = 
rowsPerModificationsDistribution;
+            this.operationsPerLtsDistribution = operationsPerLtsDistribution;
             this.maxPartitionSize = maxPartitionSize;
         }
 
-        public int numberOfModifications(long lts)
-        {
-            return (int) 
modificationsPerLtsDistribution.skew(rng.randomNumber(lts, 
NUMBER_OF_MODIFICATIONS_STREAM));
-        }
-
-        public int opsPerModification(long lts)
+        public int operationsPerLts(long lts)
         {
-            return (int) 
rowsPerModificationsDistribution.skew(rng.randomNumber(lts, 
ROWS_PER_OPERATION_STREAM));
+            return (int) 
operationsPerLtsDistribution.skew(rng.randomNumber(lts, 
OPERATIONS_PER_LTS_STREAM));
         }
 
         // TODO: this is not the best way to calculate a clustering offset; 
potentially we'd like to use
@@ -707,7 +689,7 @@ public interface OpSelectors
         // TODO: this won't work for entropy-adjusted CDs, at least the way 
they're implemented now
         public boolean isCdVisitedBy(long pd, long lts, long cd)
         {
-            return rowId(pd, lts, cd) < (numberOfModifications(lts) * 
opsPerModification(lts));
+            return opId(pd, lts, cd) < operationsPerLts(lts);
         }
 
         public long randomCd(long pd, long entropy)
@@ -730,7 +712,7 @@ public interface OpSelectors
             return rng.prev(positionInPartition, pd);
         }
 
-        public long rowId(long pd, long lts, long cd)
+        public long opId(long pd, long lts, long cd)
         {
             int partitionSize = maxPartitionSize();
             int clusteringOffset = clusteringOffset(lts);
@@ -757,13 +739,11 @@ public interface OpSelectors
         // TODO: create this bitset once per lts
         public BitSet partitionLevelOperationsMask(long pd, long lts)
         {
-            int totalOps = opsPerModification(lts) * 
numberOfModifications(lts);
+            int totalOps = operationsPerLts(lts);
             if (totalOps > 64)
             {
                 throw new IllegalArgumentException("RngUtils#randomBits 
currently supports only up to 64 bits of entropy, so we can not " +
-                                                   "split partition and row 
level operations for more than 64 operations at the moment." +
-                                                   "Set modifications_per_lts 
to a number that is lower than 64 and use rows_per_modification" +
-                                                   "to have more operations 
per LTS instead");
+                                                   "split partition and row 
level operations for more than 64 operations at the moment.");
             }
 
             long seed = rng.randomNumber(pd, lts);
@@ -799,11 +779,6 @@ public interface OpSelectors
         {
             return rng.randomNumber(opId + 1, pd ^ cd ^ lts ^ col);
         }
-
-        public long modificationId(long pd, long cd, long lts, long vd, int 
col)
-        {
-            return rng.sequenceNumber(vd, pd ^ cd ^ lts ^ col) - 1;
-        }
     }
 
     public enum OperationKind
diff --git 
a/harry-core/src/harry/model/sut/injvm/InJVMTokenAwareVisitExecutor.java 
b/harry-core/src/harry/model/sut/injvm/InJVMTokenAwareVisitExecutor.java
index 7256855..b0f4b95 100644
--- a/harry-core/src/harry/model/sut/injvm/InJVMTokenAwareVisitExecutor.java
+++ b/harry-core/src/harry/model/sut/injvm/InJVMTokenAwareVisitExecutor.java
@@ -24,9 +24,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.concurrent.Uninterruptibles;
 import harry.core.Run;
 import harry.ddl.SchemaSpec;
 import harry.model.sut.SystemUnderTest;
@@ -46,6 +50,8 @@ import static 
harry.model.sut.TokenPlacementModel.peerStateToNodes;
 
 public class InJVMTokenAwareVisitExecutor extends 
LoggingVisitor.LoggingVisitorExecutor
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(InJVMTokenAwareVisitExecutor.class);
+
     private final InJvmSut sut;
     private final TokenPlacementModel.ReplicationFactor rf;
     private final SystemUnderTest.ConsistencyLevel cl;
@@ -72,44 +78,43 @@ public class InJVMTokenAwareVisitExecutor extends 
LoggingVisitor.LoggingVisitorE
     }
 
     @Override
-    protected void executeAsyncWithRetries(long lts, long pd, 
CompletableFuture<Object[][]> future, CompiledStatement statement)
-    {
-        executeAsyncWithRetries(lts, pd, future, statement, 0);
-    }
-
-    private void executeAsyncWithRetries(long lts, long pd, 
CompletableFuture<Object[][]> future, CompiledStatement statement, int retries)
+    protected Object[][] executeWithRetries(long lts, long pd, 
CompiledStatement statement)
     {
         if (sut.isShutdown())
             throw new IllegalStateException("System under test is shut down");
 
-        if (retries > this.MAX_RETRIES)
-            throw new IllegalStateException(String.format("Can not execute 
statement %s after %d retries", statement, retries));
+        int retries = 0;
 
-        Object[] pk =  schema.inflatePartitionKey(pd);
+        Object[] pk = schema.inflatePartitionKey(pd);
         List<TokenPlacementModel.Node> replicas = 
getRing().replicasFor(TokenUtil.token(ByteUtils.compose(ByteUtils.objectsToBytes(pk))));
-
-        TokenPlacementModel.Node replica = replicas.get((int) (lts % 
replicas.size()));
-        if (cl == SystemUnderTest.ConsistencyLevel.NODE_LOCAL)
-        {
-            future.complete(executeNodeLocal(statement.cql(), replica, 
statement.bindings()));
-        }
-        else
+        while (retries++ < MAX_RETRIES)
         {
-            CompletableFuture.supplyAsync(() ->  sut.cluster
-                                                 .stream()
-                                                 .filter((n) -> 
n.config().broadcastAddress().toString().contains(replica.id))
-                                                 .findFirst()
-                                                 .get()
-                                                 .coordinator()
-                                                 .execute(statement.cql(), 
InJvmSut.toApiCl(cl), statement.bindings()), executor)
-                             .whenComplete((res, t) ->
-                                           {
-                                               if (t != null)
-                                                   executor.schedule(() -> 
executeAsyncWithRetries(lts, pd, future, statement, retries + 1), 1, 
TimeUnit.SECONDS);
-                                               else
-                                                   future.complete(res);
-                                           });
+            try
+            {
+                TokenPlacementModel.Node replica = replicas.get((int) (lts % 
replicas.size()));
+                if (cl == SystemUnderTest.ConsistencyLevel.NODE_LOCAL)
+                {
+                    return executeNodeLocal(statement.cql(), replica, 
statement.bindings());
+                }
+                else
+                {
+                    return sut.cluster
+                           .stream()
+                           .filter((n) -> 
n.config().broadcastAddress().toString().contains(replica.id))
+                           .findFirst()
+                           .get()
+                           .coordinator()
+                           .execute(statement.cql(), InJvmSut.toApiCl(cl), 
statement.bindings());
+                }
+            }
+            catch (Throwable t)
+            {
+                int delaySecs = 1;
+                logger.error(String.format("Caught message while trying to 
execute: %s. Scheduled to retry in %s seconds", statement, delaySecs), t);
+                Uninterruptibles.sleepUninterruptibly(delaySecs, 
TimeUnit.SECONDS);
+            }
         }
+        throw new IllegalStateException(String.format("Can not execute 
statement %s after %d retries", statement, retries));
     }
 
     protected TokenPlacementModel.ReplicatedRanges getRing()
diff --git a/harry-core/src/harry/reconciler/Reconciler.java 
b/harry-core/src/harry/reconciler/Reconciler.java
index d3726c2..d6abe75 100644
--- a/harry-core/src/harry/reconciler/Reconciler.java
+++ b/harry-core/src/harry/reconciler/Reconciler.java
@@ -32,7 +32,6 @@ import harry.model.OpSelectors;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
 import harry.runner.DataTracker;
-import harry.util.BitSet;
 import harry.util.Ranges;
 import harry.util.StringUtils;
 import harry.visitors.GeneratingVisitor;
@@ -93,7 +92,7 @@ public class Reconciler
             private final List<ReplayingVisitor.Operation> columnDeletes = new 
ArrayList<>();
 
             @Override
-            protected void operation(long lts, long pd, long cd, long m, long 
opId, OpSelectors.OperationKind opType)
+            protected void operation(long lts, long pd, long cd, long opId, 
OpSelectors.OperationKind opType)
             {
                 if (hadPartitionDeletion)
                     return;
@@ -239,12 +238,6 @@ public class Reconciler
                 }
             }
 
-            @Override
-            protected void afterBatch(long lts, long pd, long m) {}
-
-            @Override
-            protected void beforeBatch(long lts, long pd, long m) {}
-
             @Override
             public void shutdown() throws InterruptedException {}
         }
@@ -304,7 +297,7 @@ public class Reconciler
 
         public String toString()
         {
-            return toString((SchemaSpec) null);
+            return toString(null);
         }
 
         public String toString(SchemaSpec schema)
diff --git a/harry-core/src/harry/visitors/GeneratingVisitor.java 
b/harry-core/src/harry/visitors/GeneratingVisitor.java
index 624330f..bd95349 100644
--- a/harry-core/src/harry/visitors/GeneratingVisitor.java
+++ b/harry-core/src/harry/visitors/GeneratingVisitor.java
@@ -47,23 +47,13 @@ public class GeneratingVisitor extends LtsVisitor
     private void generate(long lts, long pd)
     {
         beforeLts(lts, pd);
-
-        int modificationsCount = descriptorSelector.numberOfModifications(lts);
-        int opsPerModification = descriptorSelector.opsPerModification(lts);
-
-        for (long m = 0; m < modificationsCount; m++)
+        int opsPerLts = descriptorSelector.operationsPerLts(lts);
+        for (long opId = 0; opId < opsPerLts; opId++)
         {
-            beforeBatch(lts, pd, m);
-            for (long i = 0; i < opsPerModification; i++)
-            {
-                long opId = m * opsPerModification + i;
-                long cd = descriptorSelector.cd(pd, lts, opId, schema);
-                OpSelectors.OperationKind opType = 
descriptorSelector.operationType(pd, lts, opId);
-                operation(lts, pd, cd, m, opId, opType);
-            }
-            afterBatch(lts, pd, m);
+            long cd = descriptorSelector.cd(pd, lts, opId, schema);
+            OpSelectors.OperationKind opType = 
descriptorSelector.operationType(pd, lts, opId);
+            operation(lts, pd, cd, opId, opType);
         }
-
         afterLts(lts, pd);
     }
 }
diff --git a/harry-core/src/harry/visitors/LoggingVisitor.java 
b/harry-core/src/harry/visitors/LoggingVisitor.java
index 4acf7fe..0fd086d 100644
--- a/harry-core/src/harry/visitors/LoggingVisitor.java
+++ b/harry-core/src/harry/visitors/LoggingVisitor.java
@@ -63,11 +63,11 @@ public class LoggingVisitor extends GeneratingVisitor
         }
 
         @Override
-        protected CompiledStatement operationInternal(long lts, long pd, long 
cd, long m, long opId, OpSelectors.OperationKind opType)
+        protected CompiledStatement operationInternal(long lts, long pd, long 
cd, long opId, OpSelectors.OperationKind opType)
         {
-            CompiledStatement statement = super.operationInternal(lts, pd, cd, 
m, opId, opType);
-            log(String.format("LTS: %d. Pd %d. Cd %d. M %d. OpId: %d Statement 
%s\n",
-                              lts, pd, cd, m, opId, statement));
+            CompiledStatement statement = super.operationInternal(lts, pd, cd, 
opId, opType);
+            log(String.format("LTS: %d. Pd %d. Cd %d. OpId: %d Statement %s\n",
+                              lts, pd, cd, opId, statement));
             return statement;
         }
 
diff --git a/harry-core/src/harry/visitors/LtsVisitor.java 
b/harry-core/src/harry/visitors/LtsVisitor.java
index 5b37db0..6b1f157 100644
--- a/harry-core/src/harry/visitors/LtsVisitor.java
+++ b/harry-core/src/harry/visitors/LtsVisitor.java
@@ -71,21 +71,9 @@ public abstract class LtsVisitor extends VisitExecutor 
implements Visitor
     }
 
     @Override
-    protected void beforeBatch(long lts, long pd, long m)
+    protected void operation(long lts, long pd, long cd, long opId, 
OpSelectors.OperationKind opType)
     {
-        delegate.beforeBatch(lts, pd, m);
-    }
-
-    @Override
-    protected void operation(long lts, long pd, long cd, long m, long opId, 
OpSelectors.OperationKind opType)
-    {
-        delegate.operation(lts, pd, cd, m, opId, opType);
-    }
-
-    @Override
-    protected void afterBatch(long lts, long pd, long m)
-    {
-        delegate.afterBatch(lts, pd, m);
+        delegate.operation(lts, pd, cd, opId, opType);
     }
 
     @Override
diff --git a/harry-core/src/harry/visitors/MutatingVisitor.java 
b/harry-core/src/harry/visitors/MutatingVisitor.java
index c8f45f7..f08c8f9 100644
--- a/harry-core/src/harry/visitors/MutatingVisitor.java
+++ b/harry-core/src/harry/visitors/MutatingVisitor.java
@@ -21,15 +21,13 @@ package harry.visitors;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import harry.concurrent.Uninterruptibles;
 import harry.core.Configuration;
 import harry.core.Run;
 import harry.model.OpSelectors;
@@ -73,10 +71,6 @@ public class MutatingVisitor extends GeneratingVisitor
         private final List<String> statements = new ArrayList<>();
         private final List<Object> bindings = new ArrayList<>();
 
-        private final List<CompletableFuture<?>> futures = new ArrayList<>();
-
-        protected final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(2);
-
         protected final OpSelectors.DescriptorSelector descriptorSelector;
         protected final DataTracker tracker;
         protected final SystemUnderTest sut;
@@ -106,50 +100,6 @@ public class MutatingVisitor extends GeneratingVisitor
 
         @Override
         public void afterLts(long lts, long pd)
-        {
-            // TODO: switch to Cassandra futures!
-            for (CompletableFuture<?> future : futures)
-            {
-                try
-                {
-                    future.get(10, TimeUnit.SECONDS);
-                }
-                catch (Throwable t)
-                {
-                    int complete = 0;
-                    for (CompletableFuture<?> f : futures)
-                        if (f.isDone()) complete++;
-
-                    throw new IllegalStateException(String.format("Couldn't 
repeat operations within timeout bounds. %d out of %d futures complete", 
complete, futures.size()), t);
-                }
-            }
-            futures.clear();
-            tracker.endModification(lts);
-        }
-
-        @Override
-        public void beforeBatch(long lts, long pd, long m)
-        {
-            statements.clear();
-            bindings.clear();
-        }
-
-        @Override
-        public void operation(long lts, long pd, long cd, long m, long opId, 
OpSelectors.OperationKind opType)
-        {
-            CompiledStatement statement = operationInternal(lts, pd, cd, m, 
opId, opType);
-
-            statements.add(statement.cql());
-            Collections.addAll(bindings, statement.bindings());
-        }
-
-        protected CompiledStatement operationInternal(long lts, long pd, long 
cd, long m, long opId, OpSelectors.OperationKind opType)
-        {
-            return rowVisitor.perform(opType, lts, pd, cd, opId);
-        }
-
-        @Override
-        public void afterBatch(long lts, long pd, long m)
         {
             if (statements.isEmpty())
             {
@@ -158,52 +108,57 @@ public class MutatingVisitor extends GeneratingVisitor
             }
 
             String query = String.join(" ", statements);
-
             if (statements.size() > 1)
                 query = String.format("BEGIN UNLOGGED BATCH\n%s\nAPPLY 
BATCH;", query);
 
             Object[] bindingsArray = new Object[bindings.size()];
             bindings.toArray(bindingsArray);
-
-            CompletableFuture<Object[][]> future = new CompletableFuture<>();
-            executeAsyncWithRetries(lts, pd, future, new 
CompiledStatement(query, bindingsArray));
-            futures.add(future);
-
             statements.clear();
             bindings.clear();
+
+            executeWithRetries(lts, pd, new CompiledStatement(query, 
bindingsArray));
+            tracker.endModification(lts);
         }
 
-        protected void executeAsyncWithRetries(long lts, long pd, 
CompletableFuture<Object[][]> future, CompiledStatement statement)
+        @Override
+        public void operation(long lts, long pd, long cd, long opId, 
OpSelectors.OperationKind opType)
         {
-            executeAsyncWithRetries(future, statement, 0);
+            CompiledStatement statement = operationInternal(lts, pd, cd, opId, 
opType);
+            statements.add(statement.cql());
+            Collections.addAll(bindings, statement.bindings());
         }
 
-        private void executeAsyncWithRetries(CompletableFuture<Object[][]> 
future, CompiledStatement statement, int retries)
+        protected CompiledStatement operationInternal(long lts, long pd, long 
cd, long opId, OpSelectors.OperationKind opType)
+        {
+            return rowVisitor.perform(opType, lts, pd, cd, opId);
+        }
+
+        protected Object[][] executeWithRetries(long lts, long pd, 
CompiledStatement statement)
         {
             if (sut.isShutdown())
                 throw new IllegalStateException("System under test is shut 
down");
 
-            if (retries > this.maxRetries)
-                throw new IllegalStateException(String.format("Can not execute 
statement %s after %d retries", statement, retries));
-
-            sut.executeAsync(statement.cql(), consistencyLevel, 
statement.bindings())
-               .whenComplete((res, t) -> {
-                   if (t != null)
-                   {
-                       logger.error("Caught message while trying to execute " 
+  statement, t);
-                       int delaySecs = 1;
-                       executor.schedule(() -> executeAsyncWithRetries(future, 
statement, retries + 1), delaySecs, TimeUnit.SECONDS);
-                       logger.info("Scheduled retry to happen with delay {} 
seconds", delaySecs);
-                   }
-                   else
-                       future.complete(res);
-               });
+            int retries = 0;
+
+            while (retries++ < maxRetries)
+            {
+                try
+                {
+                    return sut.execute(statement.cql(), consistencyLevel, 
statement.bindings());
+                }
+                catch (Throwable t)
+                {
+                    int delaySecs = 1;
+                    logger.error(String.format("Caught message while trying to 
execute: %s. Scheduled to retry in %s seconds", statement, delaySecs), t);
+                    Uninterruptibles.sleepUninterruptibly(delaySecs, 
TimeUnit.SECONDS);
+                }
+            }
+
+            throw new IllegalStateException(String.format("Can not execute 
statement %s after %d retries", statement, retries));
         }
 
         public void shutdown() throws InterruptedException
         {
-            executor.shutdown();
-            executor.awaitTermination(30, TimeUnit.SECONDS);
         }
     }
 }
diff --git a/harry-core/src/harry/visitors/ReplayingVisitor.java 
b/harry-core/src/harry/visitors/ReplayingVisitor.java
index bd3e1c2..42db25c 100644
--- a/harry-core/src/harry/visitors/ReplayingVisitor.java
+++ b/harry-core/src/harry/visitors/ReplayingVisitor.java
@@ -18,6 +18,7 @@
 
 package harry.visitors;
 
+import java.awt.image.AffineTransformOp;
 import java.util.Arrays;
 import java.util.function.LongSupplier;
 
@@ -44,15 +45,8 @@ public abstract class ReplayingVisitor extends LtsVisitor
     private void replay(Visit visit)
     {
         beforeLts(visit.lts, visit.pd);
-
-        for (Batch batch : visit.operations)
-        {
-            beforeBatch(visit.lts, visit.pd, batch.m);
-            for (Operation operation : batch.operations)
-                operation(visit.lts, visit.pd, operation.cd, batch.m, 
operation.opId, operation.opType);
-            afterBatch(visit.lts, visit.pd, batch.m);
-        }
-
+        for (Operation operation : visit.operations)
+            operation(visit.lts, visit.pd, operation.cd, operation.opId, 
operation.opType);
         afterLts(visit.lts, visit.pd);
     }
 
@@ -60,9 +54,9 @@ public abstract class ReplayingVisitor extends LtsVisitor
     {
         public final long lts;
         public final long pd;
-        public final Batch[] operations;
+        public final Operation[] operations;
 
-        public Visit(long lts, long pd, Batch[] operations)
+        public Visit(long lts, long pd, Operation[] operations)
         {
             this.lts = lts;
             this.pd = pd;
@@ -79,26 +73,6 @@ public abstract class ReplayingVisitor extends LtsVisitor
         }
     }
 
-    public static class Batch
-    {
-        public final long m;
-        public final Operation[] operations;
-
-        public Batch(long m, Operation[] operations)
-        {
-            this.m = m;
-            this.operations = operations;
-        }
-
-        public String toString()
-        {
-            return "Batch{" +
-                   "m=" + m +
-                   ", operations=[" + Arrays.toString(operations) +
-                   "]}";
-        }
-    }
-
     public static class Operation
     {
         public final long cd;
diff --git a/harry-core/src/harry/visitors/VisitExecutor.java 
b/harry-core/src/harry/visitors/VisitExecutor.java
index 63efba0..b911d0a 100644
--- a/harry-core/src/harry/visitors/VisitExecutor.java
+++ b/harry-core/src/harry/visitors/VisitExecutor.java
@@ -26,11 +26,7 @@ public abstract class VisitExecutor
 
     protected abstract void afterLts(long lts, long pd);
 
-    protected abstract void beforeBatch(long lts, long pd, long m);
-
-    protected abstract void operation(long lts, long pd, long cd, long m, long 
opId, OpSelectors.OperationKind kind);
-
-    protected abstract void afterBatch(long lts, long pd, long m);
+    protected abstract void operation(long lts, long pd, long cd, long opId, 
OpSelectors.OperationKind kind);
 
     public abstract void shutdown() throws InterruptedException;
 }
diff --git a/harry-core/test/harry/model/OpSelectorsTest.java 
b/harry-core/test/harry/model/OpSelectorsTest.java
index b2afd19..8cb766d 100644
--- a/harry-core/test/harry/model/OpSelectorsTest.java
+++ b/harry-core/test/harry/model/OpSelectorsTest.java
@@ -62,7 +62,6 @@ public class OpSelectorsTest
                                                                                
                              new 
OpSelectors.ColumnSelectorBuilder().forAll(schema)
                                                                                
                                                                     .build(),
                                                                                
                              
OpSelectors.DefaultDescriptorSelector.DEFAULT_OP_SELECTOR,
-                                                                               
                              new Distribution.ScaledDistribution(1, 3),
                                                                                
                              new Distribution.ScaledDistribution(2, 10),
                                                                                
                              50);
 
@@ -73,19 +72,16 @@ public class OpSelectorsTest
         for (int lts = 0; lts < RUNS; lts++)
         {
             long pd = pdSupplier.pd(lts);
-            for (int m = 0; m < descriptorSelector.numberOfModifications(lts); 
m++)
+
+            int opsPerLts = descriptorSelector.operationsPerLts(lts);
+            for (int opId = 0; opId < opsPerLts; opId++)
             {
-                int opsPerMod = descriptorSelector.opsPerModification(lts);
-                for (int rowId = 0; rowId < opsPerMod; rowId++)
+                long cd = descriptorSelector.cd(pd, lts, opId);
+                Assert.assertEquals(opId, descriptorSelector.opId(pd, lts, 
cd));
+                Assert.assertTrue(descriptorSelector.isCdVisitedBy(pd, lts, 
cd));
+                for (int col = 0; col < 10; col++)
                 {
-                    long cd = descriptorSelector.cd(pd, lts, rowId);
-                    Assert.assertEquals(rowId, descriptorSelector.rowId(pd, 
lts, cd));
-                    Assert.assertTrue(descriptorSelector.isCdVisitedBy(pd, 
lts, cd));
-                    for (int col = 0; col < 10; col++)
-                    {
-                        long vd = descriptorSelector.vd(pd, cd, lts, m, col);
-                        Assert.assertEquals(m, 
descriptorSelector.modificationId(pd, cd, lts, vd, col));
-                    }
+                    long vd = descriptorSelector.vd(pd, cd, lts, opId, col);
                 }
             }
         }
@@ -225,8 +221,7 @@ public class OpSelectorsTest
                                                                                
                                                      
OpSelectors.OperationKind.DELETE_COLUMN,
                                                                                
                                                      
OpSelectors.OperationKind.INSERT,
                                                                                
                                                      
OpSelectors.OperationKind.UPDATE),
-                                                                               
               new Distribution.ConstantDistribution(2),
-                                                                               
               new Distribution.ConstantDistribution(5),
+                                                                               
               new Distribution.ConstantDistribution(10),
                                                                                
               10);
 
         Map<Long, Set<Long>> partitionMap = new HashMap<>();
@@ -340,8 +335,7 @@ public class OpSelectorsTest
                                                                                
                                                           
OpSelectors.OperationKind.DELETE_COLUMN,
                                                                                
                                                           
OpSelectors.OperationKind.INSERT,
                                                                                
                                                           
OpSelectors.OperationKind.UPDATE),
-                                                                               
                    new Distribution.ConstantDistribution(2),
-                                                                               
                    new Distribution.ConstantDistribution(5),
+                                                                               
                    new Distribution.ConstantDistribution(10),
                                                                                
                    100);
 
         Set<Long> ck1 = new TreeSet<>();
@@ -385,14 +379,13 @@ public class OpSelectorsTest
         OpSelectors.DescriptorSelector descriptorSelector = new 
OpSelectors.DefaultDescriptorSelector(rng,
                                                                                
                       null,
                                                                                
                       selector,
-                                                                               
                       new Distribution.ConstantDistribution(2),
-                                                                               
                       new Distribution.ConstantDistribution(2),
+                                                                               
                       new Distribution.ConstantDistribution(10),
                                                                                
                       100);
 
         EnumMap<OpSelectors.OperationKind, Integer> m = new 
EnumMap<OpSelectors.OperationKind, Integer>(OpSelectors.OperationKind.class);
         for (int lts = 0; lts < 1000000; lts++)
         {
-            int total = descriptorSelector.numberOfModifications(lts) * 
descriptorSelector.numberOfModifications(lts);
+            int total = descriptorSelector.operationsPerLts(lts);
             long pd = pdSelector.pd(lts);
             for (int opId = 0; opId < total; opId++)
             {
diff --git a/harry-core/test/harry/operations/RelationTest.java 
b/harry-core/test/harry/operations/RelationTest.java
index 178bd96..6804619 100644
--- a/harry-core/test/harry/operations/RelationTest.java
+++ b/harry-core/test/harry/operations/RelationTest.java
@@ -110,11 +110,6 @@ public class RelationTest
                                                                           
throw new RuntimeException("not implemented");
                                                                       }
 
-                                                                      public 
long maxLts(long lts)
-                                                                      {
-                                                                          
throw new RuntimeException("not implemented");
-                                                                      }
-
                                                                       public 
long minLtsAt(long position)
                                                                       {
                                                                           
throw new RuntimeException("not implemented");
@@ -126,11 +121,6 @@ public class RelationTest
                                                                           
throw new RuntimeException("not implemented");
                                                                       }
 
-                                                                      public 
long positionFor(long lts)
-                                                                      {
-                                                                          
throw new RuntimeException("not implemented");
-                                                                      }
-
                                                                       public 
long maxPosition(long maxLts)
                                                                       {
                                                                           
throw new RuntimeException("not implemented");
@@ -138,12 +128,7 @@ public class RelationTest
                                                                   },
                                                                   new 
OpSelectors.DescriptorSelector()
                                                                   {
-                                                                      public 
int numberOfModifications(long lts)
-                                                                      {
-                                                                          
throw new RuntimeException("not implemented");
-                                                                      }
-
-                                                                      public 
int opsPerModification(long lts)
+                                                                      public 
int operationsPerLts(long lts)
                                                                       {
                                                                           
throw new RuntimeException("not implemented");
                                                                       }
@@ -183,12 +168,7 @@ public class RelationTest
                                                                           
throw new RuntimeException("not implemented");
                                                                       }
 
-                                                                      public 
long rowId(long pd, long lts, long cd)
-                                                                      {
-                                                                          
return 0;
-                                                                      }
-
-                                                                      public 
long modificationId(long pd, long cd, long lts, long vd, int col)
+                                                                      public 
long opId(long pd, long lts, long cd)
                                                                       {
                                                                           
return 0;
                                                                       }
diff --git 
a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java 
b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
index c91f86b..542e1e2 100644
--- a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
+++ b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
@@ -101,7 +101,6 @@ public class DataGeneratorsIntegrationTest extends CQLTester
                                                                                
                OpSelectors.columnSelectorBuilder().forAll(schema_).build(),
                                                                                
                
OpSelectors.OperationSelector.weighted(Surjections.weights(100), opKind),
                                                                                
                new Distribution.ConstantDistribution(2),
-                                                                               
                new Distribution.ConstantDistribution(2),
                                                                                
                100);
                                           })
                                           .build()
diff --git 
a/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java 
b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
index 9f9c259..c94a231 100644
--- a/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
+++ b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
@@ -47,8 +47,7 @@ public class HistoryBuilderIntegrationTest extends 
ModelTestBase
         return super.configuration(seed, schema)
                     .setPartitionDescriptorSelector((ignore) -> new 
HistoryBuilder.PdSelector())
                     // TODO: ideally, we want a custom/tailored clustering 
descriptor selector
-                    .setClusteringDescriptorSelector((builder) -> 
builder.setNumberOfModificationsDistribution(new 
Configuration.ConstantDistributionConfig(100_000))
-                                                                         
.setRowsPerModificationDistribution(new 
Configuration.ConstantDistributionConfig(100_000)));
+                    .setClusteringDescriptorSelector((builder) -> 
builder.setOperationsPerLtsDistribution(new 
Configuration.ConstantDistributionConfig(100_000)));
     }
 
     @Test
diff --git a/harry-integration/test/harry/model/HistoryBuilderTest.java 
b/harry-integration/test/harry/model/HistoryBuilderTest.java
index e7f3413..a2079f2 100644
--- a/harry-integration/test/harry/model/HistoryBuilderTest.java
+++ b/harry-integration/test/harry/model/HistoryBuilderTest.java
@@ -41,8 +41,7 @@ public class HistoryBuilderTest
 
         Configuration config = IntegrationTestBase.sharedConfiguration(1, 
schema)
                                                   
.setPartitionDescriptorSelector((ignore) -> new HistoryBuilder.PdSelector())
-                                                  
.setClusteringDescriptorSelector((builder) -> 
builder.setNumberOfModificationsDistribution(new 
Configuration.ConstantDistributionConfig(100_000))
-                                                                               
                        .setRowsPerModificationDistribution(new 
Configuration.ConstantDistributionConfig(100_000)))
+                                                  
.setClusteringDescriptorSelector((builder) -> 
builder.setOperationsPerLtsDistribution(new 
Configuration.ConstantDistributionConfig(100_000)))
                                                   .build();
 
         Run run = config.createRun();
@@ -95,7 +94,7 @@ public class HistoryBuilderTest
                                     }
                                 }
 
-                                public void operation(long lts, long pd, long 
cd, long m, long opId, OpSelectors.OperationKind kind)
+                                public void operation(long lts, long pd, long 
cd, long opId, OpSelectors.OperationKind kind)
                                 {
                                     switch (kind)
                                     {
@@ -127,8 +126,6 @@ public class HistoryBuilderTest
                                 }
 
                                 public void afterLts(long lts, long pd){}
-                                public void beforeBatch(long lts, long pd, 
long m){}
-                                public void afterBatch(long lts, long pd, long 
m){}
                                 public void shutdown() {}
                             });
                             visitor.replayAll();
diff --git a/harry-integration/test/harry/model/IntegrationTestBase.java 
b/harry-integration/test/harry/model/IntegrationTestBase.java
index 0d89ae2..bccdc27 100644
--- a/harry-integration/test/harry/model/IntegrationTestBase.java
+++ b/harry-integration/test/harry/model/IntegrationTestBase.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.distributed.test.TestBaseImpl;
 
 public class IntegrationTestBase extends TestBaseImpl
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(IntegrationTestBase.class);
+    protected static final Logger logger = 
LoggerFactory.getLogger(IntegrationTestBase.class);
     protected static Cluster cluster;
     protected static InJvmSut sut;
 
@@ -84,8 +84,7 @@ public class IntegrationTestBase extends TestBaseImpl
     public static Configuration.CDSelectorConfigurationBuilder 
sharedCDSelectorConfiguration()
     {
         return new Configuration.CDSelectorConfigurationBuilder()
-               .setNumberOfModificationsDistribution(new 
Configuration.ConstantDistributionConfig(2))
-               .setRowsPerModificationDistribution(new 
Configuration.ConstantDistributionConfig(2))
+               .setOperationsPerLtsDistribution(new 
Configuration.ConstantDistributionConfig(2))
                .setMaxPartitionSize(100)
                .setOperationKindWeights(new 
Configuration.OperationKindSelectorBuilder()
                                         
.addWeight(OpSelectors.OperationKind.DELETE_ROW, 1)
diff --git a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java 
b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
index 356882f..cd73bc4 100644
--- a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
@@ -102,8 +102,7 @@ public class QuerySelectorNegativeTest extends 
IntegrationTestBase
             beforeEach();
             Configuration config = gen.get()
                                       
.setClusteringDescriptorSelector(sharedCDSelectorConfiguration()
-                                                                       
.setNumberOfModificationsDistribution(new 
Configuration.ConstantDistributionConfig(2))
-                                                                       
.setRowsPerModificationDistribution(new 
Configuration.ConstantDistributionConfig(2))
+                                                                       
.setOperationsPerLtsDistribution(new 
Configuration.ConstantDistributionConfig(2))
                                                                        
.setMaxPartitionSize(2000)
                                                                        
.build())
                                       .build();
diff --git 
a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java 
b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
index 741e526..47f90cc 100644
--- a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
+++ b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
@@ -216,8 +216,7 @@ public class QuiescentCheckerIntegrationTest extends 
ModelTestBase
     {
         return super.configuration(seed, schema)
                     
.setClusteringDescriptorSelector(sharedCDSelectorConfiguration()
-                                                     
.setNumberOfModificationsDistribution(new 
Configuration.ConstantDistributionConfig(2))
-                                                     
.setRowsPerModificationDistribution(new 
Configuration.ConstantDistributionConfig(2))
+                                                     
.setOperationsPerLtsDistribution(new 
Configuration.ConstantDistributionConfig(2))
                                                      .setMaxPartitionSize(100)
                                                      .build());
     }
diff --git a/harry-integration/test/harry/op/RowVisitorTest.java 
b/harry-integration/test/harry/op/RowVisitorTest.java
index a644afa..d5ae9e0 100644
--- a/harry-integration/test/harry/op/RowVisitorTest.java
+++ b/harry-integration/test/harry/op/RowVisitorTest.java
@@ -67,8 +67,7 @@ public class RowVisitorTest extends CQLTester
                                                                                
                           new 
OpSelectors.ColumnSelectorBuilder().forAll(schema)
                                                                                
                                                                  .build(),
                                                                                
                           DEFAULT_OP_SELECTOR,
-                                                                               
                           new Distribution.ScaledDistribution(1, 3),
-                                                                               
                           new Distribution.ScaledDistribution(2, 30),
+                                                                               
                           new Distribution.ScaledDistribution(1, 30),
                                                                                
                           100);
 
             Run run = new Run(rng,
diff --git a/harry-integration/test/resources/single_partition_test.yml 
b/harry-integration/test/resources/single_partition_test.yml
index 3f2017f..fdb327f 100644
--- a/harry-integration/test/resources/single_partition_test.yml
+++ b/harry-integration/test/resources/single_partition_test.yml
@@ -21,10 +21,7 @@ partition_descriptor_selector:
 
 clustering_descriptor_selector:
   default:
-    modifications_per_lts:
-      type: "constant"
-      constant: 2
-    rows_per_modification:
+    operations_per_lts:
       type: "constant"
       constant: 2
     operation_kind_weights:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to