This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e7cb0881aa5 IGNITE-25827 Add FlushConsolidationHandler on client and 
server
e7cb0881aa5 is described below

commit e7cb0881aa5bfd0a4f2ec448aace420c29c527f3
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Mon Jul 21 16:17:41 2025 +0300

    IGNITE-25827 Add FlushConsolidationHandler on client and server
---
 .../ignite/internal/client/proto/ClientOp.java     |  43 ++++++++
 .../ignite/client/handler/ClientHandlerModule.java |   2 +
 .../handler/ClientInboundMessageHandler.java       |  35 +------
 .../io/netty/NettyClientConnectionMultiplexer.java |   2 +
 .../org/apache/ignite/client/RetryPolicyTest.java  |   2 +-
 modules/runner/build.gradle                        |  16 ++-
 .../internal/benchmark/ClientKvBenchmark.java      |  87 +++++++++-------
 .../internal/benchmark/ClientKvGetBenchmark.java   | 109 +++++++++++++++++++++
 .../internal/benchmark/ClientKvPutBenchmark.java   |  44 +++++++++
 .../benchmark/ClientKvPutManyBenchmark.java        |  53 ++++++++++
 .../internal/benchmark/RemoteKvBenchmark.java      |  32 +-----
 .../internal/benchmark/UpsertKvBenchmark.java      |   4 +-
 12 files changed, 322 insertions(+), 107 deletions(-)

diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index f90ee3a4389..d0a3f5bb0b5 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -214,6 +214,9 @@ public class ClientOp {
     /** Batch mask. */
     private static final BitSet BATCH_MASK = new BitSet(64);
 
+    /** Partition operation mask. */
+    private static final BitSet OP_MASK = new BitSet(64);
+
     static {
         WRITE_MASK.set(TUPLE_UPSERT);
         WRITE_MASK.set(TUPLE_GET_AND_UPSERT);
@@ -235,6 +238,22 @@ public class ClientOp {
         BATCH_MASK.set(TUPLE_INSERT_ALL);
         BATCH_MASK.set(TUPLE_DELETE_ALL);
         BATCH_MASK.set(TUPLE_DELETE_ALL_EXACT);
+
+        OP_MASK.set(TABLES_GET);
+        OP_MASK.set(TUPLE_UPSERT);
+        OP_MASK.set(TUPLE_GET);
+        OP_MASK.set(TUPLE_GET_AND_UPSERT);
+        OP_MASK.set(TUPLE_INSERT);
+        OP_MASK.set(TUPLE_REPLACE);
+        OP_MASK.set(TUPLE_REPLACE_EXACT);
+        OP_MASK.set(TUPLE_GET_AND_REPLACE);
+        OP_MASK.set(TUPLE_DELETE);
+        OP_MASK.set(TUPLE_DELETE_EXACT);
+        OP_MASK.set(TUPLE_GET_AND_DELETE);
+        OP_MASK.set(TUPLE_CONTAINS_KEY);
+        OP_MASK.set(STREAMER_BATCH_SEND);
+        OP_MASK.set(TX_COMMIT);
+        OP_MASK.set(TX_ROLLBACK);
     }
 
     /**
@@ -256,4 +275,28 @@ public class ClientOp {
     public static boolean isBatch(int opCode) {
         return BATCH_MASK.get(opCode);
     }
+
+    /**
+     * Tests whether the given operation code denotes a partition operation.
+     *
+     * @param opCode The operation code.
+     * @return The status.
+     */
+    public static boolean isPartitionOperation(int opCode) {
+        // Sql-related operation must do some bookkeeping first on the 
client's thread to avoid races
+        // (for instance, cancellation must not be processed until execution 
request is registered).
+        // || opCode == ClientOp.SQL_EXEC
+        // || opCode == ClientOp.SQL_EXEC_BATCH
+        // || opCode == ClientOp.SQL_EXEC_SCRIPT
+        // || opCode == ClientOp.SQL_QUERY_META;
+
+        // TODO: IGNITE-23641 The batch operations were excluded because fast 
switching leads to performance degradation for them.
+        // || opCode == ClientOp.TUPLE_UPSERT_ALL
+        // || opCode == ClientOp.TUPLE_GET_ALL
+        // || opCode == ClientOp.TUPLE_INSERT_ALL
+        // || opCode == ClientOp.TUPLE_DELETE_ALL
+        // || opCode == ClientOp.TUPLE_DELETE_ALL_EXACT
+        // || opCode == ClientOp.TUPLE_CONTAINS_ALL_KEYS;
+        return OP_MASK.get(opCode);
+    }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index d185f3fcddc..bbd890c4274 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.timeout.IdleStateHandler;
 import java.net.BindException;
@@ -351,6 +352,7 @@ public class ClientHandlerModule implements 
IgniteComponent, PlatformComputeTran
                             handler = messageHandler;
 
                             ch.pipeline().addLast(
+                                    new 
FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES,
 true),
                                     new ClientMessageDecoder(),
                                     messageHandler
                             );
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index bc39fd8893e..98496108d81 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -733,7 +733,7 @@ public class ClientInboundMessageHandler
                 return;
             }
 
