This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 70a5aa0258 IGNITE-20617 Sql. Fix performance degradation in SELECTS 
(#2804)
70a5aa0258 is described below

commit 70a5aa0258b39689b3b26c196f3c09e30e0daf36
Author: korlov42 <[email protected]>
AuthorDate: Tue Nov 7 14:42:41 2023 +0200

    IGNITE-20617 Sql. Fix performance degradation in SELECTS (#2804)
    
    The problem is that none of the SQL-related requests set the observable 
timestamp in the response's meta. As a result, the value of clock.now() is sent 
to the client, and the next request has to wait for the proper safe time.
---
 .../handler/ClientInboundMessageHandler.java       |   4 +-
 .../requests/sql/ClientSqlCursorCloseRequest.java  |  22 +++-
 .../sql/ClientSqlCursorNextPageRequest.java        |  10 +-
 .../requests/sql/ClientSqlExecuteRequest.java      |   9 +-
 .../client/ObservableTimestampPropagationTest.java |  38 +++++++
 ...chmark.java => AbstractMultiNodeBenchmark.java} | 123 +++++++++++++++------
 .../ignite/internal/benchmark/InsertBenchmark.java |  11 +-
 .../ignite/internal/benchmark/SelectBenchmark.java |  11 +-
 ...{SqlOneNodeBenchmark.java => SqlBenchmark.java} |  15 ++-
 .../ignite/internal/tx/HybridTimestampTracker.java |   2 +-
 .../internal/tx/impl/IgniteTransactionsImpl.java   |  11 +-
 11 files changed, 202 insertions(+), 54 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 0dbf9239d5..58d3cf6ff4 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -673,10 +673,10 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
                 return ClientSqlExecuteRequest.process(in, out, sql, 
resources, metrics, igniteTransactions);
 
             case ClientOp.SQL_CURSOR_NEXT_PAGE:
-                return ClientSqlCursorNextPageRequest.process(in, out, 
resources);
+                return ClientSqlCursorNextPageRequest.process(in, out, 
resources, igniteTransactions);
 
             case ClientOp.SQL_CURSOR_CLOSE:
-                return ClientSqlCursorCloseRequest.process(in, resources);
+                return ClientSqlCursorCloseRequest.process(in, out, resources, 
igniteTransactions);
 
             case ClientOp.PARTITION_ASSIGNMENT_GET:
                 return ClientTablePartitionAssignmentGetRequest.process(in, 
out, igniteTables);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
index 3758860258..a49c897eec 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
@@ -19,8 +19,10 @@ package org.apache.ignite.client.handler.requests.sql;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 
 /**
  * Client SQL cursor close request.
@@ -29,15 +31,27 @@ public class ClientSqlCursorCloseRequest {
     /**
      * Processes the request.
      *
-     * @param in        Unpacker.
+     * @param in Unpacker.
+     * @param out Packer.
      * @param resources Resources.
+     * @param transactions Transactional facade. Used to acquire last observed 
time to propagate to client in response.
+     * @return Future representing result of operation.
      */
-    public static CompletableFuture<Void> process(ClientMessageUnpacker in, 
ClientResourceRegistry resources)
-            throws IgniteInternalCheckedException {
+    public static CompletableFuture<Void> process(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            ClientResourceRegistry resources,
+            IgniteTransactionsImpl transactions
+    ) throws IgniteInternalCheckedException {
         long resourceId = in.unpackLong();
 
         ClientSqlResultSet asyncResultSet = 
resources.remove(resourceId).get(ClientSqlResultSet.class);
 
-        return asyncResultSet.closeAsync();
+        return asyncResultSet.closeAsync()
+                .thenApply(ignored -> {
+                    out.meta(transactions.observableTimestamp());
+
+                    return null;
+                });
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
index 1c95c76a40..1100ddef36 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
@@ -24,6 +24,7 @@ import 
org.apache.ignite.client.handler.ClientResourceRegistry;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 
 /**
  * Client SQL cursor next page request.
@@ -34,13 +35,15 @@ public class ClientSqlCursorNextPageRequest {
      *
      * @param in  Unpacker.
      * @param out Packer.
-     * @return Future.
+     * @param transactions Transactional facade. Used to acquire last observed 
time to propagate to client in response.
+     * @return Future representing result of operation.
      */
     public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
-            ClientResourceRegistry resources)
-            throws IgniteInternalCheckedException {
+            ClientResourceRegistry resources,
+            IgniteTransactionsImpl transactions
+    ) throws IgniteInternalCheckedException {
         long resourceId = in.unpackLong();
 
         var resultSet = 
resources.get(resourceId).get(ClientSqlResultSet.class);
@@ -49,6 +52,7 @@ public class ClientSqlCursorNextPageRequest {
                 .thenCompose(r -> {
                     packCurrentPage(out, r);
                     out.packBoolean(r.hasMorePages());
+                    out.meta(transactions.observableTimestamp());
 
                     if (!r.hasMorePages()) {
                         try {
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 79929e98c0..f36c7c1dd0 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -64,7 +64,8 @@ public class ClientSqlExecuteRequest {
      * @param sql SQL API.
      * @param resources Resources.
      * @param metrics Metrics.
-     * @return Future.
+     * @param transactions Transactional facade. Used to acquire last observed 
time to propagate to client in response.
+     * @return Future representing result of operation.
      */
     public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
@@ -92,11 +93,7 @@ public class ClientSqlExecuteRequest {
         return session
                 .executeAsync(tx, statement, arguments)
                 .thenCompose(asyncResultSet -> {
-                    //noinspection StatementWithEmptyBody
-                    if (tx == null) {
-                        // TODO IGNITE-20232 Propagate observable timestamp to 
sql engine using internal API.
-                        // out.meta(asyncResultSet.tx().readTimestamp());
-                    }
+                    out.meta(transactions.observableTimestamp());
 
                     return writeResultSetAsync(out, resources, asyncResultSet, 
session, metrics);
                 });
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
index a3633f9728..71db772329 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.client;
 
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 import java.util.UUID;
@@ -28,6 +30,8 @@ import org.apache.ignite.internal.TestHybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.tx.TransactionOptions;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterAll;
@@ -62,6 +66,7 @@ public class ObservableTimestampPropagationTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
+    @SuppressWarnings("resource")
     public void testClientPropagatesLatestKnownHybridTimestamp() {
         assertNull(lastObservableTimestamp());
 
@@ -88,6 +93,35 @@ public class ObservableTimestampPropagationTest extends 
BaseIgniteAbstractTest {
         client.transactions().begin(new TransactionOptions().readOnly(true));
         client.transactions().begin(new TransactionOptions().readOnly(true));
         assertEquals(11, lastObservableTimestamp());
+
+        Statement statement = client.sql().statementBuilder()
+                .property("hasMorePages", true)
+                .query("SELECT 1")
+                .build();
+
+        // Execution of a SQL query should propagate observable time, not the 
current time of the clock.
+        currentServerTimestamp.set(20);
+        updateObservableTimestamp(14);
+        AsyncResultSet<?> rs = 
await(client.sql().createSession().executeAsync(null, statement));
+        assertEquals(14, lastObservableTimestamp());
+
+        assertNotNull(rs);
+
+        // Every fetch should propagate observable time, not the current time 
of the clock.
+        currentServerTimestamp.set(20);
+        updateObservableTimestamp(18);
+        await(rs.fetchNextPage());
+        assertEquals(18, lastObservableTimestamp());
+
+        currentServerTimestamp.set(24);
+        updateObservableTimestamp(20);
+        await(rs.fetchNextPage());
+        assertEquals(20, lastObservableTimestamp());
+
+        // Closing a result set should propagate observable time as well.
+        updateObservableTimestamp(22);
+        await(rs.closeAsync());
+        assertEquals(22, lastObservableTimestamp());
     }
 
     private static @Nullable Long lastObservableTimestamp() {
@@ -95,4 +129,8 @@ public class ObservableTimestampPropagationTest extends 
BaseIgniteAbstractTest {
 
         return ts == null ? null : ts.longValue() >> LOGICAL_TIME_BITS_SIZE;
     }
+
+    private static void updateObservableTimestamp(long newTime) {
+        
ignite.timestampTracker().update(HybridTimestamp.hybridTimestamp(newTime << 
LOGICAL_TIME_BITS_SIZE));
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
similarity index 54%
rename from 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
rename to 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index e2dbacc0aa..7166d040fb 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -17,24 +17,32 @@
 
 package org.apache.ignite.internal.benchmark;
 
+import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.sql.engine.property.PropertiesHelper.newBuilder;
 import static 
org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
-import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.InitParameters;
 import org.apache.ignite.internal.app.IgniteImpl;
-import 
org.apache.ignite.internal.network.configuration.NetworkConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.sql.engine.QueryContext;
 import org.apache.ignite.internal.sql.engine.QueryProperty;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.intellij.lang.annotations.Language;
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
@@ -43,14 +51,16 @@ import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.TearDown;
 
 /**
- * Base benchmark class for {@link SelectBenchmark} and {@link 
InsertBenchmark}. Starts an Ignite node with a single table
- * {@link #TABLE_NAME}, that has single PK column and 10 value columns.
+ * Base benchmark class for {@link SelectBenchmark} and {@link 
InsertBenchmark}.
+ *
+ * <p>Starts an Ignite cluster with a single table {@link #TABLE_NAME}, that 
has
+ * single PK column and 10 value columns.
  */
 @State(Scope.Benchmark)
-public class AbstractOneNodeBenchmark {
-    private static final int PORT = NetworkConfigurationSchema.DEFAULT_PORT;
-
-    private static final String NODE_NAME = "node" + PORT;
+public class AbstractMultiNodeBenchmark {
+    private static final int BASE_PORT = 3344;
+    private static final int BASE_CLIENT_PORT = 10800;
+    private static final int BASE_REST_PORT = 10300;
 
     protected static final String FIELD_VAL = "a".repeat(100);
 
@@ -66,27 +76,7 @@ public class AbstractOneNodeBenchmark {
      */
     @Setup
     public final void nodeSetUp() throws IOException {
-        Path workDir = 
Files.createTempDirectory("tmpDirPrefix").toFile().toPath();
-
-        @Language("HOCON")
-        String config = "network: {\n"
-                        + "  nodeFinder:{\n"
-                        + "    netClusterNodes: [ \"localhost:" + PORT + "\"] 
\n"
-                        + "  }\n"
-                        + "},"
-                        + "raft.fsync = " + fsync;
-
-        var fut =  TestIgnitionManager.start(NODE_NAME, config, 
workDir.resolve(NODE_NAME));
-
-        TestIgnitionManager.init(new InitParametersBuilder()
-                .clusterName("cluster")
-                .destinationNodeName(NODE_NAME)
-                .cmgNodeNames(List.of(NODE_NAME))
-                .metaStorageNodeNames(List.of(NODE_NAME))
-                .build()
-        );
-
-        clusterNode = (IgniteImpl) fut.join();
+        startCluster();
 
         var queryEngine = clusterNode.queryEngine();
 
@@ -111,7 +101,7 @@ public class AbstractOneNodeBenchmark {
                 + ");";
 
         try {
-            var context = QueryContext.create(SqlQueryType.SINGLE_STMT_TYPES);
+            var context = QueryContext.create(SqlQueryType.ALL);
 
             getAllFromCursor(
                     await(queryEngine.querySingleAsync(sessionId, context, 
clusterNode.transactions(), sql))
@@ -121,8 +111,77 @@ public class AbstractOneNodeBenchmark {
         }
     }
 
+    /**
+     * Stops the cluster.
+     *
+     * @throws Exception In case of any error.
+     */
     @TearDown
-    public final void nodeTearDown() {
-        IgnitionManager.stop(NODE_NAME);
+    public final void nodeTearDown() throws Exception {
+        List<AutoCloseable> closeables = IntStream.range(0, nodes())
+                .mapToObj(i -> nodeName(BASE_PORT + i))
+                .map(nodeName -> (AutoCloseable) () -> 
IgnitionManager.stop(nodeName))
+                .collect(toList());
+
+        IgniteUtils.closeAll(closeables);
+    }
+
+    private void startCluster() throws IOException {
+        Path workDir = 
Files.createTempDirectory("tmpDirPrefix").toFile().toPath();
+
+        String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+        List<CompletableFuture<Ignite>> futures = new ArrayList<>();
+
+        @Language("HOCON")
+        String configTemplate = "{\n"
+                + "  \"network\": {\n"
+                + "    \"port\":{},\n"
+                + "    \"nodeFinder\":{\n"
+                + "      \"netClusterNodes\": [ {} ]\n"
+                + "    }\n"
+                + "  },\n"
+                + "  clientConnector: { port:{} },\n"
+                + "  rest.port: {},\n"
+                + "  raft.fsync = " + fsync
+                + "}";
+
+        for (int i = 0; i < nodes(); i++) {
+            int port = BASE_PORT + i;
+            String nodeName = nodeName(port);
+
+            String config = IgniteStringFormatter.format(configTemplate, port, 
connectNodeAddr,
+                    BASE_CLIENT_PORT + i, BASE_REST_PORT + i);
+
+            futures.add(TestIgnitionManager.start(nodeName, config, 
workDir.resolve(nodeName)));
+        }
+
+        String metaStorageNodeName = nodeName(BASE_PORT);
+
+        InitParameters initParameters = InitParameters.builder()
+                .destinationNodeName(metaStorageNodeName)
+                .metaStorageNodeNames(List.of(metaStorageNodeName))
+                .clusterName("cluster")
+                .build();
+
+        TestIgnitionManager.init(initParameters);
+
+        for (CompletableFuture<Ignite> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            if (clusterNode == null) {
+                clusterNode = (IgniteImpl) await(future);
+            } else {
+                await(future);
+            }
+        }
+    }
+
+    private static String nodeName(int port) {
+        return "node_" + port;
+    }
+
+    protected int nodes() {
+        return 3;
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
index f9a828a180..d47d137948 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
@@ -39,6 +39,7 @@ import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
@@ -60,7 +61,10 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 @Measurement(iterations = 20, time = 2)
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
-public class InsertBenchmark extends AbstractOneNodeBenchmark {
+public class InsertBenchmark extends AbstractMultiNodeBenchmark {
+    @Param({"1", "2", "3"})
+    private int clusterSize;
+
     /**
      * Benchmark for SQL insert via embedded client.
      */
@@ -309,4 +313,9 @@ public class InsertBenchmark extends 
AbstractOneNodeBenchmark {
 
         return format(insertQueryTemplate, TABLE_NAME, "ycsb_key", fieldsQ, 
valQ);
     }
+
+    @Override
+    protected int nodes() {
+        return clusterSize;
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
index c65639491d..f502821205 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
@@ -38,6 +38,7 @@ import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
@@ -60,7 +61,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MICROSECONDS)
 @SuppressWarnings({"WeakerAccess", "unused"})
-public class SelectBenchmark extends AbstractOneNodeBenchmark {
+public class SelectBenchmark extends AbstractMultiNodeBenchmark {
     private static final int TABLE_SIZE = 30_000;
     private static final String SELECT_ALL_FROM_USERTABLE = "select * from 
usertable where ycsb_key = ?";
 
@@ -68,6 +69,9 @@ public class SelectBenchmark extends AbstractOneNodeBenchmark 
{
 
     private KeyValueView<Tuple, Tuple> keyValueView;
 
+    @Param({"1", "2", "3"})
+    private int clusterSize;
+
     /**
      * Fills the table with data.
      */
@@ -262,6 +266,11 @@ public class SelectBenchmark extends 
AbstractOneNodeBenchmark {
             return kvView;
         }
     }
+
+    @Override
+    protected int nodes() {
+        return clusterSize;
+    }
 }
 
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlBenchmark.java
similarity index 92%
rename from 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
rename to 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlBenchmark.java
index 0762242cbd..c41e5639f5 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlBenchmark.java
@@ -30,6 +30,7 @@ import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
@@ -42,7 +43,7 @@ import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 
 /**
- * Benchmark that runs sql queries via embedded client on single node cluster.
+ * Benchmark that runs sql queries via embedded client on clusters of 
different size.
  */
 @State(Scope.Benchmark)
 @Fork(1)
@@ -52,11 +53,14 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 @SuppressWarnings({"WeakerAccess", "unused"})
-public class SqlOneNodeBenchmark extends AbstractOneNodeBenchmark {
+public class SqlBenchmark extends AbstractMultiNodeBenchmark {
     private static final int TABLE_SIZE = 30_000;
 
     private Session session;
 
+    @Param({"1", "2", "3"})
+    private int clusterSize;
+
     /** Fills the table with data. */
     @Setup
     public void setUp() throws IOException {
@@ -148,11 +152,16 @@ public class SqlOneNodeBenchmark extends 
AbstractOneNodeBenchmark {
      */
     public static void main(String[] args) throws RunnerException {
         Options opt = new OptionsBuilder()
-                .include(".*" + SqlOneNodeBenchmark.class.getSimpleName() + 
".*")
+                .include(".*" + SqlBenchmark.class.getSimpleName() + ".*")
                 .build();
 
         new Runner(opt).run();
     }
+
+    @Override
+    protected int nodes() {
+        return clusterSize;
+    }
 }
 
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
index 636ba07457..849fa04079 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
@@ -44,7 +44,7 @@ public class HybridTimestampTracker {
      *
      * @param ts Timestamp to use for update.
      */
-    public void update(HybridTimestamp ts) {
+    public void update(@Nullable HybridTimestamp ts) {
         long tsVal = HybridTimestamp.hybridTimestampToLong(ts);
 
         timestamp.updateAndGet(x -> Math.max(x, tsVal));
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
index 41d3101872..8ab3ed32b0 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
@@ -50,10 +50,19 @@ public class IgniteTransactionsImpl implements 
IgniteTransactions {
      *
      * @param ts Timestamp.
      */
-    public void updateObservableTimestamp(HybridTimestamp ts) {
+    public void updateObservableTimestamp(@Nullable HybridTimestamp ts) {
         observableTimestampTracker.update(ts);
     }
 
+    /**
+     * Gets current value of observable timestamp.
+     *
+     * @return Timestamp or {@code null} if the tracker has never been updated.
+     */
+    public @Nullable HybridTimestamp observableTimestamp() {
+        return observableTimestampTracker.get();
+    }
+
     /**
      * Begins a transaction.
      * TODO:IGNITE-20232 Remove this method; instead, an interface method 
should be used.

Reply via email to