This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e7cb0881aa5 IGNITE-25827 Add FlushConsolidationHandler on client and
server
e7cb0881aa5 is described below
commit e7cb0881aa5bfd0a4f2ec448aace420c29c527f3
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Mon Jul 21 16:17:41 2025 +0300
IGNITE-25827 Add FlushConsolidationHandler on client and server
---
.../ignite/internal/client/proto/ClientOp.java | 43 ++++++++
.../ignite/client/handler/ClientHandlerModule.java | 2 +
.../handler/ClientInboundMessageHandler.java | 35 +------
.../io/netty/NettyClientConnectionMultiplexer.java | 2 +
.../org/apache/ignite/client/RetryPolicyTest.java | 2 +-
modules/runner/build.gradle | 16 ++-
.../internal/benchmark/ClientKvBenchmark.java | 87 +++++++++-------
.../internal/benchmark/ClientKvGetBenchmark.java | 109 +++++++++++++++++++++
.../internal/benchmark/ClientKvPutBenchmark.java | 44 +++++++++
.../benchmark/ClientKvPutManyBenchmark.java | 53 ++++++++++
.../internal/benchmark/RemoteKvBenchmark.java | 32 +-----
.../internal/benchmark/UpsertKvBenchmark.java | 4 +-
12 files changed, 322 insertions(+), 107 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index f90ee3a4389..d0a3f5bb0b5 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -214,6 +214,9 @@ public class ClientOp {
/** Batch mask. */
private static final BitSet BATCH_MASK = new BitSet(64);
+ /** Partition operation mask. */
+ private static final BitSet OP_MASK = new BitSet(64);
+
static {
WRITE_MASK.set(TUPLE_UPSERT);
WRITE_MASK.set(TUPLE_GET_AND_UPSERT);
@@ -235,6 +238,22 @@ public class ClientOp {
BATCH_MASK.set(TUPLE_INSERT_ALL);
BATCH_MASK.set(TUPLE_DELETE_ALL);
BATCH_MASK.set(TUPLE_DELETE_ALL_EXACT);
+
+ OP_MASK.set(TABLES_GET);
+ OP_MASK.set(TUPLE_UPSERT);
+ OP_MASK.set(TUPLE_GET);
+ OP_MASK.set(TUPLE_GET_AND_UPSERT);
+ OP_MASK.set(TUPLE_INSERT);
+ OP_MASK.set(TUPLE_REPLACE);
+ OP_MASK.set(TUPLE_REPLACE_EXACT);
+ OP_MASK.set(TUPLE_GET_AND_REPLACE);
+ OP_MASK.set(TUPLE_DELETE);
+ OP_MASK.set(TUPLE_DELETE_EXACT);
+ OP_MASK.set(TUPLE_GET_AND_DELETE);
+ OP_MASK.set(TUPLE_CONTAINS_KEY);
+ OP_MASK.set(STREAMER_BATCH_SEND);
+ OP_MASK.set(TX_COMMIT);
+ OP_MASK.set(TX_ROLLBACK);
}
/**
@@ -256,4 +275,28 @@ public class ClientOp {
public static boolean isBatch(int opCode) {
return BATCH_MASK.get(opCode);
}
+
+ /**
+ * Test if the partition operation.
+ *
+ * @param opCode The operation code.
+ * @return The status.
+ */
+ public static boolean isPartitionOperation(int opCode) {
+ // Sql-related operation must do some bookkeeping first on the
client's thread to avoid races
+ // (for instance, cancellation must not be processed until execution
request is registered).
+ // || opCode == ClientOp.SQL_EXEC
+ // || opCode == ClientOp.SQL_EXEC_BATCH
+ // || opCode == ClientOp.SQL_EXEC_SCRIPT
+ // || opCode == ClientOp.SQL_QUERY_META;
+
+ // TODO: IGNITE-23641 The batch operations were excluded because fast
switching leads to performance degradation for them.
+ // || opCode == ClientOp.TUPLE_UPSERT_ALL
+ // || opCode == ClientOp.TUPLE_GET_ALL
+ // || opCode == ClientOp.TUPLE_INSERT_ALL
+ // || opCode == ClientOp.TUPLE_DELETE_ALL
+ // || opCode == ClientOp.TUPLE_DELETE_ALL_EXACT
+ // || opCode == ClientOp.TUPLE_CONTAINS_ALL_KEYS;
+ return OP_MASK.get(opCode);
+ }
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index d185f3fcddc..bbd890c4274 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.BindException;
@@ -351,6 +352,7 @@ public class ClientHandlerModule implements
IgniteComponent, PlatformComputeTran
handler = messageHandler;
ch.pipeline().addLast(
+ new
FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES,
true),
new ClientMessageDecoder(),
messageHandler
);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index bc39fd8893e..98496108d81 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -733,7 +733,7 @@ public class ClientInboundMessageHandler
return;
}
- if (isPartitionOperation(opCode)) {
+ if (ClientOp.isPartitionOperation(opCode)) {
long requestId0 = requestId;
int opCode0 = opCode;
@@ -999,39 +999,6 @@ public class ClientInboundMessageHandler
}
}
- private static boolean isPartitionOperation(int opCode) {
- return opCode == ClientOp.TABLES_GET
- || opCode == ClientOp.TUPLE_UPSERT
- || opCode == ClientOp.TUPLE_GET
- || opCode == ClientOp.TUPLE_GET_AND_UPSERT
- || opCode == ClientOp.TUPLE_INSERT
- || opCode == ClientOp.TUPLE_REPLACE
- || opCode == ClientOp.TUPLE_REPLACE_EXACT
- || opCode == ClientOp.TUPLE_GET_AND_REPLACE
- || opCode == ClientOp.TUPLE_DELETE
- || opCode == ClientOp.TUPLE_DELETE_EXACT
- || opCode == ClientOp.TUPLE_GET_AND_DELETE
- || opCode == ClientOp.TUPLE_CONTAINS_KEY
- || opCode == ClientOp.STREAMER_BATCH_SEND
- || opCode == ClientOp.TX_COMMIT
- || opCode == ClientOp.TX_ROLLBACK;
-
- // Sql-related operation must do some bookkeeping first on the
client's thread to avoid races
- // (for instance, cancellation must not be processed until
execution request is registered).
- // || opCode == ClientOp.SQL_EXEC
- // || opCode == ClientOp.SQL_EXEC_BATCH
- // || opCode == ClientOp.SQL_EXEC_SCRIPT
- // || opCode == ClientOp.SQL_QUERY_META;
-
- // TODO: IGNITE-23641 The batch operations were excluded
because fast switching leads to performance degradation for them.
- // || opCode == ClientOp.TUPLE_UPSERT_ALL
- // || opCode == ClientOp.TUPLE_GET_ALL
- // || opCode == ClientOp.TUPLE_INSERT_ALL
- // || opCode == ClientOp.TUPLE_DELETE_ALL
- // || opCode == ClientOp.TUPLE_DELETE_ALL_EXACT
- // || opCode == ClientOp.TUPLE_CONTAINS_ALL_KEYS;
- }
-
private void processOperationInternal(
ChannelHandlerContext ctx,
ClientMessageUnpacker in,
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
index 8cb41a82168..91f7a9a5858 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import java.io.File;
@@ -89,6 +90,7 @@ public class NettyClientConnectionMultiplexer implements
ClientConnectionMultipl
}
ch.pipeline().addLast(
+ new
FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES,
true),
new ClientMessageDecoder(),
new NettyClientMessageHandler());
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 35acae23fb4..56db449b17e 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -213,7 +213,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest
{
var nullOpFields = new ArrayList<String>();
for (var field : ClientOp.class.getDeclaredFields()) {
- if ("WRITE_MASK".equals(field.getName()) ||
"BATCH_MASK".equals(field.getName())) {
+ if ("WRITE_MASK".equals(field.getName()) ||
"BATCH_MASK".equals(field.getName()) || "OP_MASK".equals(field.getName())) {
continue;
}
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 54ebeb35852..e1f20d4fe06 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -263,8 +263,18 @@ tasks.register('runUpsertBenchmark', JavaExec) {
enableAssertions = true
}
-tasks.register('runClientBenchmark', JavaExec) {
- mainClass = 'org.apache.ignite.internal.benchmark.ClientKvBenchmark'
+tasks.register('runClientPutBenchmark', JavaExec) {
+ mainClass = 'org.apache.ignite.internal.benchmark.ClientKvPutBenchmark'
+
+ jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true',
'-Xmx16g']
+
+ classpath = sourceSets.integrationTest.runtimeClasspath
+
+ enableAssertions = true
+}
+
+tasks.register('runClientGetBenchmark', JavaExec) {
+ mainClass = 'org.apache.ignite.internal.benchmark.ClientKvGetBenchmark'
jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true',
'-Xmx16g']
@@ -279,6 +289,8 @@ tasks.register('runRemoteBenchmark', JavaExec) {
jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true',
'-Xmx16g']
classpath = sourceSets.integrationTest.runtimeClasspath
+
+ enableAssertions = true
}
jar {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
index f584b813b46..e3fb22ba783 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.benchmark;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.client.IgniteClient;
@@ -26,8 +29,6 @@ import org.apache.ignite.internal.client.table.ClientTable;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
-import org.apache.ignite.tx.Transaction;
-import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
@@ -37,11 +38,10 @@ import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
import org.openjdk.jmh.runner.options.OptionsBuilder;
/**
@@ -49,23 +49,21 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
*/
@State(Scope.Benchmark)
@Fork(1)
-@Threads(4)
@Warmup(iterations = 10, time = 2)
@Measurement(iterations = 20, time = 2)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
-public class ClientKvBenchmark extends AbstractMultiNodeBenchmark {
- private final Tuple tuple = Tuple.create();
+public abstract class ClientKvBenchmark extends AbstractMultiNodeBenchmark {
+ protected static final int DEFAULT_THREADS_COUNT = 1;
+
+ protected final Tuple tuple = Tuple.create();
protected IgniteClient client;
- private KeyValueView<Tuple, Tuple> kvView;
+ protected KeyValueView<Tuple, Tuple> kvView;
@Param({"0"})
- private int offset; // 1073741824 for second client to ensure unique keys
-
- @Param({"5"})
- private int batch;
+ protected int offset; // 1073741824 for second client to ensure unique keys
@Param({"false"})
private boolean fsync;
@@ -73,12 +71,21 @@ public class ClientKvBenchmark extends
AbstractMultiNodeBenchmark {
@Param({"32"})
private int partitionCount;
+ @Param({"" + DEFAULT_THREADS_COUNT})
+ protected int threads;
+
private final AtomicInteger counter = new AtomicInteger();
private final ThreadLocal<Integer> gen = ThreadLocal.withInitial(() ->
offset + counter.getAndIncrement() * 20_000_000);
protected String[] addresses() {
- return new String[]{"127.0.0.1:10800", "127.0.0.1:10801"};
+ String[] nodes = new String[nodes()];
+
+ for (int i = 0; i < nodes.length; i++) {
+ nodes[i] = "127.0.0.1:1080" + i;
+ }
+
+ return nodes;
}
@Override
@@ -108,36 +115,42 @@ public class ClientKvBenchmark extends
AbstractMultiNodeBenchmark {
super.nodeTearDown();
}
- /**
- * Benchmark for KV upsert via embedded client.
- */
- @Benchmark
- public void upsert() {
- Transaction tx = client.transactions().begin();
- for (int i = 0; i < batch; i++) {
- Tuple key = Tuple.create().set("ycsb_key", nextId());
- kvView.put(tx, key, tuple);
- }
- tx.commit();
- }
-
- private int nextId() {
+ protected int nextId() {
int cur = gen.get() + 1;
gen.set(cur);
return cur;
}
- /**
- * Benchmark's entry point.
- */
- public static void main(String[] args) throws RunnerException {
- Options opt = new OptionsBuilder()
- .include(".*" + ClientKvBenchmark.class.getSimpleName() + ".*")
+ protected int nextId(ThreadLocal<Integer> base, ThreadLocal<Long> gen) {
+ long cur = gen.get();
+ gen.set(cur + 1);
+ return (int) (base.get() + cur);
+ }
+
+ static void runBenchmark(Class<?> cls, String[] args) throws
RunnerException {
+ Map<String, String> params = new HashMap<>(args.length);
+
+ for (int i = 0; i < args.length; i++) {
+ String arg = args[i];
+ if (arg.startsWith("jmh.")) {
+ String[] av = arg.substring(4).split("=");
+ params.put(av[0], av[1]);
+ }
+ }
+
+ final String threadsParamName = "threads";
+ int threadsCount = params.containsKey(threadsParamName) ?
Integer.parseInt(params.get(threadsParamName)) : DEFAULT_THREADS_COUNT;
+ ChainedOptionsBuilder builder = new OptionsBuilder()
+ .include(".*" + cls.getSimpleName() + ".*")
// .jvmArgsAppend("-Djmh.executor=VIRTUAL")
- // .addProfiler(JavaFlightRecorderProfiler.class,
"configName=profile.jfc")
- .build();
+ // .addProfiler(JavaFlightRecorderProfiler.class,
"configName=profile.jfc");
+ .threads(threadsCount);
- new Runner(opt).run();
+ for (Entry<String, String> entry : params.entrySet()) {
+ builder.param(entry.getKey(), entry.getValue());
+ }
+
+ new Runner(builder.build()).run();
}
@Override
@@ -147,7 +160,7 @@ public class ClientKvBenchmark extends
AbstractMultiNodeBenchmark {
@Override
protected int nodes() {
- return 2;
+ return 1;
}
@Override
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
new file mode 100644
index 00000000000..a2ab9b6653d
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.table.Tuple;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.runner.RunnerException;
+
+/**
+ * Put then Get benchmark.
+ */
+public class ClientKvGetBenchmark extends ClientKvBenchmark {
+ @Param("1000")
+ private int keysPerThread;
+
+ @Param("1000")
+ private int loadBatchSize;
+
+ @Override
+ public void setUp() {
+ super.setUp();
+
+ AtomicInteger locCounter = new AtomicInteger();
+ ThreadLocal<Integer> locBase = ThreadLocal.withInitial(() -> offset +
locCounter.getAndIncrement() * 20_000_000);
+ ThreadLocal<Long> locGen = ThreadLocal.withInitial(() -> 0L);
+
+ Thread[] pool = new Thread[threads];
+
+ System.out.println("Start loading: " + threads);
+
+ for (int i = 0; i < pool.length; i++) {
+ pool[i] = new Thread(() -> {
+ Map<Tuple, Tuple> batch = new HashMap<>(loadBatchSize);
+
+ for (int i1 = 0; i1 < keysPerThread; i1++) {
+ Tuple key = Tuple.create().set("ycsb_key", nextId(locBase,
locGen));
+ batch.put(key, tuple);
+
+ if (batch.size() == loadBatchSize) {
+ kvView.putAll(null, batch);
+ batch.clear();
+ }
+ }
+ });
+ }
+
+ for (Thread thread : pool) {
+ thread.start();
+ }
+
+ for (Thread thread : pool) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ System.out.println("Finish loading");
+ }
+
+ private final AtomicInteger counter = new AtomicInteger();
+
+ private final ThreadLocal<Integer> base = ThreadLocal.withInitial(() ->
offset + counter.getAndIncrement() * 20_000_000);
+
+ private final ThreadLocal<long[]> gen = ThreadLocal.withInitial(() -> new
long[1]);
+
+ /**
+ * Benchmark for KV upsert via embedded client.
+ */
+ @Benchmark
+ public void get() {
+ long[] cur = gen.get();
+ cur[0] = cur[0] + 1;
+
+ int id = (int) (base.get() + cur[0] % keysPerThread);
+
+ Tuple key = Tuple.create().set("ycsb_key", id);
+ Tuple val = kvView.get(null, key);
+ assert val != null : Thread.currentThread().getName() + " " + key;
+ }
+
+ /**
+ * Benchmark's entry point. Can be started from command line:
+ * ./gradlew ":ignite-runner:runClientPutBenchmark" --args='jmh.batch=10
jmh.threads=1'
+ */
+ public static void main(String[] args) throws RunnerException {
+ runBenchmark(ClientKvGetBenchmark.class, args);
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutBenchmark.java
new file mode 100644
index 00000000000..bfee1c88e00
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutBenchmark.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import org.apache.ignite.table.Tuple;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.runner.RunnerException;
+
+/**
+ * Put benchmark.
+ */
+public class ClientKvPutBenchmark extends ClientKvBenchmark {
+ /**
+ * Benchmark for KV upsert via embedded client.
+ */
+ @Benchmark
+ public void upsert() {
+ Tuple key = Tuple.create().set("ycsb_key", nextId());
+ kvView.put(null, key, tuple);
+ }
+
+ /**
+ * Benchmark's entry point. Can be started from command line:
+ * ./gradlew ":ignite-runner:runClientPutBenchmark" --args='jmh.batch=10
jmh.threads=1'
+ */
+ public static void main(String[] args) throws RunnerException {
+ runBenchmark(ClientKvPutBenchmark.class, args);
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutManyBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutManyBenchmark.java
new file mode 100644
index 00000000000..5a56060e2ea
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutManyBenchmark.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.runner.RunnerException;
+
+/**
+ * Put many benchmark.
+ */
+public class ClientKvPutManyBenchmark extends ClientKvBenchmark {
+ @Param({"5"})
+ private int batch;
+
+ /**
+ * Benchmark for KV upsert via embedded client.
+ */
+ @Benchmark
+ public void upsert() {
+ Transaction tx = client.transactions().begin();
+ for (int i = 0; i < batch; i++) {
+ Tuple key = Tuple.create().set("ycsb_key", nextId());
+ kvView.put(tx, key, tuple);
+ }
+ tx.commit();
+ }
+
+ /**
+ * Benchmark's entry point. Can be started from command line:
+ * ./gradlew ":ignite-runner:runClientPutBenchmark" --args='jmh.batch=10
jmh.threads=1'
+ */
+ public static void main(String[] args) throws RunnerException {
+ runBenchmark(ClientKvPutManyBenchmark.class, args);
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
index 1cf5d761036..8f3cae627d3 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
@@ -17,33 +17,12 @@
package org.apache.ignite.internal.benchmark;
-import java.util.concurrent.TimeUnit;
import org.apache.ignite.client.IgniteClient;
-import org.apache.ignite.internal.lang.IgniteSystemProperties;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.annotations.Warmup;
-import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
/**
* Benchmark for a single upsert operation using externally enabled cluster.
*/
-@State(Scope.Benchmark)
-@Fork(1)
-@Threads(64)
-@Warmup(iterations = 10, time = 2)
-@Measurement(iterations = 20, time = 2)
-@BenchmarkMode(Mode.Throughput)
-@OutputTimeUnit(TimeUnit.SECONDS)
public class RemoteKvBenchmark extends ClientKvBenchmark {
@Override
protected boolean remote() {
@@ -57,9 +36,6 @@ public class RemoteKvBenchmark extends ClientKvBenchmark {
@Override
public void nodeSetUp() throws Exception {
-
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK,
"false");
-
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK,
"false");
-
client = IgniteClient.builder().addresses(addresses()).build();
publicIgnite = client;
@@ -70,12 +46,6 @@ public class RemoteKvBenchmark extends ClientKvBenchmark {
* Benchmark's entry point.
*/
public static void main(String[] args) throws RunnerException {
- Options opt = new OptionsBuilder()
- .include(".*" + RemoteKvBenchmark.class.getSimpleName() + ".*")
- // .jvmArgsAppend("-Djmh.executor=VIRTUAL")
- // .addProfiler(JavaFlightRecorderProfiler.class,
"configName=profile.jfc")
- .build();
-
- new Runner(opt).run();
+ runBenchmark(RemoteKvBenchmark.class, args);
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
index cdfe73a74cd..23be0ce86c3 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
@@ -87,8 +87,8 @@ public class UpsertKvBenchmark extends
AbstractMultiNodeBenchmark {
@Override
public void nodeSetUp() throws Exception {
-
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK,
"true");
-
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK,
"true");
+
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK,
"false");
+
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK,
"false");
super.nodeSetUp();
}