-            if (isPartitionOperation(opCode)) {
+            if (ClientOp.isPartitionOperation(opCode)) {
                 long requestId0 = requestId;
                 int opCode0 = opCode;
 
@@ -999,39 +999,6 @@ public class ClientInboundMessageHandler
         }
     }
 
-    private static boolean isPartitionOperation(int opCode) {
-        return opCode == ClientOp.TABLES_GET
-                || opCode == ClientOp.TUPLE_UPSERT
-                || opCode == ClientOp.TUPLE_GET
-                || opCode == ClientOp.TUPLE_GET_AND_UPSERT
-                || opCode == ClientOp.TUPLE_INSERT
-                || opCode == ClientOp.TUPLE_REPLACE
-                || opCode == ClientOp.TUPLE_REPLACE_EXACT
-                || opCode == ClientOp.TUPLE_GET_AND_REPLACE
-                || opCode == ClientOp.TUPLE_DELETE
-                || opCode == ClientOp.TUPLE_DELETE_EXACT
-                || opCode == ClientOp.TUPLE_GET_AND_DELETE
-                || opCode == ClientOp.TUPLE_CONTAINS_KEY
-                || opCode == ClientOp.STREAMER_BATCH_SEND
-                || opCode == ClientOp.TX_COMMIT
-                || opCode == ClientOp.TX_ROLLBACK;
-
-                // Sql-related operation must do some bookkeeping first on the 
client's thread to avoid races
-                // (for instance, cancellation must not be processed until 
execution request is registered).
-                // || opCode == ClientOp.SQL_EXEC
-                // || opCode == ClientOp.SQL_EXEC_BATCH
-                // || opCode == ClientOp.SQL_EXEC_SCRIPT
-                // || opCode == ClientOp.SQL_QUERY_META;
-
-                // TODO: IGNITE-23641 The batch operations were excluded 
because fast switching leads to performance degradation for them.
-                // || opCode == ClientOp.TUPLE_UPSERT_ALL
-                // || opCode == ClientOp.TUPLE_GET_ALL
-                // || opCode == ClientOp.TUPLE_INSERT_ALL
-                // || opCode == ClientOp.TUPLE_DELETE_ALL
-                // || opCode == ClientOp.TUPLE_DELETE_ALL_EXACT
-                // || opCode == ClientOp.TUPLE_CONTAINS_ALL_KEYS;
-    }
-
     private void processOperationInternal(
             ChannelHandlerContext ctx,
             ClientMessageUnpacker in,
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
index 8cb41a82168..91f7a9a5858 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import java.io.File;
@@ -89,6 +90,7 @@ public class NettyClientConnectionMultiplexer implements 
ClientConnectionMultipl
                     }
 
                     ch.pipeline().addLast(
+                            new 
FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES,
 true),
                             new ClientMessageDecoder(),
                             new NettyClientMessageHandler());
                 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 35acae23fb4..56db449b17e 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -213,7 +213,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest 
{
         var nullOpFields = new ArrayList<String>();
 
         for (var field : ClientOp.class.getDeclaredFields()) {
-            if ("WRITE_MASK".equals(field.getName()) || 
"BATCH_MASK".equals(field.getName())) {
+            if ("WRITE_MASK".equals(field.getName()) || 
"BATCH_MASK".equals(field.getName()) || "OP_MASK".equals(field.getName())) {
                 continue;
             }
 
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 54ebeb35852..e1f20d4fe06 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -263,8 +263,18 @@ tasks.register('runUpsertBenchmark', JavaExec) {
     enableAssertions = true
 }
 
-tasks.register('runClientBenchmark', JavaExec) {
-    mainClass = 'org.apache.ignite.internal.benchmark.ClientKvBenchmark'
+tasks.register('runClientPutBenchmark', JavaExec) {
+    mainClass = 'org.apache.ignite.internal.benchmark.ClientKvPutBenchmark'
+
+    jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true', 
'-Xmx16g']
+
+    classpath = sourceSets.integrationTest.runtimeClasspath
+
+    enableAssertions = true
+}
+
+tasks.register('runClientGetBenchmark', JavaExec) {
+    mainClass = 'org.apache.ignite.internal.benchmark.ClientKvGetBenchmark'
 
     jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true', 
'-Xmx16g']
 
@@ -279,6 +289,8 @@ tasks.register('runRemoteBenchmark', JavaExec) {
     jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true', 
'-Xmx16g']
 
     classpath = sourceSets.integrationTest.runtimeClasspath
+
+    enableAssertions = true
 }
 
 jar {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
index f584b813b46..e3fb22ba783 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.benchmark;
 
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.client.IgniteClient;
@@ -26,8 +29,6 @@ import org.apache.ignite.internal.client.table.ClientTable;
 import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Tuple;
-import org.apache.ignite.tx.Transaction;
-import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Measurement;
@@ -37,11 +38,10 @@ import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 
 /**
@@ -49,23 +49,21 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
  */
 @State(Scope.Benchmark)
 @Fork(1)
-@Threads(4)
 @Warmup(iterations = 10, time = 2)
 @Measurement(iterations = 20, time = 2)
 @BenchmarkMode(Mode.Throughput)
 @OutputTimeUnit(TimeUnit.SECONDS)
-public class ClientKvBenchmark extends AbstractMultiNodeBenchmark {
-    private final Tuple tuple = Tuple.create();
+public abstract class ClientKvBenchmark extends AbstractMultiNodeBenchmark {
+    protected static final int DEFAULT_THREADS_COUNT = 1;
+
+    protected final Tuple tuple = Tuple.create();
 
     protected IgniteClient client;
 
-    private KeyValueView<Tuple, Tuple> kvView;
+    protected KeyValueView<Tuple, Tuple> kvView;
 
     @Param({"0"})
-    private int offset; // 1073741824 for second client to ensure unique keys
-
-    @Param({"5"})
-    private int batch;
+    protected int offset; // 1073741824 for second client to ensure unique keys
 
     @Param({"false"})
     private boolean fsync;
@@ -73,12 +71,21 @@ public class ClientKvBenchmark extends 
AbstractMultiNodeBenchmark {
     @Param({"32"})
     private int partitionCount;
 
+    @Param({"" + DEFAULT_THREADS_COUNT})
+    protected int threads;
+
     private final AtomicInteger counter = new AtomicInteger();
 
     private final ThreadLocal<Integer> gen = ThreadLocal.withInitial(() -> 
offset + counter.getAndIncrement() * 20_000_000);
 
     protected String[] addresses() {
-        return new String[]{"127.0.0.1:10800", "127.0.0.1:10801"};
+        String[] nodes = new String[nodes()];
+        // One address per node, ports counted up from 10800; add numerically so indexes >= 10 stay valid.
+        for (int i = 0; i < nodes.length; i++) {
+            nodes[i] = "127.0.0.1:" + (10800 + i);
+        }
+
+        return nodes;
     }
 
     @Override
@@ -108,36 +115,42 @@ public class ClientKvBenchmark extends 
AbstractMultiNodeBenchmark {
         super.nodeTearDown();
     }
 
-    /**
-     * Benchmark for KV upsert via embedded client.
-     */
-    @Benchmark
-    public void upsert() {
-        Transaction tx = client.transactions().begin();
-        for (int i = 0; i < batch; i++) {
-            Tuple key = Tuple.create().set("ycsb_key", nextId());
-            kvView.put(tx, key, tuple);
-        }
-        tx.commit();
-    }
-
-    private int nextId() {
+    protected int nextId() {
         int cur = gen.get() + 1;
         gen.set(cur);
         return cur;
     }
 
-    /**
-     * Benchmark's entry point.
-     */
-    public static void main(String[] args) throws RunnerException {
-        Options opt = new OptionsBuilder()
-                .include(".*" + ClientKvBenchmark.class.getSimpleName() + ".*")
+    protected int nextId(ThreadLocal<Integer> base, ThreadLocal<Long> gen) {
+        long cur = gen.get();
+        gen.set(cur + 1);
+        return (int) (base.get() + cur);
+    }
+
+    static void runBenchmark(Class<?> cls, String[] args) throws 
RunnerException {
+        Map<String, String> params = new HashMap<>(args.length);
+
+        for (int i = 0; i < args.length; i++) {
+            String arg = args[i];
+            if (arg.startsWith("jmh.")) {
+                String[] av = arg.substring(4).split("=");
+                params.put(av[0], av[1]);
+            }
+        }
+
+        final String threadsParamName = "threads";
+        int threadsCount = params.containsKey(threadsParamName) ? 
Integer.parseInt(params.get(threadsParamName)) : DEFAULT_THREADS_COUNT;
+        ChainedOptionsBuilder builder = new OptionsBuilder()
+                .include(".*" + cls.getSimpleName() + ".*")
                 // .jvmArgsAppend("-Djmh.executor=VIRTUAL")
-                // .addProfiler(JavaFlightRecorderProfiler.class, 
"configName=profile.jfc")
-                .build();
+                // .addProfiler(JavaFlightRecorderProfiler.class, 
"configName=profile.jfc");
+                .threads(threadsCount);
 
-        new Runner(opt).run();
+        for (Entry<String, String> entry : params.entrySet()) {
+            builder.param(entry.getKey(), entry.getValue());
+        }
+
+        new Runner(builder.build()).run();
     }
 
     @Override
@@ -147,7 +160,7 @@ public class ClientKvBenchmark extends 
AbstractMultiNodeBenchmark {
 
     @Override
     protected int nodes() {
-        return 2;
+        return 1;
     }
 
     @Override
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
new file mode 100644
index 00000000000..a2ab9b6653d
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.table.Tuple;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.runner.RunnerException;
+
+/**
+ * Put then Get benchmark.
+ */
+public class ClientKvGetBenchmark extends ClientKvBenchmark {
+    @Param("1000")
+    private int keysPerThread;
+
+    @Param("1000")
+    private int loadBatchSize;
+
+    @Override
+    public void setUp() {
+        super.setUp();
+        // Preload keysPerThread keys for each of the configured threads, so get() only reads existing keys.
+        AtomicInteger locCounter = new AtomicInteger();
+        ThreadLocal<Integer> locBase = ThreadLocal.withInitial(() -> offset + locCounter.getAndIncrement() * 20_000_000);
+        ThreadLocal<Long> locGen = ThreadLocal.withInitial(() -> 0L);
+
+        Thread[] pool = new Thread[threads];
+
+        System.out.println("Start loading: " + threads);
+
+        for (int i = 0; i < pool.length; i++) {
+            pool[i] = new Thread(() -> {
+                Map<Tuple, Tuple> batch = new HashMap<>(loadBatchSize);
+
+                for (int i1 = 0; i1 < keysPerThread; i1++) {
+                    Tuple key = Tuple.create().set("ycsb_key", nextId(locBase, locGen));
+                    batch.put(key, tuple);
+                    // Also flush after the last key, otherwise a trailing partial batch is never stored.
+                    if (batch.size() == loadBatchSize || i1 == keysPerThread - 1) {
+                        kvView.putAll(null, batch);
+                        batch.clear();
+                    }
+                }
+            });
+        }
+
+        for (Thread thread : pool) {
+            thread.start();
+        }
+
+        for (Thread thread : pool) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        System.out.println("Finish loading");
+    }
+
+    private final AtomicInteger counter = new AtomicInteger();
+
+    private final ThreadLocal<Integer> base = ThreadLocal.withInitial(() -> 
offset + counter.getAndIncrement() * 20_000_000);
+
+    private final ThreadLocal<long[]> gen = ThreadLocal.withInitial(() -> new 
long[1]);
+
+    /**
+     * Benchmark for KV get via embedded client.
+     */
+    @Benchmark
+    public void get() {
+        long[] cur = gen.get();
+        cur[0] = cur[0] + 1;
+
+        int id = (int) (base.get() + cur[0] % keysPerThread);
+
+        Tuple key = Tuple.create().set("ycsb_key", id);
+        Tuple val = kvView.get(null, key);
+        assert val != null : Thread.currentThread().getName() + " " + key;
+    }
+
+    /**
+     * Benchmark's entry point. Can be started from command line:
+     * ./gradlew ":ignite-runner:runClientGetBenchmark" --args='jmh.keysPerThread=1000 jmh.threads=1'
+     */
+    public static void main(String[] args) throws RunnerException {
+        runBenchmark(ClientKvGetBenchmark.class, args);
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutBenchmark.java
new file mode 100644
index 00000000000..bfee1c88e00
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutBenchmark.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import org.apache.ignite.table.Tuple;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.runner.RunnerException;
+
+/**
+ * Put benchmark.
+ */
+public class ClientKvPutBenchmark extends ClientKvBenchmark {
+    /**
+     * Benchmark for KV upsert via embedded client.
+     */
+    @Benchmark
+    public void upsert() {
+        Tuple key = Tuple.create().set("ycsb_key", nextId());
+        kvView.put(null, key, tuple);
+    }
+
+    /**
+     * Benchmark's entry point. Can be started from command line:
+     * ./gradlew ":ignite-runner:runClientPutBenchmark" --args='jmh.threads=1'
+     */
+    public static void main(String[] args) throws RunnerException {
+        runBenchmark(ClientKvPutBenchmark.class, args);
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutManyBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutManyBenchmark.java
new file mode 100644
index 00000000000..5a56060e2ea
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvPutManyBenchmark.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.runner.RunnerException;
+
+/**
+ * Put many benchmark.
+ */
+public class ClientKvPutManyBenchmark extends ClientKvBenchmark {
+    @Param({"5"})
+    private int batch;
+
+    /**
+     * Benchmark for KV upsert via embedded client.
+     */
+    @Benchmark
+    public void upsert() {
+        Transaction tx = client.transactions().begin();
+        for (int i = 0; i < batch; i++) {
+            Tuple key = Tuple.create().set("ycsb_key", nextId());
+            kvView.put(tx, key, tuple);
+        }
+        tx.commit();
+    }
+
+    /**
+     * Benchmark's entry point. Can be started from command line:
+     * ./gradlew ":ignite-runner:runClientPutBenchmark" --args='jmh.batch=10 
jmh.threads=1'
+     */
+    public static void main(String[] args) throws RunnerException {
+        runBenchmark(ClientKvPutManyBenchmark.class, args);
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
index 1cf5d761036..8f3cae627d3 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
@@ -17,33 +17,12 @@
 
 package org.apache.ignite.internal.benchmark;
 
-import java.util.concurrent.TimeUnit;
 import org.apache.ignite.client.IgniteClient;
-import org.apache.ignite.internal.lang.IgniteSystemProperties;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.annotations.Warmup;
-import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
 
 /**
  * Benchmark for a single upsert operation using externally enabled cluster.
  */
-@State(Scope.Benchmark)
-@Fork(1)
-@Threads(64)
-@Warmup(iterations = 10, time = 2)
-@Measurement(iterations = 20, time = 2)
-@BenchmarkMode(Mode.Throughput)
-@OutputTimeUnit(TimeUnit.SECONDS)
 public class RemoteKvBenchmark extends ClientKvBenchmark {
     @Override
     protected boolean remote() {
@@ -57,9 +36,6 @@ public class RemoteKvBenchmark extends ClientKvBenchmark {
 
     @Override
     public void nodeSetUp() throws Exception {
-        
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, 
"false");
-        
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK,
 "false");
-
         client = IgniteClient.builder().addresses(addresses()).build();
         publicIgnite = client;
 
@@ -70,12 +46,6 @@ public class RemoteKvBenchmark extends ClientKvBenchmark {
      * Benchmark's entry point.
      */
     public static void main(String[] args) throws RunnerException {
-        Options opt = new OptionsBuilder()
-                .include(".*" + RemoteKvBenchmark.class.getSimpleName() + ".*")
-                // .jvmArgsAppend("-Djmh.executor=VIRTUAL")
-                // .addProfiler(JavaFlightRecorderProfiler.class, 
"configName=profile.jfc")
-                .build();
-
-        new Runner(opt).run();
+        runBenchmark(RemoteKvBenchmark.class, args);
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
index cdfe73a74cd..23be0ce86c3 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
@@ -87,8 +87,8 @@ public class UpsertKvBenchmark extends 
AbstractMultiNodeBenchmark {
 
     @Override
     public void nodeSetUp() throws Exception {
-        
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, 
"true");
-        
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK,
 "true");
+        
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, 
"false");
+        
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK,
 "false");
         super.nodeSetUp();
     }
 

Reply via email to