This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e82e9043acf IGNITE-24119 Fix getAll in explicit RO transaction (#6813)
e82e9043acf is described below
commit e82e9043acf9a4e380a043601edec37154f391b3
Author: Mikhail Efremov <[email protected]>
AuthorDate: Mon Oct 20 18:47:28 2025 +0600
IGNITE-24119 Fix getAll in explicit RO transaction (#6813)
---
.../ignite/internal/table/InternalTable.java | 5 +-
.../distributed/storage/InternalTableImpl.java | 16 ++---
.../ItMultiGetInExplicitReadOnlyTxTest.java | 72 ++++++++++++++++++++++
.../readonly/ItReadOnlyTxAndLowWatermarkTest.java | 10 ---
4 files changed, 84 insertions(+), 19 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 44bdbadf5c9..1f9ab422675 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -173,7 +173,8 @@ public interface InternalTable extends ManuallyCloseable {
* @param readTimestamp Read timestamp.
* @param transactionId Transaction ID (might be {@code null}).
* @param coordinatorId Ephemeral ID of the transaction coordinator.
- * @param recipientNode Cluster node that will handle given get request.
+ * @param recipientNode Cluster node that will handle given getAll request. In case if given node is {@code null} then for each
+ * partition inside of the method recipient node will be calculated separately.
* @return Future that will return rows with all columns filled from the table. The order of collection elements is
* guaranteed to be the same as the order of {@code keyRows}. If a record does not exist, the
* element at the corresponding index of the resulting collection is {@code null}.
@@ -183,7 +184,7 @@ public interface InternalTable extends ManuallyCloseable {
HybridTimestamp readTimestamp,
@Nullable UUID transactionId,
@Nullable UUID coordinatorId,
- InternalClusterNode recipientNode
+ @Nullable InternalClusterNode recipientNode
);
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index bd2e079ec7d..530f397a6b7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -1035,10 +1035,7 @@ public class InternalTableImpl implements InternalTable {
if (tx != null && tx.isReadOnly()) {
assert !tx.implicit() : "implicit RO getAll not supported";
- BinaryRowEx firstRow = keyRows.iterator().next();
-
- return evaluateReadOnlyRecipientNode(partitionId(firstRow), tx.readTimestamp())
- .thenCompose(recipientNode -> getAll(keyRows, tx.readTimestamp(), tx.id(), tx.coordinatorId(), recipientNode));
+ return getAll(keyRows, tx.readTimestamp(), tx.id(), tx.coordinatorId(), null);
}
return enlistInTx(
@@ -1058,12 +1055,14 @@ public class InternalTableImpl implements InternalTable {
HybridTimestamp readTimestamp,
@Nullable UUID transactionId,
@Nullable UUID coordinatorId,
- InternalClusterNode recipientNode
+ @Nullable InternalClusterNode recipientNode
) {
Int2ObjectMap<RowBatch> rowBatchByPartitionId = toRowBatchByPartitionId(keyRows);
for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
- ReplicationGroupId replicationGroupId = targetReplicationGroupId(partitionRowBatch.getIntKey());
+ int partitionId = partitionRowBatch.getIntKey();
+
+ ReplicationGroupId replicationGroupId = targetReplicationGroupId(partitionId);
ReadOnlyMultiRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
.groupId(serializeReplicationGroupId(replicationGroupId))
@@ -1076,7 +1075,10 @@ public class InternalTableImpl implements InternalTable {
.coordinatorId(coordinatorId)
.build();
- partitionRowBatch.getValue().resultFuture = replicaSvc.invoke(recipientNode, request);
+ partitionRowBatch.getValue().resultFuture = recipientNode != null
+ ? replicaSvc.invoke(recipientNode, request)
+ : evaluateReadOnlyRecipientNode(partitionId, readTimestamp)
+ .thenCompose(targetNode -> replicaSvc.invoke(targetNode, request));
}
return collectMultiRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values());
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItMultiGetInExplicitReadOnlyTxTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItMultiGetInExplicitReadOnlyTxTest.java
new file mode 100644
index 00000000000..ba2cac699cd
--- /dev/null
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItMultiGetInExplicitReadOnlyTxTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.readonly;
+
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.Test;
+
+class ItMultiGetInExplicitReadOnlyTxTest extends ClusterPerTestIntegrationTest {
+ private static final String TABLE_NAME = "TEST_TABLE";
+
+ private static final int KEY_COUNT = 100;
+
+ @Override
+ protected int initialNodes() {
+ return 2;
+ }
+
+ @Test
+ void roTransactionWithGetAllOperation() {
+ assertEquals(2, cluster.nodes().size());
+
+ Ignite coordinator = node(0);
+
+ coordinator.sql().executeScript("CREATE ZONE NEW_ZONE (PARTITIONS 2, REPLICAS 1) STORAGE PROFILES ['default']");
+
+ coordinator.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR) ZONE NEW_ZONE");
+
+ KeyValueView<Integer, String> kvView = coordinator.tables().table(TABLE_NAME).keyValueView(Integer.class, String.class);
+
+ insertOriginalValues(KEY_COUNT, kvView);
+
+ Transaction roTx = coordinator.transactions().begin(new TransactionOptions().readOnly(true));
+
+ List<Integer> keys = IntStream.range(0, KEY_COUNT).boxed().collect(toList());
+
+ Map<Integer, String> getAllResult = assertDoesNotThrow(() -> kvView.getAll(roTx, keys));
+
+ assertEquals(KEY_COUNT, getAllResult.size());
+ }
+
+ private static void insertOriginalValues(int keyCount, KeyValueView<Integer, String> kvView) {
+ for (int i = 0; i < keyCount; i++) {
+ kvView.put(null, i, "original-" + i);
+ }
+ }
+}
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxAndLowWatermarkTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxAndLowWatermarkTest.java
index b7b4deec6fc..9f93f49d93a 100644
--- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxAndLowWatermarkTest.java
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxAndLowWatermarkTest.java
@@ -32,7 +32,6 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assumptions.assumeFalse;
import java.util.HashSet;
import java.util.List;
@@ -103,9 +102,6 @@ class ItReadOnlyTxAndLowWatermarkTest extends ClusterPerTestIntegrationTest {
@ParameterizedTest
@EnumSource(TransactionalReader.class)
void roTransactionNoticesTupleVersionsMissingDueToGcOnDataNodes(TransactionalReader reader) throws Exception {
- // TODO: remove the assumption when IGNITE-24119 is fixed.
- assumeFalse(reader == TransactionalReader.MULTI_GET);
-
updateDataAvailabilityTimeToShortPeriod();
Ignite coordinator = node(0);
@@ -185,9 +181,6 @@ class ItReadOnlyTxAndLowWatermarkTest extends ClusterPerTestIntegrationTest {
@Enum(TransactionalReader.class) TransactionalReader reader,
@Values(booleans = {true, false}) boolean commit
) throws Exception {
- // TODO: remove the assumption when IGNITE-24119 is fixed.
- assumeFalse(reader == TransactionalReader.MULTI_GET);
-
Ignite coordinator = node(0);
KeyValueView<Integer, String> kvView = kvView(coordinator);
@@ -230,9 +223,6 @@ class ItReadOnlyTxAndLowWatermarkTest extends ClusterPerTestIntegrationTest {
@EnumSource(TransactionalReader.class)
@WithSystemProperty(key = ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "100")
void nonFinishedRoTransactionsOfCoordinatorsThatLeftDontHoldLwm(TransactionalReader reader) throws Exception {
- // TODO: remove the assumption when IGNITE-24119 is fixed.
- assumeFalse(reader == TransactionalReader.MULTI_GET);
-
Ignite coordinator = node(1);
KeyValueView<Integer, String> kvView = kvView(coordinator);