This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f764b2d5d4 IGNITE-23134 Add transactional benchmarks to measure
operations with lock conficts (#4685)
f764b2d5d4 is described below
commit f764b2d5d47679b09028124a89ad19888f7420f1
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Nov 12 16:07:43 2024 +0200
IGNITE-23134 Add transactional benchmarks to measure operations with lock
conficts (#4685)
---
.../ignite/internal/replicator/ReplicaService.java | 6 +-
.../benchmark/AbstractMultiNodeBenchmark.java | 14 +-
.../TxBalanceRetryOperationBenchmark.java | 226 +++++++++++++++++++++
.../benchmark/TxUpsertRetryOperationBenchmark.java | 215 ++++++++++++++++++++
4 files changed, 458 insertions(+), 3 deletions(-)
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 6af9d8bc4a..729f88f50d 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -237,11 +237,13 @@ public class ReplicaService {
return null;
}, partitionOperationsExecutor);
} else {
- if (retryExecutor != null &&
matchAny(unwrapCause(errResp.throwable()), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR))
{
+ int replicaOperationRetryInterval =
replicationConfiguration.replicaOperationRetryInterval().value();
+ if (retryExecutor != null &&
matchAny(unwrapCause(errResp.throwable()), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR)
+ && replicaOperationRetryInterval > 0) {
retryExecutor.schedule(
// Need to resubmit again to pool which is
valid for synchronous IO execution.
() ->
partitionOperationsExecutor.execute(() ->
res.completeExceptionally(errResp.throwable())),
-
replicationConfiguration.replicaOperationRetryInterval().value(), MILLISECONDS);
+ replicaOperationRetryInterval,
MILLISECONDS);
} else {
res.completeExceptionally(errResp.throwable());
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index bde9e1b730..cce6f4386f 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
@@ -74,6 +75,11 @@ public class AbstractMultiNodeBenchmark {
@Param({"false", "true"})
private boolean fsync;
+ @Nullable
+ protected String clusterConfiguration() {
+ return null;
+ }
+
/**
* Starts ignite node and creates table {@link #TABLE_NAME}.
*/
@@ -198,7 +204,7 @@ public class AbstractMultiNodeBenchmark {
+ " clientConnector: { port:{} },\n"
+ " rest.port: {},\n"
+ " raft.fsync = " + fsync() + ",\n"
- + " system.partitionsLogPath = \"" + logPath() + "\""
+ + " system.partitionsLogPath = \"" + logPath() + "\",\n"
+ "}";
for (int i = 0; i < nodes(); i++) {
@@ -213,9 +219,15 @@ public class AbstractMultiNodeBenchmark {
String metaStorageNodeName = nodeName(BASE_PORT);
+ @Language("HOCON")
+ String clusterCfg = "ignite {\n"
+ + clusterConfiguration() + "\n"
+ + "}";
+
InitParameters initParameters = InitParameters.builder()
.metaStorageNodeNames(metaStorageNodeName)
.clusterName("cluster")
+ .clusterConfiguration(clusterCfg)
.build();
TestIgnitionManager.init(igniteServers.get(0), initParameters);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxBalanceRetryOperationBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxBalanceRetryOperationBenchmark.java
new file mode 100644
index 0000000000..b3222a8e2c
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxBalanceRetryOperationBenchmark.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.MismatchingTransactionOutcomeException;
+import org.apache.ignite.tx.Transaction;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark that tests lock conflicts on concurrent balance transfers.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(8)
+@Warmup(iterations = 10, time = 5)
+@Measurement(iterations = 20, time = 5)
+@BenchmarkMode({Mode.AverageTime, Mode.Throughput})
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TxBalanceRetryOperationBenchmark extends
AbstractMultiNodeBenchmark {
+ private static final IgniteLogger LOG =
Loggers.forClass(TxBalanceRetryOperationBenchmark.class);
+
+ private static RecordView<Tuple> recordView;
+
+ private static IgniteTransactions transactions;
+
+ @Param({"false"})
+ private boolean fsync;
+
+ @Param({"100", "1000"})
+ private int keysUpperBound;
+
+ @Param({"waitTimeout", "replicaOperationRetry"})
+ private String txRetryMode;
+
+ @Override
+ protected String clusterConfiguration() {
+ if (txRetryMode.equals("waitTimeout")) {
+ return "transaction: { deadlockPreventionPolicy: { waitTimeout:
30, txIdComparator: NATURAL } },"
+ + "replication: { replicaOperationRetryInterval: 0 }";
+ } else {
+ assert txRetryMode.equals("replicaOperationRetry");
+
+ return "transaction: { deadlockPreventionPolicy: { waitTimeout: 0,
txIdComparator: NATURAL } },"
+ + "replication: { replicaOperationRetryInterval: 10 }";
+ }
+ }
+
+ /**
+ * Benchmark's entry point.
+ */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" +
TxBalanceRetryOperationBenchmark.class.getSimpleName() + ".*")
+ .build();
+
+ new Runner(opt).run();
+ }
+
+ /**
+ * Setup.
+ */
+ @Setup(Level.Trial)
+ public void setup() {
+ recordView = publicIgnite.tables().table(TABLE_NAME).recordView();
+ transactions = publicIgnite.transactions();
+
+ Transaction tx = transactions.begin();
+
+ for (int i = 0; i < keysUpperBound; i++) {
+ recordView.insert(tx, Tuple.create().set("id", i).set("amount",
1000.0f));
+ }
+
+ tx.commit();
+ }
+
+ /**
+ * Print counters.
+ *
+ * @param counters Counters.
+ */
+ @TearDown(Level.Iteration)
+ public void printCounters(TxnCounters counters) {
+ LOG.info("Total txns: " + counters.txnCounter.get());
+ LOG.info("Rolled back txns: " + counters.rollbackCounter.get());
+ counters.reset();
+ }
+
+ @Override
+ protected void createTable(String tableName) {
+ createTable(tableName,
+ List.of(
+ "id int",
+ "amount float"
+ ),
+ List.of("id"),
+ List.of()
+ );
+ }
+
+ /**
+ * Perform transaction.
+ *
+ * @param state Benchmark state.
+ */
+ @Benchmark
+ public void doTx(BenchmarkState state) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Transaction tx = state.tx;
+
+ int from = random.nextInt(keysUpperBound);
+ int to = from;
+ while (to == from) {
+ to = random.nextInt(keysUpperBound);
+ }
+
+ try {
+ float amountFrom = recordView.get(tx, Tuple.create().set("id",
from)).value("amount");
+ float amountTo = recordView.get(tx, Tuple.create().set("id",
to)).value("amount");
+
+ recordView.upsert(tx, Tuple.create().set("id", from).set("amount",
amountFrom - 10));
+ recordView.upsert(tx, Tuple.create().set("id", to).set("amount",
amountTo + 10));
+ } catch (Exception e) {
+ state.toBeRolledBack = true;
+ }
+ }
+
+ /**
+ * Transaction state.
+ */
+ @State(Scope.Thread)
+ public static class BenchmarkState {
+ InternalTransaction tx;
+ boolean toBeRolledBack;
+
+ /**
+ * Start transaction.
+ *
+ * @param counters Counters.
+ */
+ @Setup(Level.Invocation)
+ public void startTx(TxnCounters counters) {
+ tx = (InternalTransaction) transactions.begin();
+ toBeRolledBack = false;
+ counters.txnCounter.incrementAndGet();
+ }
+
+ /**
+ * Finish transaction.
+ *
+ * @param counters Counters.
+ */
+ @TearDown(Level.Invocation)
+ public void finishTx(TxnCounters counters) {
+ if (toBeRolledBack) {
+ try {
+ tx.rollback();
+ } catch (MismatchingTransactionOutcomeException ex) {
+ // No-op.
+ }
+
+ counters.rollbackCounter.incrementAndGet();
+ }
+ try {
+ tx.commit();
+ } catch (MismatchingTransactionOutcomeException ex) {
+ counters.rollbackCounter.incrementAndGet();
+ }
+ }
+ }
+
+ /**
+ * Transaction counters.
+ */
+ @State(Scope.Benchmark)
+ public static class TxnCounters {
+ AtomicInteger txnCounter = new AtomicInteger();
+ AtomicInteger rollbackCounter = new AtomicInteger();
+
+ void reset() {
+ txnCounter.set(0);
+ rollbackCounter.set(0);
+ }
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxUpsertRetryOperationBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxUpsertRetryOperationBenchmark.java
new file mode 100644
index 0000000000..8daf4e2f63
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TxUpsertRetryOperationBenchmark.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.MismatchingTransactionOutcomeException;
+import org.apache.ignite.tx.Transaction;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark that tests lock conflicts on concurrent upserts.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(8)
+@Warmup(iterations = 10, time = 5)
+@Measurement(iterations = 20, time = 5)
+@BenchmarkMode({Mode.AverageTime, Mode.Throughput})
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TxUpsertRetryOperationBenchmark extends
AbstractMultiNodeBenchmark {
+ private static final IgniteLogger LOG =
Loggers.forClass(TxBalanceRetryOperationBenchmark.class);
+
+ private static RecordView<Tuple> recordView;
+
+ private static IgniteTransactions transactions;
+
+ @Param({"false"})
+ private boolean fsync;
+
+ @Param({"100", "1000"})
+ private int keysUpperBound;
+
+ @Param({"waitTimeout", "replicaOperationRetry"})
+ private String txRetryMode;
+
+ @Override
+ protected String clusterConfiguration() {
+ if (txRetryMode.equals("waitTimeout")) {
+ return "transaction: { deadlockPreventionPolicy: { waitTimeout:
30, txIdComparator: NATURAL } },"
+ + "replication: { replicaOperationRetryInterval: 0 }";
+ } else {
+ assert txRetryMode.equals("replicaOperationRetry");
+
+ return "transaction: { deadlockPreventionPolicy: { waitTimeout: 0,
txIdComparator: NATURAL } },"
+ + "replication: { replicaOperationRetryInterval: 10 }";
+ }
+ }
+
+ /**
+ * Benchmark's entry point.
+ */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" +
TxUpsertRetryOperationBenchmark.class.getSimpleName() + ".*")
+ .build();
+
+ new Runner(opt).run();
+ }
+
+ /**
+ * Setup.
+ */
+ @Setup(Level.Trial)
+ public void setup() {
+ recordView = publicIgnite.tables().table(TABLE_NAME).recordView();
+ transactions = publicIgnite.transactions();
+
+ Transaction tx = transactions.begin();
+
+ for (int i = 0; i < keysUpperBound; i++) {
+ recordView.insert(tx, Tuple.create().set("id", i).set("amount",
1000.0f));
+ }
+
+ tx.commit();
+ }
+
+ /**
+ * Print counters.
+ *
+ * @param counters Counters.
+ */
+ @TearDown(Level.Iteration)
+ public void printCounters(TxnCounters counters) {
+ LOG.info("Total txns: " + counters.txnCounter.get());
+ LOG.info("Rolled back txns: " + counters.rollbackCounter.get());
+ counters.reset();
+ }
+
+ @Override
+ protected void createTable(String tableName) {
+ createTable(tableName,
+ List.of(
+ "id int",
+ "amount float"
+ ),
+ List.of("id"),
+ List.of()
+ );
+ }
+
+ /**
+ * Perform a transaction.
+ *
+ * @param state Benchmark state.
+ */
+ @Benchmark
+ public void doTx(BenchmarkState state) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Transaction tx = state.tx;
+
+ int key = random.nextInt(keysUpperBound);
+
+ try {
+ Double v = random.nextDouble(1000.0);
+ recordView.upsert(tx, Tuple.create().set("id", key).set("amount",
v.floatValue()));
+ } catch (Exception e) {
+ state.toBeRolledBack = true;
+ }
+ }
+
+ /**
+ * Benchmark state.
+ */
+ @State(Scope.Thread)
+ public static class BenchmarkState {
+ InternalTransaction tx;
+ boolean toBeRolledBack;
+
+ /**
+ * Start transaction.
+ */
+ @Setup(Level.Invocation)
+ public void startTx(TxnCounters counters) {
+ tx = (InternalTransaction) transactions.begin();
+ toBeRolledBack = false;
+ counters.txnCounter.incrementAndGet();
+ }
+
+ /**
+ * Finish transaction.
+ */
+ @TearDown(Level.Invocation)
+ public void finishTx(TxnCounters counters) {
+ if (toBeRolledBack) {
+ try {
+ tx.rollback();
+ } catch (MismatchingTransactionOutcomeException ex) {
+ // No-op.
+ }
+
+ counters.rollbackCounter.incrementAndGet();
+ }
+ try {
+ tx.commit();
+ } catch (MismatchingTransactionOutcomeException ex) {
+ counters.rollbackCounter.incrementAndGet();
+ }
+ }
+ }
+
+ /**
+ * Transaction counters.
+ */
+ @State(Scope.Benchmark)
+ public static class TxnCounters {
+ AtomicInteger txnCounter = new AtomicInteger();
+ AtomicInteger rollbackCounter = new AtomicInteger();
+
+ void reset() {
+ txnCounter.set(0);
+ rollbackCounter.set(0);
+ }
+ }
+}