This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f764b2d5d4 IGNITE-23134 Add transactional benchmarks to measure 
operations with lock conficts (#4685)
f764b2d5d4 is described below

commit f764b2d5d47679b09028124a89ad19888f7420f1
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Nov 12 16:07:43 2024 +0200

    IGNITE-23134 Add transactional benchmarks to measure operations with lock 
conficts (#4685)
---
 .../ignite/internal/replicator/ReplicaService.java |   6 +-
 .../benchmark/AbstractMultiNodeBenchmark.java      |  14 +-
 .../TxBalanceRetryOperationBenchmark.java          | 226 +++++++++++++++++++++
 .../benchmark/TxUpsertRetryOperationBenchmark.java | 215 ++++++++++++++++++++
 4 files changed, 458 insertions(+), 3 deletions(-)

diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 6af9d8bc4a..729f88f50d 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -237,11 +237,13 @@ public class ReplicaService {
                             return null;
                         }, partitionOperationsExecutor);
                     } else {
-                        if (retryExecutor != null && 
matchAny(unwrapCause(errResp.throwable()), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR)) 
{
+                        int replicaOperationRetryInterval = 
replicationConfiguration.replicaOperationRetryInterval().value();
+                        if (retryExecutor != null && 
matchAny(unwrapCause(errResp.throwable()), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR)
+                                && replicaOperationRetryInterval > 0) {
                             retryExecutor.schedule(
                                     // Need to resubmit again to pool which is 
valid for synchronous IO execution.
                                     () -> 
partitionOperationsExecutor.execute(() -> 
res.completeExceptionally(errResp.throwable())),
-                                    
replicationConfiguration.replicaOperationRetryInterval().value(), MILLISECONDS);
+                                    replicaOperationRetryInterval, 
MILLISECONDS);
                         } else {
                             res.completeExceptionally(errResp.throwable());
                         }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index bde9e1b730..cce6f4386f 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
 import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
@@ -74,6 +75,11 @@ public class AbstractMultiNodeBenchmark {
     @Param({"false", "true"})
     private boolean fsync;
 
+    @Nullable
+    protected String clusterConfiguration() {
+        return null;
+    }
+
     /**
      * Starts ignite node and creates table {@link #TABLE_NAME}.
      */
@@ -198,7 +204,7 @@ public class AbstractMultiNodeBenchmark {
                 + "  clientConnector: { port:{} },\n"
                 + "  rest.port: {},\n"
                 + "  raft.fsync = " + fsync() + ",\n"
-                + "  system.partitionsLogPath = \"" + logPath() + "\""
+                + "  system.partitionsLogPath = \"" + logPath() + "\",\n"
                 + "}";
 
         for (int i = 0; i < nodes(); i++) {
@@ -213,9 +219,15 @@ public class AbstractMultiNodeBenchmark {
 
         String metaStorageNodeName = nodeName(BASE_PORT);
 
+        @Language("HOCON")
+        String clusterCfg = "ignite {\n"
+                + clusterConfiguration() + "\n"
+                + "}";
+
         InitParameters initParameters = InitParameters.builder()
                 .metaStorageNodeNames(metaStorageNodeName)
                 .clusterName("cluster")
+                .clusterConfiguration(clusterCfg)
                 .build();
 
         TestIgnitionManager.init(igniteServers.get(0), initParameters);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxBalanceRetryOperationBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxBalanceRetryOperationBenchmark.java
new file mode 100644
index 0000000000..b3222a8e2c
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxBalanceRetryOperationBenchmark.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.MismatchingTransactionOutcomeException;
+import org.apache.ignite.tx.Transaction;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark that tests lock conflicts on concurrent balance transfers.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(8)
+@Warmup(iterations = 10, time = 5)
+@Measurement(iterations = 20, time = 5)
+@BenchmarkMode({Mode.AverageTime, Mode.Throughput})
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TxBalanceRetryOperationBenchmark extends 
AbstractMultiNodeBenchmark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(TxBalanceRetryOperationBenchmark.class);
+
+    private static RecordView<Tuple> recordView;
+
+    private static IgniteTransactions transactions;
+
+    @Param({"false"})
+    private boolean fsync;
+
+    @Param({"100", "1000"})
+    private int keysUpperBound;
+
+    @Param({"waitTimeout", "replicaOperationRetry"})
+    private String txRetryMode;
+
+    @Override
+    protected String clusterConfiguration() {
+        if (txRetryMode.equals("waitTimeout")) {
+            return "transaction: { deadlockPreventionPolicy: { waitTimeout: 
30, txIdComparator: NATURAL } },"
+                    + "replication: { replicaOperationRetryInterval: 0 }";
+        } else {
+            assert txRetryMode.equals("replicaOperationRetry");
+
+            return "transaction: { deadlockPreventionPolicy: { waitTimeout: 0, 
txIdComparator: NATURAL } },"
+                    + "replication: { replicaOperationRetryInterval: 10 }";
+        }
+    }
+
+    /**
+     * Benchmark's entry point.
+     */
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+                .include(".*" + 
TxBalanceRetryOperationBenchmark.class.getSimpleName() + ".*")
+                .build();
+
+        new Runner(opt).run();
+    }
+
+    /**
+     * Setup.
+     */
+    @Setup(Level.Trial)
+    public void setup() {
+        recordView = publicIgnite.tables().table(TABLE_NAME).recordView();
+        transactions = publicIgnite.transactions();
+
+        Transaction tx = transactions.begin();
+
+        for (int i = 0; i < keysUpperBound; i++) {
+            recordView.insert(tx, Tuple.create().set("id", i).set("amount", 
1000.0f));
+        }
+
+        tx.commit();
+    }
+
+    /**
+     * Print counters.
+     *
+     * @param counters Counters.
+     */
+    @TearDown(Level.Iteration)
+    public void printCounters(TxnCounters counters) {
+        LOG.info("Total txns: " + counters.txnCounter.get());
+        LOG.info("Rolled back txns: " + counters.rollbackCounter.get());
+        counters.reset();
+    }
+
+    @Override
+    protected void createTable(String tableName) {
+        createTable(tableName,
+                List.of(
+                        "id int",
+                        "amount float"
+                ),
+                List.of("id"),
+                List.of()
+        );
+    }
+
+    /**
+     * Perform transaction.
+     *
+     * @param state Benchmark state.
+     */
+    @Benchmark
+    public void doTx(BenchmarkState state) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        Transaction tx = state.tx;
+
+        int from = random.nextInt(keysUpperBound);
+        int to = from;
+        while (to == from) {
+            to = random.nextInt(keysUpperBound);
+        }
+
+        try {
+            float amountFrom = recordView.get(tx, Tuple.create().set("id", 
from)).value("amount");
+            float amountTo = recordView.get(tx, Tuple.create().set("id", 
to)).value("amount");
+
+            recordView.upsert(tx, Tuple.create().set("id", from).set("amount", 
amountFrom - 10));
+            recordView.upsert(tx, Tuple.create().set("id", to).set("amount", 
amountTo + 10));
+        } catch (Exception e) {
+            state.toBeRolledBack = true;
+        }
+    }
+
+    /**
+     * Transaction state.
+     */
+    @State(Scope.Thread)
+    public static class BenchmarkState {
+        InternalTransaction tx;
+        boolean toBeRolledBack;
+
+        /**
+         * Start transaction.
+         *
+         * @param counters Counters.
+         */
+        @Setup(Level.Invocation)
+        public void startTx(TxnCounters counters) {
+            tx = (InternalTransaction) transactions.begin();
+            toBeRolledBack = false;
+            counters.txnCounter.incrementAndGet();
+        }
+
+        /**
+         * Finish transaction.
+         *
+         * @param counters Counters.
+         */
+        @TearDown(Level.Invocation)
+        public void finishTx(TxnCounters counters) {
+            if (toBeRolledBack) {
+                try {
+                    tx.rollback();
+                } catch (MismatchingTransactionOutcomeException ex) {
+                    // No-op.
+                }
+
+                counters.rollbackCounter.incrementAndGet();
+            }
+            try {
+                tx.commit();
+            } catch (MismatchingTransactionOutcomeException ex) {
+                counters.rollbackCounter.incrementAndGet();
+            }
+        }
+    }
+
+    /**
+     * Transaction counters.
+     */
+    @State(Scope.Benchmark)
+    public static class TxnCounters {
+        AtomicInteger txnCounter = new AtomicInteger();
+        AtomicInteger rollbackCounter = new AtomicInteger();
+
+        void reset() {
+            txnCounter.set(0);
+            rollbackCounter.set(0);
+        }
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxUpsertRetryOperationBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxUpsertRetryOperationBenchmark.java
new file mode 100644
index 0000000000..8daf4e2f63
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxUpsertRetryOperationBenchmark.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.MismatchingTransactionOutcomeException;
+import org.apache.ignite.tx.Transaction;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark that tests lock conflicts on concurrent upserts.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(8)
+@Warmup(iterations = 10, time = 5)
+@Measurement(iterations = 20, time = 5)
+@BenchmarkMode({Mode.AverageTime, Mode.Throughput})
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TxUpsertRetryOperationBenchmark extends 
AbstractMultiNodeBenchmark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(TxBalanceRetryOperationBenchmark.class);
+
+    private static RecordView<Tuple> recordView;
+
+    private static IgniteTransactions transactions;
+
+    @Param({"false"})
+    private boolean fsync;
+
+    @Param({"100", "1000"})
+    private int keysUpperBound;
+
+    @Param({"waitTimeout", "replicaOperationRetry"})
+    private String txRetryMode;
+
+    @Override
+    protected String clusterConfiguration() {
+        if (txRetryMode.equals("waitTimeout")) {
+            return "transaction: { deadlockPreventionPolicy: { waitTimeout: 
30, txIdComparator: NATURAL } },"
+                    + "replication: { replicaOperationRetryInterval: 0 }";
+        } else {
+            assert txRetryMode.equals("replicaOperationRetry");
+
+            return "transaction: { deadlockPreventionPolicy: { waitTimeout: 0, 
txIdComparator: NATURAL } },"
+                    + "replication: { replicaOperationRetryInterval: 10 }";
+        }
+    }
+
+    /**
+     * Benchmark's entry point.
+     */
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+                .include(".*" + 
TxUpsertRetryOperationBenchmark.class.getSimpleName() + ".*")
+                .build();
+
+        new Runner(opt).run();
+    }
+
+    /**
+     * Setup.
+     */
+    @Setup(Level.Trial)
+    public void setup() {
+        recordView = publicIgnite.tables().table(TABLE_NAME).recordView();
+        transactions = publicIgnite.transactions();
+
+        Transaction tx = transactions.begin();
+
+        for (int i = 0; i < keysUpperBound; i++) {
+            recordView.insert(tx, Tuple.create().set("id", i).set("amount", 
1000.0f));
+        }
+
+        tx.commit();
+    }
+
+    /**
+     * Print counters.
+     *
+     * @param counters Counters.
+     */
+    @TearDown(Level.Iteration)
+    public void printCounters(TxnCounters counters) {
+        LOG.info("Total txns: " + counters.txnCounter.get());
+        LOG.info("Rolled back txns: " + counters.rollbackCounter.get());
+        counters.reset();
+    }
+
+    @Override
+    protected void createTable(String tableName) {
+        createTable(tableName,
+                List.of(
+                        "id int",
+                        "amount float"
+                ),
+                List.of("id"),
+                List.of()
+        );
+    }
+
+    /**
+     * Perform a transaction.
+     *
+     * @param state Benchmark state.
+     */
+    @Benchmark
+    public void doTx(BenchmarkState state) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        Transaction tx = state.tx;
+
+        int key = random.nextInt(keysUpperBound);
+
+        try {
+            Double v = random.nextDouble(1000.0);
+            recordView.upsert(tx, Tuple.create().set("id", key).set("amount", 
v.floatValue()));
+        } catch (Exception e) {
+            state.toBeRolledBack = true;
+        }
+    }
+
+    /**
+     * Benchmark state.
+     */
+    @State(Scope.Thread)
+    public static class BenchmarkState {
+        InternalTransaction tx;
+        boolean toBeRolledBack;
+
+        /**
+         * Start transaction.
+         */
+        @Setup(Level.Invocation)
+        public void startTx(TxnCounters counters) {
+            tx = (InternalTransaction) transactions.begin();
+            toBeRolledBack = false;
+            counters.txnCounter.incrementAndGet();
+        }
+
+        /**
+         * Finish transaction.
+         */
+        @TearDown(Level.Invocation)
+        public void finishTx(TxnCounters counters) {
+            if (toBeRolledBack) {
+                try {
+                    tx.rollback();
+                } catch (MismatchingTransactionOutcomeException ex) {
+                    // No-op.
+                }
+
+                counters.rollbackCounter.incrementAndGet();
+            }
+            try {
+                tx.commit();
+            } catch (MismatchingTransactionOutcomeException ex) {
+                counters.rollbackCounter.incrementAndGet();
+            }
+        }
+    }
+
+    /**
+     * Transaction counters.
+     */
+    @State(Scope.Benchmark)
+    public static class TxnCounters {
+        AtomicInteger txnCounter = new AtomicInteger();
+        AtomicInteger rollbackCounter = new AtomicInteger();
+
+        void reset() {
+            txnCounter.set(0);
+            rollbackCounter.set(0);
+        }
+    }
+}

Reply via email to