This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 70a5aa0258 IGNITE-20617 Sql. Fix performance degradation in SELECTS
(#2804)
70a5aa0258 is described below
commit 70a5aa0258b39689b3b26c196f3c09e30e0daf36
Author: korlov42 <[email protected]>
AuthorDate: Tue Nov 7 14:42:41 2023 +0200
IGNITE-20617 Sql. Fix performance degradation in SELECTS (#2804)
The problem is that none of sql-related requests set observable timestamp
to response's meta. As a result, the value of clock.now() is sent to a client,
and the next request is waiting for proper safe time.
---
.../handler/ClientInboundMessageHandler.java | 4 +-
.../requests/sql/ClientSqlCursorCloseRequest.java | 22 +++-
.../sql/ClientSqlCursorNextPageRequest.java | 10 +-
.../requests/sql/ClientSqlExecuteRequest.java | 9 +-
.../client/ObservableTimestampPropagationTest.java | 38 +++++++
...chmark.java => AbstractMultiNodeBenchmark.java} | 123 +++++++++++++++------
.../ignite/internal/benchmark/InsertBenchmark.java | 11 +-
.../ignite/internal/benchmark/SelectBenchmark.java | 11 +-
...{SqlOneNodeBenchmark.java => SqlBenchmark.java} | 15 ++-
.../ignite/internal/tx/HybridTimestampTracker.java | 2 +-
.../internal/tx/impl/IgniteTransactionsImpl.java | 11 +-
11 files changed, 202 insertions(+), 54 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 0dbf9239d5..58d3cf6ff4 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -673,10 +673,10 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
return ClientSqlExecuteRequest.process(in, out, sql,
resources, metrics, igniteTransactions);
case ClientOp.SQL_CURSOR_NEXT_PAGE:
- return ClientSqlCursorNextPageRequest.process(in, out,
resources);
+ return ClientSqlCursorNextPageRequest.process(in, out,
resources, igniteTransactions);
case ClientOp.SQL_CURSOR_CLOSE:
- return ClientSqlCursorCloseRequest.process(in, resources);
+ return ClientSqlCursorCloseRequest.process(in, out, resources,
igniteTransactions);
case ClientOp.PARTITION_ASSIGNMENT_GET:
return ClientTablePartitionAssignmentGetRequest.process(in,
out, igniteTables);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
index 3758860258..a49c897eec 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
@@ -19,8 +19,10 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
/**
* Client SQL cursor close request.
@@ -29,15 +31,27 @@ public class ClientSqlCursorCloseRequest {
/**
* Processes the request.
*
- * @param in Unpacker.
+ * @param in Unpacker.
+ * @param out Packer.
* @param resources Resources.
+ * @param transactions Transactional facade. Used to acquire last observed
time to propagate to client in response.
+ * @return Future representing result of operation.
*/
- public static CompletableFuture<Void> process(ClientMessageUnpacker in,
ClientResourceRegistry resources)
- throws IgniteInternalCheckedException {
+ public static CompletableFuture<Void> process(
+ ClientMessageUnpacker in,
+ ClientMessagePacker out,
+ ClientResourceRegistry resources,
+ IgniteTransactionsImpl transactions
+ ) throws IgniteInternalCheckedException {
long resourceId = in.unpackLong();
ClientSqlResultSet asyncResultSet =
resources.remove(resourceId).get(ClientSqlResultSet.class);
- return asyncResultSet.closeAsync();
+ return asyncResultSet.closeAsync()
+ .thenApply(ignored -> {
+ out.meta(transactions.observableTimestamp());
+
+ return null;
+ });
}
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
index 1c95c76a40..1100ddef36 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
@@ -24,6 +24,7 @@ import
org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
/**
* Client SQL cursor next page request.
@@ -34,13 +35,15 @@ public class ClientSqlCursorNextPageRequest {
*
* @param in Unpacker.
* @param out Packer.
- * @return Future.
+ * @param transactions Transactional facade. Used to acquire last observed
time to propagate to client in response.
+ * @return Future representing result of operation.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
- ClientResourceRegistry resources)
- throws IgniteInternalCheckedException {
+ ClientResourceRegistry resources,
+ IgniteTransactionsImpl transactions
+ ) throws IgniteInternalCheckedException {
long resourceId = in.unpackLong();
var resultSet =
resources.get(resourceId).get(ClientSqlResultSet.class);
@@ -49,6 +52,7 @@ public class ClientSqlCursorNextPageRequest {
.thenCompose(r -> {
packCurrentPage(out, r);
out.packBoolean(r.hasMorePages());
+ out.meta(transactions.observableTimestamp());
if (!r.hasMorePages()) {
try {
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 79929e98c0..f36c7c1dd0 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -64,7 +64,8 @@ public class ClientSqlExecuteRequest {
* @param sql SQL API.
* @param resources Resources.
* @param metrics Metrics.
- * @return Future.
+ * @param transactions Transactional facade. Used to acquire last observed
time to propagate to client in response.
+ * @return Future representing result of operation.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
@@ -92,11 +93,7 @@ public class ClientSqlExecuteRequest {
return session
.executeAsync(tx, statement, arguments)
.thenCompose(asyncResultSet -> {
- //noinspection StatementWithEmptyBody
- if (tx == null) {
- // TODO IGNITE-20232 Propagate observable timestamp to
sql engine using internal API.
- // out.meta(asyncResultSet.tx().readTimestamp());
- }
+ out.meta(transactions.observableTimestamp());
return writeResultSetAsync(out, resources, asyncResultSet,
session, metrics);
});
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
index a3633f9728..71db772329 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
@@ -18,7 +18,9 @@
package org.apache.ignite.client;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import java.util.UUID;
@@ -28,6 +30,8 @@ import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
@@ -62,6 +66,7 @@ public class ObservableTimestampPropagationTest extends
BaseIgniteAbstractTest {
}
@Test
+ @SuppressWarnings("resource")
public void testClientPropagatesLatestKnownHybridTimestamp() {
assertNull(lastObservableTimestamp());
@@ -88,6 +93,35 @@ public class ObservableTimestampPropagationTest extends
BaseIgniteAbstractTest {
client.transactions().begin(new TransactionOptions().readOnly(true));
client.transactions().begin(new TransactionOptions().readOnly(true));
assertEquals(11, lastObservableTimestamp());
+
+ Statement statement = client.sql().statementBuilder()
+ .property("hasMorePages", true)
+ .query("SELECT 1")
+ .build();
+
+ // Execution of a SQL query should propagate observable time, not the
current time of the clock.
+ currentServerTimestamp.set(20);
+ updateObservableTimestamp(14);
+ AsyncResultSet<?> rs =
await(client.sql().createSession().executeAsync(null, statement));
+ assertEquals(14, lastObservableTimestamp());
+
+ assertNotNull(rs);
+
+ // Every fetch should propagate observable time, not the current time
of the clock.
+ currentServerTimestamp.set(20);
+ updateObservableTimestamp(18);
+ await(rs.fetchNextPage());
+ assertEquals(18, lastObservableTimestamp());
+
+ currentServerTimestamp.set(24);
+ updateObservableTimestamp(20);
+ await(rs.fetchNextPage());
+ assertEquals(20, lastObservableTimestamp());
+
+ // Closing a result set should propagate observable time as well.
+ updateObservableTimestamp(22);
+ await(rs.closeAsync());
+ assertEquals(22, lastObservableTimestamp());
}
private static @Nullable Long lastObservableTimestamp() {
@@ -95,4 +129,8 @@ public class ObservableTimestampPropagationTest extends
BaseIgniteAbstractTest {
return ts == null ? null : ts.longValue() >> LOGICAL_TIME_BITS_SIZE;
}
+
+ private static void updateObservableTimestamp(long newTime) {
+
ignite.timestampTracker().update(HybridTimestamp.hybridTimestamp(newTime <<
LOGICAL_TIME_BITS_SIZE));
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
similarity index 54%
rename from
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
rename to
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index e2dbacc0aa..7166d040fb 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -17,24 +17,32 @@
package org.apache.ignite.internal.benchmark;
+import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.sql.engine.property.PropertiesHelper.newBuilder;
import static
org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
-import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.InitParameters;
import org.apache.ignite.internal.app.IgniteImpl;
-import
org.apache.ignite.internal.network.configuration.NetworkConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.sql.engine.QueryContext;
import org.apache.ignite.internal.sql.engine.QueryProperty;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.intellij.lang.annotations.Language;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
@@ -43,14 +51,16 @@ import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
/**
- * Base benchmark class for {@link SelectBenchmark} and {@link
InsertBenchmark}. Starts an Ignite node with a single table
- * {@link #TABLE_NAME}, that has single PK column and 10 value columns.
+ * Base benchmark class for {@link SelectBenchmark} and {@link
InsertBenchmark}.
+ *
+ * <p>Starts an Ignite cluster with a single table {@link #TABLE_NAME}, that
has
+ * single PK column and 10 value columns.
*/
@State(Scope.Benchmark)
-public class AbstractOneNodeBenchmark {
- private static final int PORT = NetworkConfigurationSchema.DEFAULT_PORT;
-
- private static final String NODE_NAME = "node" + PORT;
+public class AbstractMultiNodeBenchmark {
+ private static final int BASE_PORT = 3344;
+ private static final int BASE_CLIENT_PORT = 10800;
+ private static final int BASE_REST_PORT = 10300;
protected static final String FIELD_VAL = "a".repeat(100);
@@ -66,27 +76,7 @@ public class AbstractOneNodeBenchmark {
*/
@Setup
public final void nodeSetUp() throws IOException {
- Path workDir =
Files.createTempDirectory("tmpDirPrefix").toFile().toPath();
-
- @Language("HOCON")
- String config = "network: {\n"
- + " nodeFinder:{\n"
- + " netClusterNodes: [ \"localhost:" + PORT + "\"]
\n"
- + " }\n"
- + "},"
- + "raft.fsync = " + fsync;
-
- var fut = TestIgnitionManager.start(NODE_NAME, config,
workDir.resolve(NODE_NAME));
-
- TestIgnitionManager.init(new InitParametersBuilder()
- .clusterName("cluster")
- .destinationNodeName(NODE_NAME)
- .cmgNodeNames(List.of(NODE_NAME))
- .metaStorageNodeNames(List.of(NODE_NAME))
- .build()
- );
-
- clusterNode = (IgniteImpl) fut.join();
+ startCluster();
var queryEngine = clusterNode.queryEngine();
@@ -111,7 +101,7 @@ public class AbstractOneNodeBenchmark {
+ ");";
try {
- var context = QueryContext.create(SqlQueryType.SINGLE_STMT_TYPES);
+ var context = QueryContext.create(SqlQueryType.ALL);
getAllFromCursor(
await(queryEngine.querySingleAsync(sessionId, context,
clusterNode.transactions(), sql))
@@ -121,8 +111,77 @@ public class AbstractOneNodeBenchmark {
}
}
+ /**
+ * Stops the cluster.
+ *
+ * @throws Exception In case of any error.
+ */
@TearDown
- public final void nodeTearDown() {
- IgnitionManager.stop(NODE_NAME);
+ public final void nodeTearDown() throws Exception {
+ List<AutoCloseable> closeables = IntStream.range(0, nodes())
+ .mapToObj(i -> nodeName(BASE_PORT + i))
+ .map(nodeName -> (AutoCloseable) () ->
IgnitionManager.stop(nodeName))
+ .collect(toList());
+
+ IgniteUtils.closeAll(closeables);
+ }
+
+ private void startCluster() throws IOException {
+ Path workDir =
Files.createTempDirectory("tmpDirPrefix").toFile().toPath();
+
+ String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+ List<CompletableFuture<Ignite>> futures = new ArrayList<>();
+
+ @Language("HOCON")
+ String configTemplate = "{\n"
+ + " \"network\": {\n"
+ + " \"port\":{},\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " clientConnector: { port:{} },\n"
+ + " rest.port: {},\n"
+ + " raft.fsync = " + fsync
+ + "}";
+
+ for (int i = 0; i < nodes(); i++) {
+ int port = BASE_PORT + i;
+ String nodeName = nodeName(port);
+
+ String config = IgniteStringFormatter.format(configTemplate, port,
connectNodeAddr,
+ BASE_CLIENT_PORT + i, BASE_REST_PORT + i);
+
+ futures.add(TestIgnitionManager.start(nodeName, config,
workDir.resolve(nodeName)));
+ }
+
+ String metaStorageNodeName = nodeName(BASE_PORT);
+
+ InitParameters initParameters = InitParameters.builder()
+ .destinationNodeName(metaStorageNodeName)
+ .metaStorageNodeNames(List.of(metaStorageNodeName))
+ .clusterName("cluster")
+ .build();
+
+ TestIgnitionManager.init(initParameters);
+
+ for (CompletableFuture<Ignite> future : futures) {
+ assertThat(future, willCompleteSuccessfully());
+
+ if (clusterNode == null) {
+ clusterNode = (IgniteImpl) await(future);
+ } else {
+ await(future);
+ }
+ }
+ }
+
+ private static String nodeName(int port) {
+ return "node_" + port;
+ }
+
+ protected int nodes() {
+ return 3;
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
index f9a828a180..d47d137948 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
@@ -39,6 +39,7 @@ import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
@@ -60,7 +61,10 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
@Measurement(iterations = 20, time = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
-public class InsertBenchmark extends AbstractOneNodeBenchmark {
+public class InsertBenchmark extends AbstractMultiNodeBenchmark {
+ @Param({"1", "2", "3"})
+ private int clusterSize;
+
/**
* Benchmark for SQL insert via embedded client.
*/
@@ -309,4 +313,9 @@ public class InsertBenchmark extends
AbstractOneNodeBenchmark {
return format(insertQueryTemplate, TABLE_NAME, "ycsb_key", fieldsQ,
valQ);
}
+
+ @Override
+ protected int nodes() {
+ return clusterSize;
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
index c65639491d..f502821205 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
@@ -38,6 +38,7 @@ import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
@@ -60,7 +61,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@SuppressWarnings({"WeakerAccess", "unused"})
-public class SelectBenchmark extends AbstractOneNodeBenchmark {
+public class SelectBenchmark extends AbstractMultiNodeBenchmark {
private static final int TABLE_SIZE = 30_000;
private static final String SELECT_ALL_FROM_USERTABLE = "select * from
usertable where ycsb_key = ?";
@@ -68,6 +69,9 @@ public class SelectBenchmark extends AbstractOneNodeBenchmark
{
private KeyValueView<Tuple, Tuple> keyValueView;
+ @Param({"1", "2", "3"})
+ private int clusterSize;
+
/**
* Fills the table with data.
*/
@@ -262,6 +266,11 @@ public class SelectBenchmark extends
AbstractOneNodeBenchmark {
return kvView;
}
}
+
+ @Override
+ protected int nodes() {
+ return clusterSize;
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlBenchmark.java
similarity index 92%
rename from
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
rename to
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlBenchmark.java
index 0762242cbd..c41e5639f5 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlBenchmark.java
@@ -30,6 +30,7 @@ import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
@@ -42,7 +43,7 @@ import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
/**
- * Benchmark that runs sql queries via embedded client on single node cluster.
+ * Benchmark that runs sql queries via embedded client on clusters of
different size.
*/
@State(Scope.Benchmark)
@Fork(1)
@@ -52,11 +53,14 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@SuppressWarnings({"WeakerAccess", "unused"})
-public class SqlOneNodeBenchmark extends AbstractOneNodeBenchmark {
+public class SqlBenchmark extends AbstractMultiNodeBenchmark {
private static final int TABLE_SIZE = 30_000;
private Session session;
+ @Param({"1", "2", "3"})
+ private int clusterSize;
+
/** Fills the table with data. */
@Setup
public void setUp() throws IOException {
@@ -148,11 +152,16 @@ public class SqlOneNodeBenchmark extends
AbstractOneNodeBenchmark {
*/
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
- .include(".*" + SqlOneNodeBenchmark.class.getSimpleName() +
".*")
+ .include(".*" + SqlBenchmark.class.getSimpleName() + ".*")
.build();
new Runner(opt).run();
}
+
+ @Override
+ protected int nodes() {
+ return clusterSize;
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
index 636ba07457..849fa04079 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
@@ -44,7 +44,7 @@ public class HybridTimestampTracker {
*
* @param ts Timestamp to use for update.
*/
- public void update(HybridTimestamp ts) {
+ public void update(@Nullable HybridTimestamp ts) {
long tsVal = HybridTimestamp.hybridTimestampToLong(ts);
timestamp.updateAndGet(x -> Math.max(x, tsVal));
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
index 41d3101872..8ab3ed32b0 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
@@ -50,10 +50,19 @@ public class IgniteTransactionsImpl implements
IgniteTransactions {
*
* @param ts Timestamp.
*/
- public void updateObservableTimestamp(HybridTimestamp ts) {
+ public void updateObservableTimestamp(@Nullable HybridTimestamp ts) {
observableTimestampTracker.update(ts);
}
+ /**
+ * Gets current value of observable timestamp.
+ *
+ * @return Timestamp or {@code null} if the tracker has never been updated.
+ */
+ public @Nullable HybridTimestamp observableTimestamp() {
+ return observableTimestampTracker.get();
+ }
+
/**
* Begins a transaction.
* TODO:IGNITE-20232 Remove this method; instead, an interface method
should be used.