This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cd7d979901 IGNITE-18835 Get rid of skipping safe time waiting on a
primary node. (#1688)
cd7d979901 is described below
commit cd7d97990125cb02b2401c6140ea98da3ca646e4
Author: Mirza Aliev <[email protected]>
AuthorDate: Tue Mar 7 21:12:18 2023 +0400
IGNITE-18835 Get rid of skipping safe time waiting on a primary node.
(#1688)
---
.../ItRaftCommandLeftInLogUntilRestartTest.java | 7 +-
.../internal/table/ItReadOnlyTransactionTest.java | 211 ++++++++
.../ignite/internal/table/ItRoReadsTest.java | 543 +++++++++++++++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 10 +
.../replicator/PartitionReplicaListener.java | 20 +-
.../table/impl/DummyInternalTableImpl.java | 55 ++-
6 files changed, 835 insertions(+), 11 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 75688ae3e2..775a423dd7 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -32,7 +32,6 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
@@ -175,9 +174,9 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends
ClusterPerClassInteg
BinaryRowEx key = new
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 42));
if (isNode0Leader) {
- assertNull(table.internalTable().get(key, new
HybridClockImpl().now(), node1.node()).get());
+ assertNull(table.internalTable().get(key, node1.clock().now(),
node1.node()).get());
} else {
- assertNull(table.internalTable().get(key, new
HybridClockImpl().now(), node0.node()).get());
+ assertNull(table.internalTable().get(key, node1.clock().now(),
node0.node()).get());
}
var tx = node0.transactions().begin();
@@ -275,7 +274,7 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends
ClusterPerClassInteg
BinaryRowEx testKey = new
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("ID",
row[0]));
- BinaryRow readOnlyRow = table.internalTable().get(testKey, new
HybridClockImpl().now(), ignite.node()).get();
+ BinaryRow readOnlyRow = table.internalTable().get(testKey,
ignite.clock().now(), ignite.node()).get();
assertNotNull(readOnlyRow);
assertEquals(row[1], new Row(table.schemaView().schema(),
readOnlyRow).stringValue(2));
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
new file mode 100644
index 0000000000..dfc77c4e62
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test reads with specific timestamp.
+ */
+public class ItReadOnlyTransactionTest extends AbstractBasicIntegrationTest {
+ /** Table name. */
+ public static final String TABLE_NAME = "tbl";
+ /** How far into the future (in milliseconds) the read timestamp is shifted. */
+ public static final int FUTURE_GAP = 200;
+
+ @BeforeAll
+ public void beforeTestStart() {
+ sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val VARCHAR)
WITH REPLICAS=" + nodes() + ", PARTITIONS=10");
+
+ Ignite ignite = CLUSTER_NODES.get(0);
+
+ ignite.transactions().runInTransaction(tx -> {
+ for (int i = 0; i < 100; i++) {
+ sql(tx, "INSERT INTO " + TABLE_NAME + " VALUES (?, ?)", i,
"str " + i);
+ }
+
+ assertEquals(100, checkData(tx, id -> "str " + id));
+ });
+
+ assertEquals(100, checkData(null, id -> "str " + id));
+ }
+
+    /**
+     * Reads every row of {@link ItReadOnlyTransactionTest#TABLE_NAME} ordered by id and asserts that the value column
+     * of each row equals what {@code valueMapper} produces for that row's id.
+     *
+     * @param tx Transaction to read in; {@code null} means an implicit transaction.
+     * @param valueMapper Maps a primary key to the expected value of the {@code val} column.
+     * @return Number of rows found in the table.
+     */
+    private static int checkData(Transaction tx, Function<Integer, String> valueMapper) {
+        String query = "SELECT id, val FROM " + TABLE_NAME + " ORDER BY id";
+
+        List<List<Object>> resultSet = sql(tx, query);
+
+        resultSet.forEach(columns -> {
+            Integer key = (Integer) columns.get(0);
+
+            assertEquals(valueMapper.apply(key), columns.get(1));
+        });
+
+        return resultSet.size();
+    }
+
+    /**
+     * Checks that a read-only get whose timestamp lies {@link #FUTURE_GAP} milliseconds in the future does not complete
+     * immediately on any node and eventually returns the row that was inserted after the read had been issued.
+     *
+     * <p>The scenario is repeated using each cluster node as the initiator. Rows created by the test are removed at the end.
+     *
+     * @throws Exception If a future fails or does not complete in time.
+     */
+    @Test
+    public void testFutureRead() throws Exception {
+        for (int i = 0; i < nodes(); i++) {
+            Ignite ignite = CLUSTER_NODES.get(i);
+
+            TableImpl tableImpl = (TableImpl) ignite.tables().table(TABLE_NAME);
+
+            InternalTable internalTable = tableImpl.internalTable();
+            SchemaDescriptor schema = tableImpl.schemaView().schema();
+            HybridClock clock = ((IgniteImpl) ignite).clock();
+
+            Collection<ClusterNode> nodes = ignite.clusterNodes();
+
+            // The row must not be visible anywhere before the test inserts it.
+            for (ClusterNode clusterNode : nodes) {
+                CompletableFuture<BinaryRow> getFut = internalTable.get(createRowKey(schema, 100 + i), clock.now(), clusterNode);
+
+                assertNull(getFut.join());
+            }
+
+            List<CompletableFuture<BinaryRow>> futs = new ArrayList<>(nodes.size());
+
+            long startTime = System.currentTimeMillis();
+
+            // Issue reads with a timestamp in the future; they are expected to stay pending for now.
+            for (ClusterNode clusterNode : nodes) {
+                CompletableFuture<BinaryRow> getFut = internalTable.get(
+                        createRowKey(schema, 100 + i),
+                        new HybridTimestamp(clock.now().getPhysical() + FUTURE_GAP, 0),
+                        clusterNode
+                );
+
+                assertFalse(getFut.isDone());
+
+                futs.add(getFut);
+            }
+
+            internalTable.insert(createRow(schema, 100 + i), null).get();
+
+            // Measure the delay exactly once so that the logged value, the asserted value and the failure message agree.
+            long delay = System.currentTimeMillis() - startTime;
+
+            log.info("Delay to create a new data record [node={}, delay={}]", ignite.name(), delay);
+
+            // The insert has to land before the future read timestamp, otherwise the pending reads could not observe it.
+            assertTrue(delay < FUTURE_GAP, "Too long to execute [delay=" + delay + ']');
+
+            for (CompletableFuture<BinaryRow> getFut : futs) {
+                assertNotNull(getFut.get(10, TimeUnit.SECONDS));
+            }
+        }
+
+        assertEquals(100 + nodes(), checkData(null, id -> id < 100 ? ("str " + id) : ("new str " + id)));
+
+        Ignite ignite = CLUSTER_NODES.get(0);
+
+        // Remove the rows added above so that the table again holds only the initial 100 rows.
+        ignite.transactions().runInTransaction(tx -> {
+            for (int i = 100; i < 100 + nodes(); i++) {
+                sql(tx, "DELETE FROM " + TABLE_NAME + " WHERE id = ?", i);
+            }
+        });
+    }
+
+    /**
+     * Checks that a read-only get with a timestamp taken before a row was deleted still returns the row from every
+     * node, while a get with the current timestamp no longer sees it.
+     *
+     * <p>The scenario is repeated using each cluster node as the initiator. Deleted rows are re-inserted at the end.
+     *
+     * @throws Exception If the delete operation fails.
+     */
+    @Test
+    public void testPastRead() throws Exception {
+        for (int nodeIdx = 0; nodeIdx < nodes(); nodeIdx++) {
+            Ignite ignite = CLUSTER_NODES.get(nodeIdx);
+
+            InternalTable internalTable = ((TableImpl) ignite.tables().table(TABLE_NAME)).internalTable();
+            SchemaDescriptor schema = ((TableImpl) ignite.tables().table(TABLE_NAME)).schemaView().schema();
+            HybridClock clock = ((IgniteImpl) ignite).clock();
+
+            Collection<ClusterNode> targets = ignite.clusterNodes();
+
+            // The row exists before the deletion.
+            for (ClusterNode target : targets) {
+                assertNotNull(internalTable.get(createRowKey(schema, nodeIdx), clock.now(), target).join());
+            }
+
+            HybridTimestamp pastTs = clock.now();
+
+            long startTime = System.currentTimeMillis();
+
+            internalTable.delete(createRowKey(schema, nodeIdx), null).get();
+
+            // A read at the current time must not see the deleted row.
+            for (ClusterNode target : targets) {
+                assertNull(internalTable.get(createRowKey(schema, nodeIdx), clock.now(), target).join());
+            }
+
+            log.info("Delay to remove a data record [node={}, delay={}]", ignite.name(), (System.currentTimeMillis() - startTime));
+
+            // A read at the timestamp captured before the deletion must still see the row.
+            for (ClusterNode target : targets) {
+                assertNotNull(internalTable.get(createRowKey(schema, nodeIdx), pastTs, target).join());
+            }
+        }
+
+        assertEquals(100 - nodes(), checkData(null, key -> "str " + key));
+
+        Ignite firstNode = CLUSTER_NODES.get(0);
+
+        // Put the deleted rows back so that the table again holds the initial 100 rows.
+        firstNode.transactions().runInTransaction(tx -> {
+            for (int rowId = 0; rowId < nodes(); rowId++) {
+                sql(tx, "INSERT INTO " + TABLE_NAME + " VALUES (?, ?)", rowId, "str " + rowId);
+            }
+        });
+    }
+
+    /**
+     * Builds a full row with the given id and the value {@code "new str " + id}.
+     *
+     * @param schema Schema the row is assembled for.
+     * @param id Primary key value.
+     * @return Assembled row.
+     */
+    private static Row createRow(SchemaDescriptor schema, int id) {
+        RowAssembler assembler = new RowAssembler(schema);
+
+        assembler.appendInt(id);
+        assembler.appendString("new str " + id);
+
+        return new Row(schema, assembler.build());
+    }
+
+    /**
+     * Builds a key-only row with the given id.
+     *
+     * @param schema Schema the key is assembled for.
+     * @param id Primary key value.
+     * @return Assembled key row.
+     */
+    private static Row createRowKey(SchemaDescriptor schema, int id) {
+        RowAssembler keyAssembler = RowAssembler.keyAssembler(schema);
+
+        keyAssembler.appendInt(id);
+
+        return new Row(schema, keyAssembler.build());
+    }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
new file mode 100644
index 0000000000..e7ab9eef57
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.apache.ignite.internal.runner.app.ItTablesApiTest.SCHEMA;
+import static
org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.convert;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
+import org.apache.ignite.internal.schema.testutils.definition.ColumnDefinition;
+import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for the read-only API.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItRoReadsTest extends BaseIgniteAbstractTest {
+ /** Logger used by the start-up and tear-down hooks. */
+ private static final IgniteLogger LOG =
Loggers.forClass(ItRoReadsTest.class);
+
+ /** Name of the table that is created before and dropped after each test. */
+ private static final String TABLE_NAME = "some-table";
+
+ /** Schema (one INT64 key column, INT32 and STRING value columns) used to assemble the rows the tests search for. */
+ private static final SchemaDescriptor SCHEMA_1 = new SchemaDescriptor(
+ 1,
+ new Column[]{new Column("key", NativeTypes.INT64, false)},
+ new Column[]{
+ new Column("valInt", NativeTypes.INT32, false),
+ new Column("valStr", NativeTypes.STRING, false)
+ }
+ );
+
+ /** Network port substituted into the node configuration. */
+ private static final int BASE_PORT = 3344;
+
+ /** Node configuration template; the two {@code {}} placeholders take the port and the node finder address list. */
+ private static final String NODE_BOOTSTRAP_CFG = "{\n"
+ + " \"network\": {\n"
+ + " \"port\":{},\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ {} ]\n"
+ + " }\n"
+ + " }\n"
+ + "}";
+
+ /** The single node started for the whole test class; reset to {@code null} on tear-down. */
+ private static Ignite NODE;
+
+ /** Work directory; each node's files are placed in a sub-directory named after the node. */
+ @WorkDirectory
+ private static Path WORK_DIR;
+
+ /** Table under test; assigned before each test and cleared after it. */
+ private Table table;
+
+    /**
+     * Starts the single test node, initializes the cluster with it and waits until the start future completes.
+     *
+     * @param testInfo Test info used to derive the node name.
+     */
+    @BeforeAll
+    static void startNode(TestInfo testInfo) {
+        String seedAddress = "\"localhost:" + BASE_PORT + '\"';
+
+        String name = testNodeName(testInfo, 0);
+
+        String bootstrapConfig = IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, BASE_PORT, seedAddress);
+
+        CompletableFuture<Ignite> startFuture = IgnitionManager.start(name, bootstrapConfig, WORK_DIR.resolve(name));
+
+        String metaStorageNode = testNodeName(testInfo, nodes() - 1);
+
+        IgnitionManager.init(metaStorageNode, List.of(metaStorageNode), "cluster");
+
+        assertThat(startFuture, willCompleteSuccessfully());
+
+        NODE = startFuture.join();
+    }
+
+    /**
+     * Clears the node reference and stops the node that was started for the test class.
+     *
+     * @param testInfo Test info used to derive the node name.
+     * @throws Exception If stopping the node fails.
+     */
+    @AfterAll
+    static void stopNode(TestInfo testInfo) throws Exception {
+        LOG.info("Start tearDown()");
+
+        NODE = null;
+
+        // The node name is resolved lazily, when the closeable is actually invoked.
+        AutoCloseable nodeStopper = () -> IgnitionManager.stop(testNodeName(testInfo, 0));
+
+        IgniteUtils.closeAll(nodeStopper);
+
+        LOG.info("End tearDown()");
+    }
+
+    /** Creates a fresh table on the test node before every test. */
+    @BeforeEach
+    void createTable() {
+        this.table = startTable(node(), TABLE_NAME);
+    }
+
+    /** Drops the table after every test and clears the reference to it. */
+    @AfterEach
+    void dropTable() {
+        stopTable(node(), TABLE_NAME);
+
+        this.table = null;
+    }
+
+    /**
+     * Checks that a read-only get returns {@code null} for a missing key and the stored row once the data
+     * has been populated in an explicit transaction.
+     *
+     * @throws Exception If a read fails.
+     */
+    @Test
+    public void testRoGet() throws Exception {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1);
+
+        BinaryRow res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get();
+
+        assertNull(res);
+
+        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+        populateData(node, keyValueView, false);
+
+        res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get();
+
+        // Expected value goes first so that a failure message labels the buffers correctly.
+        assertEquals(keyValueRow.byteBuffer(), res.byteBuffer());
+    }
+
+    /**
+     * Checks that after two committed transactions wrote the same key, a read-only get at the current
+     * timestamp returns the row written by the second one.
+     *
+     * @throws Exception If a read or a write fails.
+     */
+    @Test
+    public void testRoGetWithSeveralInserts() throws Exception {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1);
+
+        Row keyValueRow2 = createKeyValueRow(1, 2, "some string row" + 2);
+
+        Row keyRow = createKeyRow(1);
+
+        assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get());
+        assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get());
+
+        Transaction tx1 = node.transactions().begin();
+
+        internalTable.upsert(keyValueRow, (InternalTransaction) tx1).get();
+
+        tx1.commit();
+
+        Transaction tx2 = node.transactions().begin();
+
+        internalTable.upsert(keyValueRow2, (InternalTransaction) tx2).get();
+
+        tx2.commit();
+
+        BinaryRow res = internalTable.get(keyRow, node.clock().now(), node.node()).get();
+
+        // Expected value goes first so that a failure message labels the buffers correctly.
+        assertEquals(keyValueRow2.byteBuffer(), res.byteBuffer());
+    }
+
+    /**
+     * Checks that after two committed transactions wrote the same key, a read-only scan of partition 0 at the
+     * current timestamp returns exactly one row: the one written by the second transaction.
+     *
+     * @throws Exception If a read or a write fails.
+     */
+    @Test
+    public void testRoScanWithSeveralInserts() throws Exception {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1);
+
+        Row keyValueRow2 = createKeyValueRow(1, 2, "some string row" + 2);
+
+        Row keyRow = createKeyRow(1);
+
+        assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get());
+        assertNull(internalTable.get(keyRow, node.clock().now(), node.node()).get());
+
+        Transaction tx1 = node.transactions().begin();
+
+        internalTable.insert(keyValueRow, (InternalTransaction) tx1).get();
+
+        tx1.commit();
+
+        Transaction tx2 = node.transactions().begin();
+
+        internalTable.upsert(keyValueRow2, (InternalTransaction) tx2).get();
+
+        tx2.commit();
+
+        Publisher<BinaryRow> res = internalTable.scan(0, node.clock().now(), node.node());
+
+        // Completed normally by onComplete and exceptionally by onError, so a failed scan fails the test instead of hanging it.
+        CompletableFuture<Void> scanDone = new CompletableFuture<>();
+
+        List<ByteBuffer> list = new ArrayList<>();
+
+        res.subscribe(new Subscriber<BinaryRow>() {
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                subscription.request(100);
+            }
+
+            @Override
+            public void onNext(BinaryRow item) {
+                list.add(item.byteBuffer());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                scanDone.completeExceptionally(throwable);
+            }
+
+            @Override
+            public void onComplete() {
+                scanDone.complete(null);
+            }
+        });
+
+        // Waits with the matcher's own timeout; the same matcher is used in this class to await node start-up.
+        assertThat(scanDone, willCompleteSuccessfully());
+
+        assertEquals(1, list.size());
+
+        // Expected value goes first so that a failure message labels the buffers correctly.
+        assertEquals(keyValueRow2.byteBuffer(), list.get(0));
+    }
+
+    /**
+     * Checks that a write made by a transaction that has not been committed yet is not visible to a read-only get,
+     * and becomes visible after the transaction commits.
+     *
+     * @throws Exception If a read or a write fails.
+     */
+    @Test
+    public void testRoGetOngoingCommitIsNotVisible() throws Exception {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow = createKeyValueRow(1, 1, "some string row" + 1);
+
+        Row keyValueRow2 = createKeyValueRow(1, 2, "some string row" + 2);
+
+        assertNull(internalTable.get(keyValueRow, node.clock().now(), node.node()).get());
+        assertNull(internalTable.get(keyValueRow2, node.clock().now(), node.node()).get());
+
+        Transaction tx1 = node.transactions().begin();
+
+        internalTable.insert(keyValueRow, (InternalTransaction) tx1).get();
+
+        tx1.commit();
+
+        Transaction tx2 = node.transactions().begin();
+
+        // NOTE(review): the upsert future is not awaited, so the read below may run before the write is applied;
+        // confirm whether this is intentional or the future should be awaited to make the check meaningful.
+        internalTable.upsert(keyValueRow2, (InternalTransaction) tx2);
+
+        BinaryRow res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get();
+
+        // Expected value goes first so that a failure message labels the buffers correctly.
+        assertEquals(keyValueRow.byteBuffer(), res.byteBuffer());
+
+        tx2.commit();
+
+        res = internalTable.get(keyValueRow, node.clock().now(), node.node()).get();
+
+        assertEquals(keyValueRow2.byteBuffer(), res.byteBuffer());
+    }
+
+    /**
+     * Checks that a read-only getAll returns nothing for missing keys and all three requested rows once
+     * the data has been written in a transaction.
+     *
+     * @throws Exception If a read fails.
+     */
+    @Test
+    public void testRoGetAll() throws Exception {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow1 = createKeyValueRow(1, 1, "some string row" + 1);
+        Row keyValueRow2 = createKeyValueRow(2, 2, "some string row" + 2);
+        Row keyValueRow3 = createKeyValueRow(3, 3, "some string row" + 3);
+
+        Set<BinaryRowEx> rowsToSearch = Set.of(keyValueRow1, keyValueRow2, keyValueRow3);
+
+        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+        Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
+
+        // Expected value goes first so that a failure message labels the sizes correctly.
+        assertEquals(0, res.size());
+
+        node.transactions().runInTransaction(txs -> {
+            for (int i = 0; i < 15; i++) {
+                putValue(keyValueView, i, txs);
+            }
+        });
+
+        res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
+
+        assertEquals(3, res.size());
+
+        Set<ByteBuffer> resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet());
+
+        assertTrue(resultKeys.contains(keyValueRow1.byteBuffer()));
+        assertTrue(resultKeys.contains(keyValueRow2.byteBuffer()));
+        assertTrue(resultKeys.contains(keyValueRow3.byteBuffer()));
+    }
+
+    /**
+     * Checks that a read-only getAll returns the latest committed rows: first the initially populated ones,
+     * then the rows that a later transaction wrote over the same keys.
+     *
+     * @throws ExecutionException If a read fails.
+     * @throws InterruptedException If the test thread is interrupted while waiting for a read.
+     */
+    @Test
+    public void testRoGetAllWithSeveralInserts() throws ExecutionException, InterruptedException {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        Row keyValueRow1 = createKeyValueRow(1, 1, "some string row" + 1);
+        Row keyValueRow2 = createKeyValueRow(2, 2, "some string row" + 2);
+        Row keyValueRow3 = createKeyValueRow(3, 3, "some string row" + 3);
+
+        Set<BinaryRowEx> rowsToSearch = Set.of(keyValueRow1, keyValueRow2, keyValueRow3);
+
+        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+        Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
+
+        // Expected value goes first so that a failure message labels the sizes correctly.
+        assertEquals(0, res.size());
+
+        populateData(node, keyValueView, false);
+
+        res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
+
+        assertEquals(3, res.size());
+
+        Set<ByteBuffer> resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet());
+
+        assertTrue(resultKeys.contains(keyValueRow1.byteBuffer()));
+        assertTrue(resultKeys.contains(keyValueRow2.byteBuffer()));
+        assertTrue(resultKeys.contains(keyValueRow3.byteBuffer()));
+
+        // putValue maps value (i + 100) to key (i + 100) % 100, so keys 0..14 are overwritten with new values.
+        node.transactions().runInTransaction(txs -> {
+            for (int i = 0; i < 15; i++) {
+                putValue(keyValueView, i + 100, txs);
+            }
+        });
+
+        Row newKeyValueRow1 = createKeyValueRow(1, 101, "some string row" + 101);
+        Row newKeyValueRow2 = createKeyValueRow(2, 102, "some string row" + 102);
+        Row newKeyValueRow3 = createKeyValueRow(3, 103, "some string row" + 103);
+
+        res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
+
+        assertEquals(3, res.size());
+
+        resultKeys = res.stream().map(BinaryRow::byteBuffer).collect(Collectors.toSet());
+
+        assertTrue(resultKeys.contains(newKeyValueRow1.byteBuffer()));
+        assertTrue(resultKeys.contains(newKeyValueRow2.byteBuffer()));
+        assertTrue(resultKeys.contains(newKeyValueRow3.byteBuffer()));
+    }
+
+    /**
+     * Runs the scan-all scenario with the data populated through implicit transactions.
+     *
+     * @throws InterruptedException If the test thread is interrupted.
+     */
+    @Test
+    public void testRoScanAllImplicitPopulatingData() throws InterruptedException {
+        final boolean implicit = true;
+
+        roScanAll(implicit);
+    }
+
+    /**
+     * Runs the scan-all scenario with the data populated inside one explicit transaction.
+     *
+     * @throws InterruptedException If the test thread is interrupted.
+     */
+    @Test
+    public void testRoScanAllExplicitPopulatingData() throws InterruptedException {
+        final boolean implicit = false;
+
+        roScanAll(implicit);
+    }
+
+    /**
+     * Scans partition 0 with a read-only timestamp before and after populating the table and checks that
+     * the first scan returns nothing while the second one returns all 15 populated rows.
+     *
+     * @param implicit Whether the data is populated through implicit transactions ({@code true})
+     *      or inside one explicit transaction ({@code false}).
+     * @throws InterruptedException Kept for compatibility with callers; waiting is done through a future matcher.
+     */
+    private void roScanAll(boolean implicit) throws InterruptedException {
+        IgniteImpl node = node();
+
+        InternalTable internalTable = ((TableImpl) table).internalTable();
+
+        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+        var retrievedItems = new ArrayList<BinaryRow>();
+
+        collectScannedRows(internalTable.scan(0, node.clock().now(), node.node()), retrievedItems);
+
+        assertEquals(0, retrievedItems.size());
+
+        populateData(node, keyValueView, implicit);
+
+        collectScannedRows(internalTable.scan(0, node.clock().now(), node.node()), retrievedItems);
+
+        assertEquals(15, retrievedItems.size());
+    }
+
+    /**
+     * Subscribes to the publisher, appends every published row to {@code sink} and waits until the publisher completes.
+     * A publisher error fails the calling test instead of leaving it blocked forever.
+     *
+     * @param publisher Scan publisher to drain.
+     * @param sink Collection the published rows are appended to.
+     */
+    private static void collectScannedRows(Publisher<BinaryRow> publisher, List<BinaryRow> sink) {
+        // Completed normally by onComplete and exceptionally by onError.
+        CompletableFuture<Void> scanDone = new CompletableFuture<>();
+
+        publisher.subscribe(new Subscriber<>() {
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                subscription.request(10000);
+            }
+
+            @Override
+            public void onNext(BinaryRow item) {
+                sink.add(item);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                scanDone.completeExceptionally(throwable);
+            }
+
+            @Override
+            public void onComplete() {
+                scanDone.complete(null);
+            }
+        });
+
+        // Waits with the matcher's own timeout; the same matcher is used in this class to await node start-up.
+        assertThat(scanDone, willCompleteSuccessfully());
+    }
+
+    /**
+     * Assembles a full row of {@code SCHEMA_1} from the given key and value columns.
+     *
+     * @param id Key column value.
+     * @param value Integer value column.
+     * @param str String value column.
+     * @return Assembled row.
+     */
+    private static Row createKeyValueRow(long id, int value, String str) {
+        RowAssembler assembler = new RowAssembler(SCHEMA_1, false, -1);
+
+        assembler.appendLong(id);
+        assembler.appendInt(value);
+        assembler.appendString(str);
+
+        BinaryRow assembled = assembler.build();
+
+        return new Row(SCHEMA_1, assembled);
+    }
+
+    /**
+     * Assembles a key-only row of {@code SCHEMA_1}.
+     *
+     * @param id Key column value.
+     * @return Assembled key row.
+     */
+    private static Row createKeyRow(long id) {
+        RowAssembler keyAssembler = RowAssembler.keyAssembler(SCHEMA_1);
+
+        keyAssembler.appendLong(id);
+
+        BinaryRow assembled = keyAssembler.build();
+
+        return new Row(SCHEMA_1, assembled);
+    }
+
+    /**
+     * Puts a value without an explicit transaction (a {@code null} transaction is passed on).
+     *
+     * @param kv Key-value view to write to.
+     * @param val Number the key and the value columns are derived from.
+     */
+    private static void putValue(KeyValueView<Tuple, Tuple> kv, int val) {
+        Transaction noTx = null;
+
+        putValue(kv, val, noTx);
+    }
+
+    /**
+     * Puts the entry derived from {@code val}: the key is {@code val % 100}, the integer column is {@code val}
+     * and the string column is {@code "some string row" + val}.
+     *
+     * @param kv Key-value view to write to.
+     * @param val Number the key and the value columns are derived from.
+     * @param tx Transaction to write in; may be {@code null}.
+     */
+    private static void putValue(KeyValueView<Tuple, Tuple> kv, int val, Transaction tx) {
+        Long keyColumn = Long.valueOf(val % 100);
+        Integer intColumn = Integer.valueOf(val);
+        String strColumn = "some string row" + val;
+
+        Tuple tableKey = Tuple.create().set("key", keyColumn);
+
+        Tuple value = Tuple.create().set("valInt", intColumn).set("valStr", strColumn);
+
+        kv.put(tx, tableKey, value);
+    }
+
+    /**
+     * Writes 15 entries (numbers 0..14) into the view, either without an explicit transaction
+     * or inside a single explicit transaction that is committed at the end.
+     *
+     * @param node Node used to begin the explicit transaction.
+     * @param keyValueView Key-value view to write to.
+     * @param implicit {@code true} to write without an explicit transaction.
+     */
+    private static void populateData(Ignite node, KeyValueView<Tuple, Tuple> keyValueView, boolean implicit) {
+        // A null transaction makes every put run without an explicit transaction.
+        Transaction explicitTx = implicit ? null : node.transactions().begin();
+
+        for (int number = 0; number < 15; number++) {
+            putValue(keyValueView, number, explicitTx);
+        }
+
+        if (explicitTx != null) {
+            explicitTx.commit();
+        }
+    }
+
+    /**
+     * Creates a single-partition table with an INT64 primary key column {@code key}, a nullable INT32 column
+     * {@code valInt} and a string column {@code valStr} whose default value is {@code "default"}.
+     *
+     * @param node Node whose table manager creates the table.
+     * @param tableName Name of the table to create.
+     * @return Created table.
+     */
+    private static Table startTable(Ignite node, String tableName) {
+        ColumnDefinition keyColumn = SchemaBuilders.column("key", ColumnType.INT64).build();
+        ColumnDefinition intColumn = SchemaBuilders.column("valInt", ColumnType.INT32).asNullable(true).build();
+        ColumnDefinition strColumn = SchemaBuilders.column("valStr", ColumnType.string()).withDefaultValue("default").build();
+
+        List<ColumnDefinition> cols = new ArrayList<>();
+        cols.add(keyColumn);
+        cols.add(intColumn);
+        cols.add(strColumn);
+
+        TableManager tableManager = (TableManager) node.tables();
+
+        return await(tableManager.createTableAsync(
+                tableName,
+                tblCh -> convert(SchemaBuilders.tableBuilder(SCHEMA, tableName).columns(cols).withPrimaryKey("key").build(), tblCh)
+                        .changePartitions(1)
+        ));
+    }
+
+    /**
+     * Drops the table and waits for the operation to finish.
+     *
+     * @param node Node whose table manager drops the table.
+     * @param tableName Name of the table to drop.
+     */
+    private static void stopTable(Ignite node, String tableName) {
+        TableManager tableManager = (TableManager) node.tables();
+
+        await(tableManager.dropTableAsync(tableName));
+    }
+
+ protected static int nodes() {
+ return 1;
+ }
+
+    /**
+     * Returns the started test node cast to its implementation type.
+     *
+     * @return Test node, or {@code null} if it is not started.
+     */
+    protected static IgniteImpl node() {
+        Ignite started = NODE;
+
+        return (IgniteImpl) started;
+    }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f579c56f2f..714198a2eb 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -971,4 +971,14 @@ public class IgniteImpl implements Ignite {
public void stopDroppingMessages() {
((DefaultMessagingService)
clusterSvc.messagingService()).stopDroppingMessages();
}
+
+    /**
+     * Exposes the hybrid clock held by this node; intended for tests only.
+     *
+     * @return Hybrid clock of the node.
+     */
+    @TestOnly
+    public HybridClock clock() {
+        return this.clock;
+    }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index eb15ddb1a1..fe50db631e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -396,7 +396,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
- CompletableFuture<Void> safeReadFuture = isPrimary ?
completedFuture(null) : safeTime.waitFor(readTimestamp);
+ CompletableFuture<Void> safeReadFuture =
isPrimaryInTimestamp(isPrimary, readTimestamp) ? completedFuture(null)
+ : safeTime.waitFor(readTimestamp);
if (request.indexToUse() != null) {
TableSchemaAwareIndexStorage indexStorage =
secondaryIndexStorages.get().get(request.indexToUse());
@@ -486,11 +487,23 @@ public class PartitionReplicaListener implements
ReplicaListener {
format("Unknown single request [actionType={}]",
request.requestType()));
}
- CompletableFuture<Void> safeReadFuture = isPrimary ?
completedFuture(null) : safeTime.waitFor(request.readTimestamp());
+ CompletableFuture<Void> safeReadFuture =
isPrimaryInTimestamp(isPrimary, readTimestamp) ? completedFuture(null)
+ : safeTime.waitFor(request.readTimestamp());
return safeReadFuture.thenCompose(unused ->
resolveRowByPkForReadOnly(searchRow, readTimestamp));
}
+    /**
+     * Checks that the node is primary and that {@code timestamp} has already passed according to the current node's clock.
+     *
+     * <p>A {@code null} primary flag is treated as "not primary", so callers fall back to waiting on safe time
+     * instead of failing with a {@link NullPointerException} on unboxing.
+     *
+     * @param isPrimary {@code true} if the node is primary; {@code false} or {@code null} otherwise.
+     * @param timestamp Timestamp to check.
+     * @return {@code true} if the node is primary and its clock is strictly ahead of {@code timestamp}, {@code false} otherwise.
+     */
+    private boolean isPrimaryInTimestamp(Boolean isPrimary, HybridTimestamp timestamp) {
+        // The comparison is strict: a timestamp equal to the current clock value is not considered passed yet.
+        return Boolean.TRUE.equals(isPrimary) && hybridClock.now().compareTo(timestamp) > 0;
+    }
+
/**
* Processes multiple entries request for read only transaction.
*
@@ -510,7 +523,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
format("Unknown single request [actionType={}]",
request.requestType()));
}
- CompletableFuture<Void> safeReadFuture = isPrimary ?
completedFuture(null) : safeTime.waitFor(request.readTimestamp());
+ CompletableFuture<Void> safeReadFuture =
isPrimaryInTimestamp(isPrimary, readTimestamp) ? completedFuture(null)
+ : safeTime.waitFor(request.readTimestamp());
return safeReadFuture.thenCompose(unused -> {
ArrayList<CompletableFuture<BinaryRow>> resolutionFuts = new
ArrayList<>(searchRows.size());
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 71cbce1d0f..271a3a73bd 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -36,6 +36,8 @@ import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.WriteCommand;
@@ -85,6 +87,8 @@ import org.jetbrains.annotations.Nullable;
* Dummy table storage implementation.
*/
public class DummyInternalTableImpl extends InternalTableImpl {
+ private static final IgniteLogger LOG =
Loggers.forClass(DummyInternalTableImpl.class);
+
public static final NetworkAddress ADDR = new NetworkAddress("127.0.0.1",
2004);
private static final int PART_ID = 0;
@@ -95,6 +99,8 @@ public class DummyInternalTableImpl extends InternalTableImpl
{
new Column[]{new Column("value", NativeTypes.INT64, false)}
);
+ private static final HybridClock CLOCK = new HybridClockImpl();
+
private static final ReplicationGroupId crossTableGroupId = new
TablePartitionId(UUID.randomUUID(), 0);
private PartitionListener partitionListener;
@@ -103,6 +109,9 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
private ReplicationGroupId groupId;
+ /** The thread that updates safe time on the dummy replica. */
+ private Thread safeTimeUpdaterThread;
+
/**
* Creates a new local table.
*
@@ -172,11 +181,11 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
Int2ObjectMaps.singleton(PART_ID,
mock(RaftGroupService.class)),
1,
name -> mock(ClusterNode.class),
- txManager == null ? new TxManagerImpl(replicaSvc, new
HeapLockManager(), new HybridClockImpl()) : txManager,
+ txManager == null ? new TxManagerImpl(replicaSvc, new
HeapLockManager(), CLOCK) : txManager,
mock(MvTableStorage.class),
new TestTxStateTableStorage(),
replicaSvc,
- new HybridClockImpl()
+ CLOCK
);
RaftGroupService svc = partitionMap.get(0);
@@ -256,7 +265,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
IndexLocker pkLocker = new HashIndexLocker(indexId, true,
this.txManager.lockManager(), row2Tuple);
- HybridClock clock = new HybridClockImpl();
PendingComparableValuesTracker<HybridTimestamp> safeTime = new
PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(mvPartStorage);
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () ->
Map.of(pkStorage.get().id(), pkStorage.get());
@@ -281,7 +289,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
() -> Map.of(pkLocker.id(), pkLocker),
pkStorage,
() -> Map.of(),
- clock,
+ CLOCK,
safeTime,
txStateStorage().getOrCreateTxStateStorage(PART_ID),
placementDriver,
@@ -296,6 +304,34 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
txStateStorage().getOrCreateTxStateStorage(PART_ID),
safeTime
);
+
+ safeTimeUpdaterThread = new Thread(new SafeTimeUpdater(safeTime),
"safe-time-updater");
+
+ safeTimeUpdaterThread.start();
+ }
+
+ /**
+  * A process that updates safe time periodically: once a second it pushes the current value of the shared
+  * {@code CLOCK} into the safe time tracker.
+  *
+  * <p>The loop ends when the running thread is interrupted, so interrupting the thread terminates it.
+  */
+ private static class SafeTimeUpdater implements Runnable {
+     /** Tracker that receives the safe time updates. */
+     private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+     /**
+      * Creates the updater.
+      *
+      * @param safeTime Safe time tracker to update.
+      */
+     public SafeTimeUpdater(PendingComparableValuesTracker<HybridTimestamp> safeTime) {
+         this.safeTime = safeTime;
+     }
+
+     @Override
+     public void run() {
+         while (!Thread.currentThread().isInterrupted()) {
+             safeTime.update(CLOCK.now());
+
+             try {
+                 Thread.sleep(1_000);
+             } catch (InterruptedException e) {
+                 LOG.warn("The safe time updater thread is interrupted");
+
+                 // Swallowing the interrupt would keep this loop (and the thread) alive forever,
+                 // so restore the interrupt status and stop.
+                 Thread.currentThread().interrupt();
+
+                 return;
+             }
+         }
+     }
}
/**
@@ -354,4 +390,15 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
public CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int
partId) {
return CompletableFuture.completedFuture(mock(ClusterNode.class));
}
+
+ /**
+  * Closes the table and interrupts the safe time updater thread if one was started.
+  */
+ @Override
+ public void close() {
+     try {
+         super.close();
+     } finally {
+         // Interrupt the updater even when the parent close fails, otherwise the thread is never signalled.
+         Thread updater = safeTimeUpdaterThread;
+
+         if (updater != null) {
+             updater.interrupt();
+
+             safeTimeUpdaterThread = null;
+         }
+     }
+ }
}