This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 22f6d0d1d3 IGNITE-19381 TimestampAware messages sometimes lacks timestamps (#2083)
22f6d0d1d3 is described below
commit 22f6d0d1d3c74f40af39ca5f540ccc73abddbae4
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Sat May 20 00:47:38 2023 +0400
IGNITE-19381 TimestampAware messages sometimes lacks timestamps (#2083)
---
.../replicator/message/TimestampAware.java | 8 ++-
.../ignite/distributed/ReplicaUnavailableTest.java | 7 ++-
.../table/distributed/SortedIndexLocker.java | 60 +++++-----------------
.../distributed/storage/InternalTableImpl.java | 5 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 2 +
5 files changed, 25 insertions(+), 57 deletions(-)
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java
index 36f2fbc455..55e0e0e335 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java
@@ -17,11 +17,10 @@
package org.apache.ignite.internal.replicator.message;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.network.NetworkMessage;
-import org.jetbrains.annotations.Nullable;
/**
* Message with a timestamp to adjust a hybrid logical clock.
@@ -39,8 +38,7 @@ public interface TimestampAware extends NetworkMessage {
*
* @return Gets a hybrid timestamp.
*/
- //TODO IGNITE-19381 Remove @Nullable annotation and replace "nullableHybridTimestamp(...)" call with "hybridTimestamp(...)"
- default @Nullable HybridTimestamp timestamp() {
- return nullableHybridTimestamp(timestampLong());
+ default HybridTimestamp timestamp() {
+ return hybridTimestamp(timestampLong());
}
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 746e268729..98b5fe10c6 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
@@ -81,6 +82,8 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
private final ReplicaMessagesFactory replicaMessageFactory = new
ReplicaMessagesFactory();
+ private final HybridClock clock = new HybridClockImpl();
+
private final TestInfo testInfo;
private ReplicaService replicaService;
@@ -101,8 +104,6 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
clusterService = startNode(testInfo, name, NODE_PORT_BASE + 1,
nodeFinder);
- HybridClock clock = mock(HybridClock.class);
-
replicaService = new ReplicaService(clusterService.messagingService(),
clock);
var cmgManager = mock(ClusterManagementGroupManager.class);
@@ -135,6 +136,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
ReadWriteSingleRowReplicaRequest request =
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(tablePartitionId)
+ .timestampLong(clock.nowLong())
.binaryRow(createKeyValueRow(1L, 1L))
.requestType(RequestType.RW_GET)
.build();
@@ -174,6 +176,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
ReadWriteSingleRowReplicaRequest request =
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(tablePartitionId)
+ .timestampLong(clock.nowLong())
.binaryRow(createKeyValueRow(1L, 1L))
.requestType(RequestType.RW_GET)
.build();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
index e82347efbb..c70933aed3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
@@ -20,14 +20,9 @@ package org.apache.ignite.internal.table.distributed;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.BinaryTupleSchema;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.PeekCursor;
@@ -45,18 +40,8 @@ import org.jetbrains.annotations.Nullable;
* <p>Simply acquires lock on a given row for lookup and remove, acquires lock
on a next key for insert.
*/
public class SortedIndexLocker implements IndexLocker {
-
- private static final SchemaDescriptor INFINITY_TUPLE_SCHEMA = new SchemaDescriptor(
- 1,
- new Column[]{
- new Column("indexId", NativeTypes.UUID, false),
- new Column("partId", NativeTypes.INT32, false)
- },
- new Column[0]
- );
-
/** Index INF+ value object. */
- private final BinaryTuple positiveInf;
+ private final Object positiveInf;
private final UUID indexId;
private final LockManager lockManager;
@@ -82,27 +67,7 @@ public class SortedIndexLocker implements IndexLocker {
this.lockManager = lockManager;
this.storage = storage;
this.indexRowResolver = indexRowResolver;
-
- this.positiveInf = createInfiniteBoundary(partId, indexId);
- }
-
- /**
- * Creates a tuple for positive infinity boundary.
- *
- * @param partId Partition id.
- * @param indexId Index id.
- * @return Infinity binary tuple.
- */
- private static BinaryTuple createInfiniteBoundary(int partId, UUID indexId) {
- var binarySchema = BinaryTupleSchema.createSchema(INFINITY_TUPLE_SCHEMA, new int[]{
- INFINITY_TUPLE_SCHEMA.column("indexId").schemaIndex(),
- INFINITY_TUPLE_SCHEMA.column("partId").schemaIndex()
- });
-
- return new BinaryTuple(
- binarySchema,
- new BinaryTupleBuilder(binarySchema.elementCount(), false).appendUuid(indexId).appendInt(partId).build()
- );
+ this.positiveInf = Integer.valueOf(partId);
}
/** {@inheritDoc} */
@@ -150,7 +115,7 @@ public class SortedIndexLocker implements IndexLocker {
private CompletableFuture<IndexRow> acquireLockNextKey(UUID txId,
PeekCursor<IndexRow> peekCursor) {
IndexRow peekedRow = peekCursor.peek();
- LockKey lockKey = new LockKey(indexId, indexKey(peekedRow).byteBuffer());
+ LockKey lockKey = new LockKey(indexId, indexKey(peekedRow));
return lockManager.acquire(txId, lockKey, LockMode.S)
.thenCompose(ignore -> {
@@ -174,8 +139,8 @@ public class SortedIndexLocker implements IndexLocker {
return row0 == row1 || row0.rowId().equals(row1.rowId());
}
- private BinaryTuple indexKey(@Nullable IndexRow indexRow) {
- return (indexRow == null) ? positiveInf : indexRow.indexColumns();
+ private Object indexKey(@Nullable IndexRow indexRow) {
+ return (indexRow == null) ? positiveInf : indexRow.indexColumns().byteBuffer();
}
/** {@inheritDoc} */
@@ -185,17 +150,16 @@ public class SortedIndexLocker implements IndexLocker {
BinaryTuplePrefix prefix = BinaryTuplePrefix.fromBinaryTuple(key);
- // Find next key.
- Cursor<IndexRow> cursor = storage.scan(prefix, null, SortedIndexStorage.GREATER);
+ IndexRow nextRow = null;
- BinaryTuple nextKey;
- if (cursor.hasNext()) {
- nextKey = cursor.next().indexColumns();
- } else { // Otherwise INF.
- nextKey = positiveInf;
+ // Find next key.
+ try (Cursor<IndexRow> cursor = storage.scan(prefix, null, SortedIndexStorage.GREATER)) {
+ if (cursor.hasNext()) {
+ nextRow = cursor.next();
+ }
}
- var nextLockKey = new LockKey(indexId, nextKey.byteBuffer());
+ var nextLockKey = new LockKey(indexId, indexKey(nextRow));
return lockManager.acquire(txId, nextLockKey,
LockMode.IX).thenCompose(shortLock ->
lockManager.acquire(txId, new LockKey(indexId,
key.byteBuffer()), LockMode.X).thenApply((lock) ->
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index d037a88baf..226b51c8d3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -380,6 +380,7 @@ public class InternalTableImpl implements InternalTable {
ReadWriteScanRetrieveBatchReplicaRequestBuilder requestBuilder =
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
.groupId(partGroupId)
+ .timestampLong(clock.nowLong())
.transactionId(tx.id())
.scanId(scanId)
.indexToUse(indexId)
@@ -388,8 +389,7 @@ public class InternalTableImpl implements InternalTable {
.upperBound(upperBound)
.flags(flags)
.columnsToInclude(columnsToInclude)
- .batchSize(batchSize)
- .timestampLong(clock.nowLong());
+ .batchSize(batchSize);
if (primaryReplicaAndTerm != null) {
ReadWriteScanRetrieveBatchReplicaRequest request =
requestBuilder.term(primaryReplicaAndTerm.get2()).build();
@@ -1011,6 +1011,7 @@ public class InternalTableImpl implements InternalTable {
ReadWriteScanRetrieveBatchReplicaRequest request =
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
.groupId(partGroupId)
+ .timestampLong(clock.nowLong())
.transactionId(txId)
.scanId(scanId)
.indexToUse(indexId)
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 16f5235e10..03af7753b0 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -179,6 +179,7 @@ public class TxManagerImpl implements TxManager {
TxFinishReplicaRequest req = FACTORY.txFinishReplicaRequest()
.txId(txId)
+ .timestampLong(clock.nowLong())
.groupId(commitPartition)
.groups(groups)
.commit(commit)
@@ -207,6 +208,7 @@ public class TxManagerImpl implements TxManager {
recipientNode,
FACTORY.txCleanupReplicaRequest()
.groupId(tablePartitionIds.get(i).get1())
+ .timestampLong(clock.nowLong())
.txId(txId)
.commit(commit)
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))