This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 22f6d0d1d3 IGNITE-19381 TimestampAware messages sometimes lacks 
timestamps (#2083)
22f6d0d1d3 is described below

commit 22f6d0d1d3c74f40af39ca5f540ccc73abddbae4
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Sat May 20 00:47:38 2023 +0400

    IGNITE-19381 TimestampAware messages sometimes lacks timestamps (#2083)
---
 .../replicator/message/TimestampAware.java         |  8 ++-
 .../ignite/distributed/ReplicaUnavailableTest.java |  7 ++-
 .../table/distributed/SortedIndexLocker.java       | 60 +++++-----------------
 .../distributed/storage/InternalTableImpl.java     |  5 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  2 +
 5 files changed, 25 insertions(+), 57 deletions(-)

diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java
index 36f2fbc455..55e0e0e335 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java
@@ -17,11 +17,10 @@
 
 package org.apache.ignite.internal.replicator.message;
 
-import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.network.NetworkMessage;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Message with a timestamp to adjust a hybrid logical clock.
@@ -39,8 +38,7 @@ public interface TimestampAware extends NetworkMessage {
      *
      * @return Gets a hybrid timestamp.
      */
-    //TODO IGNITE-19381 Remove @Nullable annotation and replace 
"nullableHybridTimestamp(...)" call with "hybridTimestamp(...)"
-    default @Nullable HybridTimestamp timestamp() {
-        return nullableHybridTimestamp(timestampLong());
+    default HybridTimestamp timestamp() {
+        return hybridTimestamp(timestampLong());
     }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 746e268729..98b5fe10c6 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
@@ -81,6 +82,8 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
 
     private final ReplicaMessagesFactory replicaMessageFactory = new 
ReplicaMessagesFactory();
 
+    private final HybridClock clock = new HybridClockImpl();
+
     private final TestInfo testInfo;
 
     private ReplicaService replicaService;
@@ -101,8 +104,6 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
 
         clusterService = startNode(testInfo, name, NODE_PORT_BASE + 1, 
nodeFinder);
 
-        HybridClock clock = mock(HybridClock.class);
-
         replicaService = new ReplicaService(clusterService.messagingService(), 
clock);
 
         var cmgManager = mock(ClusterManagementGroupManager.class);
@@ -135,6 +136,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
 
         ReadWriteSingleRowReplicaRequest request = 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                 .groupId(tablePartitionId)
+                .timestampLong(clock.nowLong())
                 .binaryRow(createKeyValueRow(1L, 1L))
                 .requestType(RequestType.RW_GET)
                 .build();
@@ -174,6 +176,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
 
         ReadWriteSingleRowReplicaRequest request = 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
                 .groupId(tablePartitionId)
+                .timestampLong(clock.nowLong())
                 .binaryRow(createKeyValueRow(1L, 1L))
                 .requestType(RequestType.RW_GET)
                 .build();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
index e82347efbb..c70933aed3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
@@ -20,14 +20,9 @@ package org.apache.ignite.internal.table.distributed;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.BinaryTupleSchema;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.PeekCursor;
@@ -45,18 +40,8 @@ import org.jetbrains.annotations.Nullable;
  * <p>Simply acquires lock on a given row for lookup and remove, acquires lock 
on a next key for insert.
  */
 public class SortedIndexLocker implements IndexLocker {
-
-    private static final SchemaDescriptor INFINITY_TUPLE_SCHEMA = new 
SchemaDescriptor(
-            1,
-            new Column[]{
-                    new Column("indexId", NativeTypes.UUID, false),
-                    new Column("partId", NativeTypes.INT32, false)
-            },
-            new Column[0]
-    );
-
     /** Index INF+ value object. */
-    private final BinaryTuple positiveInf;
+    private final Object positiveInf;
 
     private final UUID indexId;
     private final LockManager lockManager;
@@ -82,27 +67,7 @@ public class SortedIndexLocker implements IndexLocker {
         this.lockManager = lockManager;
         this.storage = storage;
         this.indexRowResolver = indexRowResolver;
-
-        this.positiveInf = createInfiniteBoundary(partId, indexId);
-    }
-
-    /**
-     * Creates a tuple for positive infinity boundary.
-     *
-     * @param partId Partition id.
-     * @param indexId Index id.
-     * @return Infinity binary tuple.
-     */
-    private static BinaryTuple createInfiniteBoundary(int partId, UUID 
indexId) {
-        var binarySchema = 
BinaryTupleSchema.createSchema(INFINITY_TUPLE_SCHEMA, new int[]{
-                INFINITY_TUPLE_SCHEMA.column("indexId").schemaIndex(),
-                INFINITY_TUPLE_SCHEMA.column("partId").schemaIndex()
-        });
-
-        return new BinaryTuple(
-                binarySchema,
-                new BinaryTupleBuilder(binarySchema.elementCount(), 
false).appendUuid(indexId).appendInt(partId).build()
-        );
+        this.positiveInf = Integer.valueOf(partId);
     }
 
     /** {@inheritDoc} */
@@ -150,7 +115,7 @@ public class SortedIndexLocker implements IndexLocker {
     private CompletableFuture<IndexRow> acquireLockNextKey(UUID txId, 
PeekCursor<IndexRow> peekCursor) {
         IndexRow peekedRow = peekCursor.peek();
 
-        LockKey lockKey = new LockKey(indexId, 
indexKey(peekedRow).byteBuffer());
+        LockKey lockKey = new LockKey(indexId, indexKey(peekedRow));
 
         return lockManager.acquire(txId, lockKey, LockMode.S)
                 .thenCompose(ignore -> {
@@ -174,8 +139,8 @@ public class SortedIndexLocker implements IndexLocker {
         return row0 == row1 || row0.rowId().equals(row1.rowId());
     }
 
-    private BinaryTuple indexKey(@Nullable IndexRow indexRow) {
-        return (indexRow == null) ? positiveInf : indexRow.indexColumns();
+    private Object indexKey(@Nullable IndexRow indexRow) {
+        return (indexRow == null) ? positiveInf : 
indexRow.indexColumns().byteBuffer();
     }
 
     /** {@inheritDoc} */
@@ -185,17 +150,16 @@ public class SortedIndexLocker implements IndexLocker {
 
         BinaryTuplePrefix prefix = BinaryTuplePrefix.fromBinaryTuple(key);
 
-        // Find next key.
-        Cursor<IndexRow> cursor = storage.scan(prefix, null, 
SortedIndexStorage.GREATER);
+        IndexRow nextRow = null;
 
-        BinaryTuple nextKey;
-        if (cursor.hasNext()) {
-            nextKey = cursor.next().indexColumns();
-        } else { // Otherwise INF.
-            nextKey = positiveInf;
+        // Find next key.
+        try (Cursor<IndexRow> cursor = storage.scan(prefix, null, 
SortedIndexStorage.GREATER)) {
+            if (cursor.hasNext()) {
+                nextRow = cursor.next();
+            }
         }
 
-        var nextLockKey = new LockKey(indexId, nextKey.byteBuffer());
+        var nextLockKey = new LockKey(indexId, indexKey(nextRow));
 
         return lockManager.acquire(txId, nextLockKey, 
LockMode.IX).thenCompose(shortLock ->
                 lockManager.acquire(txId, new LockKey(indexId, 
key.byteBuffer()), LockMode.X).thenApply((lock) ->
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index d037a88baf..226b51c8d3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -380,6 +380,7 @@ public class InternalTableImpl implements InternalTable {
 
         ReadWriteScanRetrieveBatchReplicaRequestBuilder requestBuilder = 
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(partGroupId)
+                .timestampLong(clock.nowLong())
                 .transactionId(tx.id())
                 .scanId(scanId)
                 .indexToUse(indexId)
@@ -388,8 +389,7 @@ public class InternalTableImpl implements InternalTable {
                 .upperBound(upperBound)
                 .flags(flags)
                 .columnsToInclude(columnsToInclude)
-                .batchSize(batchSize)
-                .timestampLong(clock.nowLong());
+                .batchSize(batchSize);
 
         if (primaryReplicaAndTerm != null) {
             ReadWriteScanRetrieveBatchReplicaRequest request = 
requestBuilder.term(primaryReplicaAndTerm.get2()).build();
@@ -1011,6 +1011,7 @@ public class InternalTableImpl implements InternalTable {
 
                     ReadWriteScanRetrieveBatchReplicaRequest request = 
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
                             .groupId(partGroupId)
+                            .timestampLong(clock.nowLong())
                             .transactionId(txId)
                             .scanId(scanId)
                             .indexToUse(indexId)
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 16f5235e10..03af7753b0 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -179,6 +179,7 @@ public class TxManagerImpl implements TxManager {
 
         TxFinishReplicaRequest req = FACTORY.txFinishReplicaRequest()
                 .txId(txId)
+                .timestampLong(clock.nowLong())
                 .groupId(commitPartition)
                 .groups(groups)
                 .commit(commit)
@@ -207,6 +208,7 @@ public class TxManagerImpl implements TxManager {
                     recipientNode,
                     FACTORY.txCleanupReplicaRequest()
                             .groupId(tablePartitionIds.get(i).get1())
+                            .timestampLong(clock.nowLong())
                             .txId(txId)
                             .commit(commit)
                             
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))

Reply via email to