This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 09bd408a11 IGNITE-20715 Check that versions of tuples sent to
PartitionReplicaListener match tx-bound schema version (#2761)
09bd408a11 is described below
commit 09bd408a11252d821b155d00917ea10d22847110
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Oct 31 14:41:58 2023 +0400
IGNITE-20715 Check that versions of tuples sent to PartitionReplicaListener
match tx-bound schema version (#2761)
---
.../ignite/internal/replicator/ReplicaManager.java | 11 +-
.../exception/ExpectedReplicationException.java | 25 ++++
.../ignite/internal/table/AbstractTableView.java | 29 +++-
.../internal/table/RecordBinaryViewImpl.java | 2 -
.../InternalSchemaVersionMismatchException.java | 36 +++++
.../replicator/PartitionReplicaListener.java | 134 ++++++++++++------
.../replicator/SchemaCompatValidator.java | 19 +++
.../table/KeyValueBinaryViewOperationsTest.java | 34 +++++
.../internal/table/KeyValueViewOperationsTest.java | 129 +++++++++++------
.../table/RecordBinaryViewOperationsTest.java | 117 ++++++++--------
.../internal/table/RecordViewOperationsTest.java | 123 ++++++++++------
.../PartitionReplicaListenerIndexLockingTest.java | 6 +-
.../replication/PartitionReplicaListenerTest.java | 155 +++++++++++++++++----
.../apache/ignite/distributed/ItTxTestCluster.java | 10 +-
14 files changed, 613 insertions(+), 217 deletions(-)
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index c9ff648060..01e6d13e5c 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -50,6 +50,7 @@ import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessage
import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import
org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import
org.apache.ignite.internal.replicator.exception.ExpectedReplicationException;
import
org.apache.ignite.internal.replicator.exception.ReplicaIsAlreadyStartedException;
import
org.apache.ignite.internal.replicator.exception.ReplicaStoppingException;
import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
@@ -291,7 +292,11 @@ public class ReplicaManager implements IgniteComponent {
if (ex == null) {
msg = prepareReplicaResponse(sendTimestamp, res.result());
} else {
- LOG.warn("Failed to process replica request [request={}]",
ex, request);
+ if (indicatesUnexpectedProblem(ex)) {
+ LOG.warn("Failed to process replica request
[request={}]", ex, request);
+ } else {
+ LOG.debug("Failed to process replica request
[request={}]", ex, request);
+ }
msg = prepareReplicaErrorResponse(sendTimestamp, ex);
}
@@ -338,6 +343,10 @@ public class ReplicaManager implements IgniteComponent {
}
}
+ private static boolean indicatesUnexpectedProblem(Throwable ex) {
+ return !(ex instanceof ExpectedReplicationException);
+ }
+
/**
* Checks this exception is caused of timeout or connectivity issue.
*
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ExpectedReplicationException.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ExpectedReplicationException.java
new file mode 100644
index 0000000000..814ba8cb64
--- /dev/null
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ExpectedReplicationException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.exception;
+
+/**
+ * Marker interface for exceptions that are used in the replication protocol
normally (i.e. they don't indicate errors that need
+ * to be logged, for example).
+ */
+public interface ExpectedReplicationException {
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 6b6187bdf4..0a3304053e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -17,13 +17,17 @@
package org.apache.ignite.internal.table;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.function.Function.identity;
import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
import org.apache.ignite.internal.schema.SchemaRegistry;
+import
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -95,17 +99,38 @@ abstract class AbstractTableView {
* @return Whatever the action returns.
*/
protected final <T> CompletableFuture<T> withSchemaSync(@Nullable
Transaction tx, KvAction<T> action) {
- // TODO: IGNITE-20106 - retry if our request is rejected by the server
due to a changed schema version.
+ return withSchemaSync(tx, null, action);
+ }
+ private <T> CompletableFuture<T> withSchemaSync(@Nullable Transaction tx,
@Nullable Integer previousSchemaVersion, KvAction<T> action) {
CompletableFuture<Integer> schemaVersionFuture = tx == null
? schemaVersions.schemaVersionAtNow(tbl.tableId())
: schemaVersions.schemaVersionAt(((InternalTransaction)
tx).startTimestamp(), tbl.tableId());
- CompletableFuture<T> future =
schemaVersionFuture.thenCompose(action::act);
+ CompletableFuture<T> future = schemaVersionFuture
+ .thenCompose(schemaVersion -> action.act(schemaVersion)
+ .handle((res, ex) -> {
+ if
(isOrCausedBy(InternalSchemaVersionMismatchException.class, ex)) {
+ assert tx == null : "Only for implicit
transactions a retry might be requested";
+ assert previousSchemaVersion == null ||
!Objects.equals(schemaVersion, previousSchemaVersion)
+ : "Same schema version (" +
schemaVersion
+ + ") on a retry: something is
wrong, is this caused by the test setup?";
+
+ // Repeat.
+ return withSchemaSync(tx, schemaVersion,
action);
+ } else {
+ return ex == null ? completedFuture(res) :
CompletableFuture.<T>failedFuture(ex);
+ }
+ }))
+ .thenCompose(identity());
return convertToPublicFuture(future);
}
+ private static boolean isOrCausedBy(Class<? extends Exception>
exceptionClass, @Nullable Throwable ex) {
+ return ex != null && (exceptionClass.isInstance(ex) ||
isOrCausedBy(exceptionClass, ex.getCause()));
+ }
+
/**
* Action representing some KV operation. When executed, the action is
supplied with schema version corresponding
* to the operation timestamp (see {@link #withSchemaSync(Transaction,
KvAction)} for details).
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 7284c12a2b..7ee855b2a3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -40,7 +40,6 @@ import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
/**
* Table view implementation for binary objects.
@@ -84,7 +83,6 @@ public class RecordBinaryViewImpl extends AbstractTableView
implements RecordVie
*
* @param schemaVersion Schema version for which to obtain a marshaller.
*/
- @TestOnly
public TupleMarshaller marshaller(int schemaVersion) {
return marshallerCache.marshaller(schemaVersion);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/InternalSchemaVersionMismatchException.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/InternalSchemaVersionMismatchException.java
new file mode 100644
index 0000000000..e7b249e0b9
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/InternalSchemaVersionMismatchException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import
org.apache.ignite.internal.replicator.exception.ExpectedReplicationException;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.schema.BinaryRow;
+
+/**
+ * An exception that is thrown to indicate that the schema version of {@link
BinaryRow}(s) passed in a {@link ReplicaRequest} does not match
+ * the schema version corresponding to the transaction. This can only happen
if the caller first obtained the schema
+ * and then started an (implicit) transaction; in such a case, the caller
should retry the sequence (that is,
+ * re-obtain the new schema version, start another implicit transaction and
send new ReplicaRequest).
+ *
+ * <p>This exception should never reach the public API users (even as a cause
or a suppressed exception),
+ * that's why it has no error code attached.
+ */
+public class InternalSchemaVersionMismatchException extends
IgniteInternalException implements ExpectedReplicationException {
+ private static final long serialVersionUID = 7695731405107811859L;
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 91e86650a2..94a16bd373 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -67,7 +67,6 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -98,6 +97,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.NullBinaryRow;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
@@ -472,34 +472,30 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
}
- return waitForSchemasBeforeReading(request)
- .thenCompose(unused -> validateTableExistence(request))
- .thenCompose(opStartTimestamp ->
processOperationRequest(request, isPrimary, senderId, opStartTimestamp));
+ HybridTimestamp opTsIfDirectRo = (request instanceof
ReadOnlyDirectReplicaRequest) ? hybridClock.now() : null;
+
+ return validateTableExistence(request, opTsIfDirectRo)
+ .thenCompose(unused -> validateSchemaMatch(request,
opTsIfDirectRo))
+ .thenCompose(unused -> waitForSchemasBeforeReading(request,
opTsIfDirectRo))
+ .thenCompose(opStartTimestamp ->
processOperationRequest(request, isPrimary, senderId, opTsIfDirectRo));
}
/**
- * Makes sure that we have schemas corresponding to the moment of tx
start; this makes PK extraction safe WRT
- * {@link org.apache.ignite.internal.schema.SchemaRegistry#schema(int)}.
+ * Validates that the table exists at a timestamp corresponding to the
request operation.
*
- * @param request Request that's being processed.
+ * <ul>
+ * <li>For a read/write in an RW transaction, it's 'now'</li>
+ * <li>For an RO read (with readTimestamp), it's readTimestamp
(matches readTimestamp in the transaction)</li>
+ * <li>For a direct read in an RO implicit transaction, it's the
timestamp chosen (as 'now') to process the request</li>
+ * </ul>
+ *
+ * <p>For other requests, the validation is skipped.
+ *
+ * @param request Replica request corresponding to the operation.
+ * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null}
otherwise.
+ * @return Future completed when the validation is finished.
*/
- private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest
request) {
- // TODO: IGNITE-20715 - validate that input rows schema version
matches the tx-bound schema version.
-
- HybridTimestamp tsToWaitForSchemas;
-
- if (request instanceof ReadWriteReplicaRequest) {
- tsToWaitForSchemas =
TransactionIds.beginTimestamp(((ReadWriteReplicaRequest)
request).transactionId());
- } else if (request instanceof ReadOnlyReplicaRequest) {
- tsToWaitForSchemas = ((ReadOnlyReplicaRequest)
request).readTimestamp();
- } else {
- tsToWaitForSchemas = null;
- }
-
- return tsToWaitForSchemas == null ? completedFuture(null) :
schemaSyncService.waitForMetadataCompleteness(tsToWaitForSchemas);
- }
-
- private CompletableFuture<HybridTimestamp>
validateTableExistence(ReplicaRequest request) {
+ private CompletableFuture<Void> validateTableExistence(ReplicaRequest
request, @Nullable HybridTimestamp opTsIfDirectRo) {
HybridTimestamp opStartTs;
if (request instanceof ReadWriteScanCloseReplicaRequest) {
@@ -510,7 +506,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
} else if (request instanceof ReadOnlyReplicaRequest) {
opStartTs = ((ReadOnlyReplicaRequest) request).readTimestamp();
} else if (request instanceof ReadOnlyDirectReplicaRequest) {
- opStartTs = hybridClock.now();
+ assert opTsIfDirectRo != null;
+
+ opStartTs = opTsIfDirectRo;
} else {
opStartTs = null;
}
@@ -520,18 +518,84 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return schemaSyncService.waitForMetadataCompleteness(opStartTs)
- .thenApply(unused -> {
- schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs,
tableId());
+ .thenRun(() ->
schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId()));
+ }
- return opStartTs;
+ /**
+ * Makes sure that {@link
SchemaVersionAwareReplicaRequest#schemaVersion()} sent in a request matches
table schema version
+ * corresponding to the operation.
+ *
+ * @param request Replica request corresponding to the operation.
+ * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null}
otherwise.
+ * @return Future completed when the validation is finished.
+ */
+ private CompletableFuture<Void> validateSchemaMatch(ReplicaRequest
request, @Nullable HybridTimestamp opTsIfDirectRo) {
+ if (!(request instanceof SchemaVersionAwareReplicaRequest)) {
+ return completedFuture(null);
+ }
+
+ HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
+ if (tsToWaitForSchema == null) {
+ tsToWaitForSchema = opTsIfDirectRo;
+ }
+
+ if (tsToWaitForSchema == null) {
+ return completedFuture(null);
+ }
+
+ HybridTimestamp finalTsToWaitForSchema = tsToWaitForSchema;
+ return
schemaSyncService.waitForMetadataCompleteness(finalTsToWaitForSchema)
+ .thenRun(() -> {
+ SchemaVersionAwareReplicaRequest versionAwareRequest =
(SchemaVersionAwareReplicaRequest) request;
+
+ schemaCompatValidator.failIfRequestSchemaDiffersFromTxTs(
+ finalTsToWaitForSchema,
+ versionAwareRequest.schemaVersion(),
+ tableId()
+ );
});
}
+ /**
+ * Makes sure that we have schemas corresponding to the moment of tx
start; this makes PK extraction safe WRT
+ * {@link SchemaRegistry#schema(int)}.
+ *
+ * @param request Replica request corresponding to the operation.
+ * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null}
otherwise.
+ * @return Future completed when the wait for schema metadata is finished (immediately if there is no timestamp to wait for).
+ */
+ private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest
request, @Nullable HybridTimestamp opTsIfDirectRo) {
+ HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
+ if (tsToWaitForSchema == null) {
+ tsToWaitForSchema = opTsIfDirectRo;
+ }
+
+ return tsToWaitForSchema == null ? completedFuture(null) :
schemaSyncService.waitForMetadataCompleteness(tsToWaitForSchema);
+ }
+
+ /**
+ * Returns timestamp of transaction start (for RW/timestamped RO requests)
or {@code null} for other requests.
+ *
+ * @param request Replica request corresponding to the operation.
+ */
+ private static @Nullable HybridTimestamp
getTxStartTimestamp(ReplicaRequest request) {
+ HybridTimestamp txStartTimestamp;
+
+ if (request instanceof ReadWriteReplicaRequest) {
+ txStartTimestamp =
TransactionIds.beginTimestamp(((ReadWriteReplicaRequest)
request).transactionId());
+ } else if (request instanceof ReadOnlyReplicaRequest) {
+ txStartTimestamp = ((ReadOnlyReplicaRequest)
request).readTimestamp();
+ } else {
+ txStartTimestamp = null;
+ }
+ return txStartTimestamp;
+ }
+
private CompletableFuture<?> processOperationRequest(
ReplicaRequest request,
@Nullable Boolean isPrimary,
String senderId,
- HybridTimestamp opStartTimestamp
+ @Nullable HybridTimestamp opStartTsIfDirectRo
) {
if (request instanceof ReadWriteSingleRowReplicaRequest) {
var req = (ReadWriteSingleRowReplicaRequest) request;
@@ -603,9 +667,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
} else if (request instanceof BuildIndexReplicaRequest) {
return
raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request));
} else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
- return
processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest)
request, opStartTimestamp);
+ return
processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest)
request, opStartTsIfDirectRo);
} else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
- return
processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest)
request, opStartTimestamp);
+ return
processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest)
request, opStartTsIfDirectRo);
} else {
throw new UnsupportedReplicaRequestException(request.getClass());
}
@@ -3495,14 +3559,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
}
- private CatalogTableDescriptor getTableDescriptor(int tableId, int
catalogVersion) {
- CatalogTableDescriptor tableDescriptor = catalogService.table(tableId,
catalogVersion);
-
- assert tableDescriptor != null : "tableId=" + tableId + ",
catalogVersion=" + catalogVersion;
-
- return tableDescriptor;
- }
-
private static BuildIndexCommand
toBuildIndexCommand(BuildIndexReplicaRequest request) {
return MSG_FACTORY.buildIndexCommand()
.indexId(request.indexId())
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
index 90ca7d5d55..8aef07a5e6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
@@ -221,4 +221,23 @@ class SchemaCompatValidator {
throw tableWasDroppedException(tableId);
}
}
+
+ /**
+ * Throws an {@link InternalSchemaVersionMismatchException} if the schema
version passed in the request differs from the schema version
+ * corresponding to the transaction timestamp.
+ *
+ * @param txTs Transaction timestamp.
+ * @param requestSchemaVersion Schema version passed in the operation
request.
+ * @param tableId ID of the table.
+ * @throws InternalSchemaVersionMismatchException Thrown if the schema
versions are different.
+ */
+ void failIfRequestSchemaDiffersFromTxTs(HybridTimestamp txTs, int
requestSchemaVersion, int tableId) {
+ CatalogTableDescriptor table = catalogService.table(tableId,
txTs.longValue());
+
+ assert table != null : "No table " + tableId + " at " + txTs;
+
+ if (table.tableVersion() != requestSchemaVersion) {
+ throw new InternalSchemaVersionMismatchException();
+ }
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
index 8d438e3cc2..8de46d2ee5 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
@@ -17,17 +17,31 @@
package org.apache.ignite.internal.table;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
@@ -431,6 +445,26 @@ public class KeyValueBinaryViewOperationsTest extends
TableKvOperationsTestBase
assertThrows(NullPointerException.class, () -> tbl.putAll(null,
Collections.singletonMap(key, null)));
}
+ @Test
+ void retriesOnInternalSchemaVersionMismatchException() throws Exception {
+ SchemaDescriptor schema = schemaDescriptor();
+ InternalTable internalTable = spy(createInternalTable(schema));
+
+ KeyValueView<Tuple, Tuple> view = new
KeyValueBinaryViewImpl(internalTable, new DummySchemaManagerImpl(schema),
schemaVersions);
+
+ BinaryRow resultRow = new
TupleMarshallerImpl(schema).marshal(Tuple.create().set("ID", 1L).set("VAL",
2L));
+
+ doReturn(failedFuture(new InternalSchemaVersionMismatchException()))
+ .doReturn(completedFuture(resultRow))
+ .when(internalTable).get(any(), any());
+
+ Tuple result = view.get(null, Tuple.create().set("ID", 1L));
+
+ assertThat(result.longValue("VAL"), is(2L));
+
+ verify(internalTable, times(2)).get(any(), isNull());
+ }
+
private SchemaDescriptor schemaDescriptor() {
return new SchemaDescriptor(
SCHEMA_VERSION,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
index ebca9a33a7..dcb30d93ec 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.type.NativeTypes.BOOLEAN;
import static org.apache.ignite.internal.type.NativeTypes.BYTES;
import static org.apache.ignite.internal.type.NativeTypes.DATE;
@@ -30,13 +32,22 @@ import static
org.apache.ignite.internal.type.NativeTypes.STRING;
import static org.apache.ignite.internal.type.NativeTypes.datetime;
import static org.apache.ignite.internal.type.NativeTypes.time;
import static org.apache.ignite.internal.type.NativeTypes.timestamp;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
@@ -48,8 +59,11 @@ import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import
org.apache.ignite.internal.marshaller.testobjects.TestObjectWithAllTypes;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
+import
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.type.NativeTypeSpec;
@@ -59,6 +73,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.mapper.Mapper;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
@@ -67,6 +82,54 @@ import org.junit.jupiter.api.Test;
public class KeyValueViewOperationsTest extends TableKvOperationsTestBase {
private final Random rnd = new Random();
+ private final Column[] valCols = {
+ new Column("primitiveBooleanCol".toUpperCase(), BOOLEAN, false),
+ new Column("primitiveByteCol".toUpperCase(), INT8, false),
+ new Column("primitiveShortCol".toUpperCase(), INT16, false),
+ new Column("primitiveIntCol".toUpperCase(), INT32, false),
+ new Column("primitiveLongCol".toUpperCase(), INT64, false),
+ new Column("primitiveFloatCol".toUpperCase(), FLOAT, false),
+ new Column("primitiveDoubleCol".toUpperCase(), DOUBLE, false),
+
+ new Column("booleanCol".toUpperCase(), BOOLEAN, true),
+ new Column("byteCol".toUpperCase(), INT8, true),
+ new Column("shortCol".toUpperCase(), INT16, true),
+ new Column("intCol".toUpperCase(), INT32, true),
+ new Column("longCol".toUpperCase(), INT64, true),
+ new Column("nullLongCol".toUpperCase(), INT64, true),
+ new Column("floatCol".toUpperCase(), FLOAT, true),
+ new Column("doubleCol".toUpperCase(), DOUBLE, true),
+
+ new Column("dateCol".toUpperCase(), DATE, true),
+ new Column("timeCol".toUpperCase(), time(0), true),
+ new Column("dateTimeCol".toUpperCase(), datetime(6), true),
+ new Column("timestampCol".toUpperCase(), timestamp(6), true),
+
+ new Column("uuidCol".toUpperCase(), NativeTypes.UUID, true),
+ new Column("bitmaskCol".toUpperCase(), NativeTypes.bitmaskOf(42),
true),
+ new Column("stringCol".toUpperCase(), STRING, true),
+ new Column("nullBytesCol".toUpperCase(), BYTES, true),
+ new Column("bytesCol".toUpperCase(), BYTES, true),
+ new Column("numberCol".toUpperCase(), NativeTypes.numberOf(12),
true),
+ new Column("decimalCol".toUpperCase(), NativeTypes.decimalOf(19,
3), true),
+ };
+
+ private final SchemaDescriptor schema = new SchemaDescriptor(
+ SCHEMA_VERSION,
+ new Column[]{new Column("id".toUpperCase(), INT64, false)},
+ valCols
+ );
+
+ private final Mapper<TestKeyObject> keyMapper =
Mapper.of(TestKeyObject.class);
+ private final Mapper<TestObjectWithAllTypes> valMapper =
Mapper.of(TestObjectWithAllTypes.class);
+
+ private DummyInternalTableImpl internalTable;
+
+ @BeforeEach
+ void createInternalTable() {
+ internalTable = spy(createInternalTable(schema));
+ }
+
@Test
public void put() {
final TestKeyObject key = TestKeyObject.randomObject(rnd);
@@ -615,6 +678,27 @@ public class KeyValueViewOperationsTest extends
TableKvOperationsTestBase {
assertThrows(MarshallerException.class, () -> tbl.putAll(null,
Collections.singletonMap(key, null)));
}
+ @Test
+ void retriesOnInternalSchemaVersionMismatchException() throws Exception {
+ KeyValueViewImpl<TestKeyObject, TestObjectWithAllTypes> view =
kvView();
+
+ TestKeyObject key = new TestKeyObject(1);
+ TestObjectWithAllTypes expectedValue =
TestObjectWithAllTypes.randomObject(rnd);
+
+ BinaryRow resultRow = new KvMarshallerImpl<>(schema, keyMapper,
valMapper)
+ .marshal(key, expectedValue);
+
+ doReturn(failedFuture(new InternalSchemaVersionMismatchException()))
+ .doReturn(completedFuture(resultRow))
+ .when(internalTable).get(any(), any());
+
+ TestObjectWithAllTypes result = view.get(null, new TestKeyObject(1L));
+
+ assertThat(result, is(equalTo(expectedValue)));
+
+ verify(internalTable, times(2)).get(any(), isNull());
+ }
+
/**
* Creates key-value view.
*/
@@ -625,49 +709,6 @@ public class KeyValueViewOperationsTest extends
TableKvOperationsTestBase {
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class,
RETURNS_DEEP_STUBS));
- Mapper<TestKeyObject> keyMapper = Mapper.of(TestKeyObject.class);
- Mapper<TestObjectWithAllTypes> valMapper =
Mapper.of(TestObjectWithAllTypes.class);
-
- Column[] valCols = {
- new Column("primitiveBooleanCol".toUpperCase(), BOOLEAN,
false),
- new Column("primitiveByteCol".toUpperCase(), INT8, false),
- new Column("primitiveShortCol".toUpperCase(), INT16, false),
- new Column("primitiveIntCol".toUpperCase(), INT32, false),
- new Column("primitiveLongCol".toUpperCase(), INT64, false),
- new Column("primitiveFloatCol".toUpperCase(), FLOAT, false),
- new Column("primitiveDoubleCol".toUpperCase(), DOUBLE, false),
-
- new Column("booleanCol".toUpperCase(), BOOLEAN, true),
- new Column("byteCol".toUpperCase(), INT8, true),
- new Column("shortCol".toUpperCase(), INT16, true),
- new Column("intCol".toUpperCase(), INT32, true),
- new Column("longCol".toUpperCase(), INT64, true),
- new Column("nullLongCol".toUpperCase(), INT64, true),
- new Column("floatCol".toUpperCase(), FLOAT, true),
- new Column("doubleCol".toUpperCase(), DOUBLE, true),
-
- new Column("dateCol".toUpperCase(), DATE, true),
- new Column("timeCol".toUpperCase(), time(0), true),
- new Column("dateTimeCol".toUpperCase(), datetime(6), true),
- new Column("timestampCol".toUpperCase(), timestamp(6), true),
-
- new Column("uuidCol".toUpperCase(), NativeTypes.UUID, true),
- new Column("bitmaskCol".toUpperCase(),
NativeTypes.bitmaskOf(42), true),
- new Column("stringCol".toUpperCase(), STRING, true),
- new Column("nullBytesCol".toUpperCase(), BYTES, true),
- new Column("bytesCol".toUpperCase(), BYTES, true),
- new Column("numberCol".toUpperCase(),
NativeTypes.numberOf(12), true),
- new Column("decimalCol".toUpperCase(),
NativeTypes.decimalOf(19, 3), true),
- };
-
- SchemaDescriptor schema = new SchemaDescriptor(
- SCHEMA_VERSION,
- new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
- valCols
- );
-
- DummyInternalTableImpl table = createInternalTable(schema);
-
// Validate all types are tested.
Set<NativeTypeSpec> testedTypes = Arrays.stream(valCols).map(c ->
c.type().spec())
.collect(Collectors.toSet());
@@ -677,7 +718,7 @@ public class KeyValueViewOperationsTest extends
TableKvOperationsTestBase {
assertEquals(Collections.emptySet(), missedTypes);
return new KeyValueViewImpl<>(
- table,
+ internalTable,
new DummySchemaManagerImpl(schema),
schemaVersions,
keyMapper,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
index 0637167966..12d23ae078 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
@@ -17,9 +17,12 @@
package org.apache.ignite.internal.table;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.schema.DefaultValueProvider.constantProvider;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -27,13 +30,23 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.util.Collection;
import java.util.List;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.InvalidTypeException;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaMismatchException;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.table.impl.TestTupleBuilder;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.lang.IgniteException;
@@ -49,11 +62,7 @@ import org.junit.jupiter.api.function.Executable;
public class RecordBinaryViewOperationsTest extends TableKvOperationsTestBase {
@Test
public void insert() {
- SchemaDescriptor schema = new SchemaDescriptor(
- SCHEMA_VERSION,
- new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
- new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT64, false)}
- );
+ SchemaDescriptor schema = schemaDescriptor();
RecordView<Tuple> tbl = createTable(schema).recordView();
@@ -76,13 +85,17 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
assertNull(tbl.get(null, nonExistedTuple));
}
- @Test
- public void upsert() {
- SchemaDescriptor schema = new SchemaDescriptor(
- 1,
+ /** Creates the basic test schema at {@code SCHEMA_VERSION}: INT64 key column {@code ID} and INT64 value column {@code VAL}. */
+ private static SchemaDescriptor schemaDescriptor() {
+ return new SchemaDescriptor(
+ SCHEMA_VERSION,
new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT64, false)}
);
+ }
+
+ @Test
+ public void upsert() {
+ SchemaDescriptor schema = schemaDescriptor();
RecordView<Tuple> tbl = createTable(schema).recordView();
@@ -107,11 +120,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void getAndUpsert() {
- SchemaDescriptor schema = new SchemaDescriptor(
- 1,
- new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
- new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT64, false)}
- );
+ SchemaDescriptor schema = schemaDescriptor();
RecordView<Tuple> tbl = createTable(schema).recordView();
@@ -133,11 +142,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void remove() {
- SchemaDescriptor schema = new SchemaDescriptor(
- 1,
- new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
- new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT64, false)}
- );
+ SchemaDescriptor schema = schemaDescriptor();
RecordView<Tuple> tbl = createTable(schema).recordView();
@@ -158,11 +163,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void removeExact() {
- SchemaDescriptor schema = new SchemaDescriptor(
- 1,
- new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
- new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT64, false)}
- );
+ SchemaDescriptor schema = schemaDescriptor();
RecordView<Tuple> tbl = createTable(schema).recordView();
@@ -206,11 +207,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void replace() {
- SchemaDescriptor schema = new SchemaDescriptor(
- 1,
- new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
- new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT64, false)}
- );
+ SchemaDescriptor schema = schemaDescriptor();
RecordView<Tuple> tbl = createTable(schema).recordView();
@@ -236,11 +233,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void replaceExact() {
- SchemaDescriptor schema = new SchemaDescriptor(
- 1,
- new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
- new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT64, false)}
- );
+ SchemaDescriptor schema = schemaDescriptor();
RecordView<Tuple> tbl = createTable(schema).recordView();
@@ -268,7 +261,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void validateSchema() {
SchemaDescriptor schema = new SchemaDescriptor(
- 1,
+ SCHEMA_VERSION,
new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
new Column[]{
new Column("val".toUpperCase(), NativeTypes.INT64,
true),
@@ -300,7 +293,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void defaultValues() {
SchemaDescriptor schema = new SchemaDescriptor(
- 1,
+ SCHEMA_VERSION,
new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
new Column[]{
new Column("val".toUpperCase(), NativeTypes.INT64,
true, constantProvider(28L)),
@@ -327,11 +320,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void getAll() {
- SchemaDescriptor schema = new SchemaDescriptor(
- 1,
- new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
- new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT64, false)}
- );
+ SchemaDescriptor schema = schemaDescriptor();
RecordView<Tuple> tbl = createTable(schema).recordView();
@@ -353,11 +342,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void upsertAllAfterInsertAll() {
- SchemaDescriptor schema = new SchemaDescriptor(
- 1,
- new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
- new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT64, false)}
- );
+ SchemaDescriptor schema = schemaDescriptor();
RecordView<Tuple> tbl = createTable(schema).recordView();
@@ -395,11 +380,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void deleteVsDeleteExact() {
- SchemaDescriptor schema = new SchemaDescriptor(
- 1,
- new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
- new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT64, false)}
- );
+ SchemaDescriptor schema = schemaDescriptor();
RecordView<Tuple> tbl = createTable(schema).recordView();
@@ -423,7 +404,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void getAndReplace() {
SchemaDescriptor schema = new SchemaDescriptor(
- 1,
+ SCHEMA_VERSION,
new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT32, false)}
);
@@ -448,7 +429,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void getAndDelete() {
SchemaDescriptor schema = new SchemaDescriptor(
- 1,
+ SCHEMA_VERSION,
new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT32, false)}
);
@@ -469,7 +450,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void deleteAll() {
SchemaDescriptor schema = new SchemaDescriptor(
- 1,
+ SCHEMA_VERSION,
new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT32, false)}
);
@@ -522,7 +503,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void deleteExact() {
SchemaDescriptor schema = new SchemaDescriptor(
- 1,
+ SCHEMA_VERSION,
new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT32, false)}
);
@@ -575,11 +556,7 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
@Test
public void getAndReplaceVsGetAndUpsert() {
- SchemaDescriptor schema = new SchemaDescriptor(
- 1,
- new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64,
false)},
- new Column[]{new Column("val".toUpperCase(),
NativeTypes.INT64, false)}
- );
+ SchemaDescriptor schema = schemaDescriptor();
RecordView<Tuple> tbl = createTable(schema).recordView();
@@ -600,6 +577,26 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
assertNull(tbl.get(null, Tuple.create().set("id", 1L)));
}
+    /**
+     * Verifies that the binary record view repeats a {@code get()} which the internal table failed with
+     * {@link InternalSchemaVersionMismatchException} and hands the result of the repeated call to the caller.
+     */
+    @Test
+    void retriesOnInternalSchemaVersionMismatchException() throws Exception {
+        SchemaDescriptor schema = schemaDescriptor();
+        InternalTable spiedTable = spy(createInternalTable(schema));
+
+        RecordView<Tuple> recordView = new RecordBinaryViewImpl(spiedTable, new DummySchemaManagerImpl(schema), schemaVersions);
+
+        Tuple storedTuple = Tuple.create().set("id", 1L).set("val", 2L);
+        BinaryRow storedRow = new TupleMarshallerImpl(schema).marshal(storedTuple);
+
+        // The first call reports a schema version mismatch, the second one returns the row.
+        doReturn(failedFuture(new InternalSchemaVersionMismatchException()))
+                .doReturn(completedFuture(storedRow))
+                .when(spiedTable).get(any(), any());
+
+        Tuple keyTuple = Tuple.create().set("id", 1L);
+        Tuple fetched = recordView.get(null, keyTuple);
+
+        assertThat(fetched.longValue("val"), is(2L));
+
+        // Exactly two calls reached the table, each with a null second argument.
+        verify(spiedTable, times(2)).get(any(), isNull());
+    }
+
/**
* Check tuples equality.
*
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
index 589eb862c3..dd0f303ccb 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.type.NativeTypes.BOOLEAN;
import static org.apache.ignite.internal.type.NativeTypes.BYTES;
import static org.apache.ignite.internal.type.NativeTypes.DATE;
@@ -32,12 +34,20 @@ import static
org.apache.ignite.internal.type.NativeTypes.time;
import static org.apache.ignite.internal.type.NativeTypes.timestamp;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
@@ -48,8 +58,11 @@ import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import
org.apache.ignite.internal.marshaller.testobjects.TestObjectWithAllTypes;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import
org.apache.ignite.internal.schema.marshaller.reflection.RecordMarshallerImpl;
+import
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.type.NativeTypeSpec;
@@ -58,6 +71,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.mapper.Mapper;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
@@ -67,6 +81,52 @@ public class RecordViewOperationsTest extends
TableKvOperationsTestBase {
private final Random rnd = new Random();
+ /**
+ * Value columns of the test table. {@code PRIMITIVELONGCOL} is not listed here because {@code schema} declares it
+ * as the key column. The array is also consulted by the "all types are tested" validation in this class.
+ */
+ private final Column[] valCols = {
+ new Column("primitiveBooleanCol".toUpperCase(), BOOLEAN, false),
+ new Column("primitiveByteCol".toUpperCase(), INT8, false),
+ new Column("primitiveShortCol".toUpperCase(), INT16, false),
+ new Column("primitiveIntCol".toUpperCase(), INT32, false),
+ new Column("primitiveFloatCol".toUpperCase(), FLOAT, false),
+ new Column("primitiveDoubleCol".toUpperCase(), DOUBLE, false),
+
+ new Column("booleanCol".toUpperCase(), BOOLEAN, true),
+ new Column("byteCol".toUpperCase(), INT8, true),
+ new Column("shortCol".toUpperCase(), INT16, true),
+ new Column("intCol".toUpperCase(), INT32, true),
+ new Column("longCol".toUpperCase(), INT64, true),
+ new Column("nullLongCol".toUpperCase(), INT64, true),
+ new Column("floatCol".toUpperCase(), FLOAT, true),
+ new Column("doubleCol".toUpperCase(), DOUBLE, true),
+
+ new Column("dateCol".toUpperCase(), DATE, true),
+ new Column("timeCol".toUpperCase(), time(0), true),
+ new Column("dateTimeCol".toUpperCase(), datetime(6), true),
+ new Column("timestampCol".toUpperCase(), timestamp(6), true),
+
+ new Column("uuidCol".toUpperCase(), NativeTypes.UUID, true),
+ new Column("bitmaskCol".toUpperCase(), NativeTypes.bitmaskOf(42),
true),
+ new Column("stringCol".toUpperCase(), STRING, true),
+ new Column("nullBytesCol".toUpperCase(), BYTES, true),
+ new Column("bytesCol".toUpperCase(), BYTES, true),
+ new Column("numberCol".toUpperCase(), NativeTypes.numberOf(12),
true),
+ new Column("decimalCol".toUpperCase(), NativeTypes.decimalOf(19,
3), true),
+ };
+
+ /** Table schema used by the tests: INT64 key column {@code PRIMITIVELONGCOL} followed by the value columns from {@code valCols}. */
+ private final SchemaDescriptor schema = new SchemaDescriptor(
+ SCHEMA_VERSION,
+ new Column[]{new Column("primitiveLongCol".toUpperCase(),
NativeTypes.INT64, false)},
+ valCols
+ );
+
+ /** Mapper for the record class; kept as a field so tests can marshal rows with the same mapper the view uses. */
+ private final Mapper<TestObjectWithAllTypes> recMapper =
Mapper.of(TestObjectWithAllTypes.class);
+
+ /** Internal table wrapped in a Mockito spy; assigned in {@code createInternalTable()} before each test. */
+ private DummyInternalTableImpl internalTable;
+
+ /** Builds a fresh internal table for {@code schema} and wraps it in a Mockito spy so that tests can stub and verify its calls. */
+ @BeforeEach
+ void createInternalTable() {
+ internalTable = spy(createInternalTable(schema));
+ }
+
@Test
public void upsert() {
final TestObjectWithAllTypes key = key(rnd);
@@ -287,6 +347,26 @@ public class RecordViewOperationsTest extends
TableKvOperationsTestBase {
assertThat(res, contains(val1, null, val3));
}
+    /**
+     * Verifies that the record view repeats a {@code get()} which the internal table failed with
+     * {@link InternalSchemaVersionMismatchException} and returns the record produced by the repeated call.
+     */
+    @Test
+    void retriesOnInternalSchemaVersionMismatchException() throws Exception {
+        RecordView<TestObjectWithAllTypes> recView = recordView();
+
+        TestObjectWithAllTypes storedRecord = TestObjectWithAllTypes.randomObject(rnd);
+        RecordMarshallerImpl<TestObjectWithAllTypes> marshaller = new RecordMarshallerImpl<>(schema, recMapper);
+        BinaryRow storedRow = marshaller.marshal(storedRecord);
+
+        // The first call reports a schema version mismatch, the second one returns the marshalled record.
+        doReturn(failedFuture(new InternalSchemaVersionMismatchException()))
+                .doReturn(completedFuture(storedRow))
+                .when(internalTable).get(any(), any());
+
+        TestObjectWithAllTypes lookupKey = new TestObjectWithAllTypes();
+        TestObjectWithAllTypes fetched = recView.get(null, lookupKey);
+
+        assertThat(fetched, is(equalTo(storedRecord)));
+
+        // Exactly two calls reached the table, each with a null second argument.
+        verify(internalTable, times(2)).get(any(), isNull());
+    }
+
/**
* Creates RecordView.
*/
@@ -297,47 +377,6 @@ public class RecordViewOperationsTest extends
TableKvOperationsTestBase {
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class,
RETURNS_DEEP_STUBS));
- Mapper<TestObjectWithAllTypes> recMapper =
Mapper.of(TestObjectWithAllTypes.class);
-
- Column[] valCols = {
- new Column("primitiveBooleanCol".toUpperCase(), BOOLEAN,
false),
- new Column("primitiveByteCol".toUpperCase(), INT8, false),
- new Column("primitiveShortCol".toUpperCase(), INT16, false),
- new Column("primitiveIntCol".toUpperCase(), INT32, false),
- new Column("primitiveFloatCol".toUpperCase(), FLOAT, false),
- new Column("primitiveDoubleCol".toUpperCase(), DOUBLE, false),
-
- new Column("booleanCol".toUpperCase(), BOOLEAN, true),
- new Column("byteCol".toUpperCase(), INT8, true),
- new Column("shortCol".toUpperCase(), INT16, true),
- new Column("intCol".toUpperCase(), INT32, true),
- new Column("longCol".toUpperCase(), INT64, true),
- new Column("nullLongCol".toUpperCase(), INT64, true),
- new Column("floatCol".toUpperCase(), FLOAT, true),
- new Column("doubleCol".toUpperCase(), DOUBLE, true),
-
- new Column("dateCol".toUpperCase(), DATE, true),
- new Column("timeCol".toUpperCase(), time(0), true),
- new Column("dateTimeCol".toUpperCase(), datetime(6), true),
- new Column("timestampCol".toUpperCase(), timestamp(6), true),
-
- new Column("uuidCol".toUpperCase(), NativeTypes.UUID, true),
- new Column("bitmaskCol".toUpperCase(),
NativeTypes.bitmaskOf(42), true),
- new Column("stringCol".toUpperCase(), STRING, true),
- new Column("nullBytesCol".toUpperCase(), BYTES, true),
- new Column("bytesCol".toUpperCase(), BYTES, true),
- new Column("numberCol".toUpperCase(),
NativeTypes.numberOf(12), true),
- new Column("decimalCol".toUpperCase(),
NativeTypes.decimalOf(19, 3), true),
- };
-
- SchemaDescriptor schema = new SchemaDescriptor(
- 1,
- new Column[]{new Column("primitiveLongCol".toUpperCase(),
NativeTypes.INT64, false)},
- valCols
- );
-
- DummyInternalTableImpl table = createInternalTable(schema);
-
// Validate all types are tested.
Set<NativeTypeSpec> testedTypes = Arrays.stream(valCols).map(c ->
c.type().spec())
.collect(Collectors.toSet());
@@ -347,7 +386,7 @@ public class RecordViewOperationsTest extends
TableKvOperationsTestBase {
assertEquals(Collections.emptySet(), missedTypes);
return new RecordViewImpl<>(
- table,
+ internalTable,
new DummySchemaManagerImpl(schema),
schemaVersions,
recMapper
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 41d46aa1b2..f9f2d1f74a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -206,7 +206,11 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
TestPartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(TABLE_ID, PART_ID, TEST_MV_PARTITION_STORAGE);
CatalogService catalogService = mock(CatalogService.class);
- when(catalogService.table(anyInt(),
anyLong())).thenReturn(mock(CatalogTableDescriptor.class));
+
+ CatalogTableDescriptor tableDescriptor =
mock(CatalogTableDescriptor.class);
+
when(tableDescriptor.tableVersion()).thenReturn(schemaDescriptor.version());
+
+ when(catalogService.table(anyInt(),
anyLong())).thenReturn(tableDescriptor);
ClusterNode localNode = mock(ClusterNode.class);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index bb1c861bfd..5b56fc8018 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUS
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
@@ -143,6 +144,7 @@ import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
import
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
+import
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
import org.apache.ignite.internal.table.distributed.replicator.LeaderOrTxState;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
@@ -1907,13 +1909,9 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
RwListenerInvocation invocation = null;
if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
- invocation = (targetTxId, key) -> {
- return doSingleRowPkRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType, full);
- };
+ invocation = (targetTxId, key) -> doSingleRowPkRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType, full);
} else if (RequestTypes.isSingleRowRwFullRow(requestType)) {
- invocation = (targetTxId, key) -> {
- return doSingleRowRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType, full);
- };
+ invocation = (targetTxId, key) -> doSingleRowRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType, full);
} else {
fail("Uncovered type: " + requestType);
}
@@ -1988,13 +1986,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
RwListenerInvocation invocation = null;
if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) {
- invocation = (targetTxId, key) -> {
- return doMultiRowPkRequest(targetTxId,
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
- };
+ invocation = (targetTxId, key)
+ -> doMultiRowPkRequest(targetTxId,
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
} else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) {
- invocation = (targetTxId, key) -> {
- return doMultiRowRequest(targetTxId,
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
- };
+ invocation = (targetTxId, key)
+ -> doMultiRowRequest(targetTxId,
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
} else {
fail("Uncovered type: " + requestType);
}
@@ -2054,13 +2050,9 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
RwListenerInvocation invocation = null;
if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
- invocation = (targetTxId, key) -> {
- return doSingleRowPkRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType, full);
- };
+ invocation = (targetTxId, key) -> doSingleRowPkRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType, full);
} else if (RequestTypes.isSingleRowRwFullRow(requestType)) {
- invocation = (targetTxId, key) -> {
- return doSingleRowRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType, full);
- };
+ invocation = (targetTxId, key) -> doSingleRowRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType, full);
} else {
fail("Uncovered type: " + requestType);
}
@@ -2105,13 +2097,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
RwListenerInvocation invocation = null;
if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) {
- invocation = (targetTxId, key) -> {
- return doMultiRowPkRequest(targetTxId,
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
- };
+ invocation = (targetTxId, key)
+ -> doMultiRowPkRequest(targetTxId,
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
} else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) {
- invocation = (targetTxId, key) -> {
- return doMultiRowRequest(targetTxId,
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
- };
+ invocation = (targetTxId, key)
+ -> doMultiRowRequest(targetTxId,
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
} else {
fail("Uncovered type: " + requestType);
}
@@ -2256,6 +2246,123 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertThat("The transaction must have been aborted", committed.get(),
is(false));
}
+    /**
+     * For every single-row RW request type supplied by {@code singleRowRwOperationTypesFactory}, checks that the
+     * request fails when the table schema version differs from the one bound to the transaction.
+     */
+    @CartesianTest
+    @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
+    void singleRowRwOperationsFailIfSchemaVersionMismatchesTx(RequestType requestType, boolean onExistingRow, boolean full) {
+        RwListenerInvocation listenerCall = null;
+
+        // Key-only requests and full-row requests go through different request builders.
+        if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
+            listenerCall = (txId, testKey)
+                    -> doSingleRowPkRequest(txId, marshalKeyOrKeyValue(requestType, testKey), requestType, full);
+        } else if (RequestTypes.isSingleRowRwFullRow(requestType)) {
+            listenerCall = (txId, testKey)
+                    -> doSingleRowRequest(txId, marshalKeyOrKeyValue(requestType, testKey), requestType, full);
+        } else {
+            fail("Uncovered type: " + requestType);
+        }
+
+        testRwOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, listenerCall);
+    }
+
+    /**
+     * Common scenario for RW requests: optionally upserts the key first, allocates a transaction ID, then makes the
+     * mocked catalog report the next schema version and expects the request future to fail with
+     * {@link InternalSchemaVersionMismatchException}.
+     *
+     * @param onExistingRow Whether a row for the key must exist before the request is issued.
+     * @param listenerInvocation Request to issue against the listener.
+     */
+    private void testRwOperationFailsIfSchemaVersionMismatchesTx(boolean onExistingRow, RwListenerInvocation listenerInvocation) {
+        TestKey testKey = nextKey();
+
+        if (onExistingRow) {
+            upsertInNewTxFor(testKey);
+        }
+
+        UUID staleTxId = newTxId();
+
+        // The schema is switched only after the transaction ID has been allocated.
+        makeSchemaBeNextVersion();
+
+        CompletableFuture<?> outcome = listenerInvocation.invoke(staleTxId, testKey);
+
+        assertThat(outcome, willThrow(InternalSchemaVersionMismatchException.class));
+    }
+
+    /** Re-stubs the catalog service so that any lookup of {@code TABLE_ID} yields a descriptor reporting {@code NEXT_SCHEMA_VERSION}. */
+    private void makeSchemaBeNextVersion() {
+        CatalogTableDescriptor nextVersionDescriptor = mock(CatalogTableDescriptor.class);
+
+        when(nextVersionDescriptor.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);
+        when(catalogService.table(eq(TABLE_ID), anyLong())).thenReturn(nextVersionDescriptor);
+    }
+
+    /**
+     * For every multi-row RW request type supplied by {@code multiRowRwOperationTypesFactory}, checks that the
+     * request fails when the table schema version differs from the one bound to the transaction.
+     */
+    @CartesianTest
+    @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory")
+    void multiRowRwOperationsFailIfSchemaVersionMismatchesTx(RequestType requestType, boolean onExistingRow, boolean full) {
+        RwListenerInvocation listenerCall = null;
+
+        // Key-only requests and full-row requests go through different request builders; each carries a single row.
+        if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) {
+            listenerCall = (txId, testKey)
+                    -> doMultiRowPkRequest(txId, List.of(marshalKeyOrKeyValue(requestType, testKey)), requestType, full);
+        } else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) {
+            listenerCall = (txId, testKey)
+                    -> doMultiRowRequest(txId, List.of(marshalKeyOrKeyValue(requestType, testKey)), requestType, full);
+        } else {
+            fail("Uncovered type: " + requestType);
+        }
+
+        testRwOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, listenerCall);
+    }
+
+    /**
+     * Checks that an {@code RW_REPLACE} request fails when the table schema version differs from the one bound to
+     * the transaction. The same marshalled row is passed for both row arguments of the replace request.
+     */
+    @CartesianTest
+    void replaceRequestFailsIfSchemaVersionMismatchesTx(
+            @Values(booleans = {false, true}) boolean onExistingRow,
+            @Values(booleans = {false, true}) boolean full
+    ) {
+        // Expression lambda, matching the other listener invocations in this class.
+        testRwOperationFailsIfSchemaVersionMismatchesTx(
+                onExistingRow,
+                (targetTxId, key) -> doReplaceRequest(
+                        targetTxId,
+                        marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+                        marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+                        full
+                )
+        );
+    }
+
+    /**
+     * Checks that a read-only single-row get, issued either as a direct request or with an explicit read timestamp,
+     * fails when the table schema version differs from the expected one.
+     */
+    @CartesianTest
+    void singleRowRoGetFailsIfSchemaVersionMismatchesTx(
+            @Values(booleans = {false, true}) boolean direct,
+            @Values(booleans = {false, true}) boolean onExistingRow
+    ) {
+        // The direct variant takes no read timestamp; the regular one reads at the supplied timestamp.
+        testRoOperationFailsIfSchemaVersionMismatchesTx(
+                onExistingRow,
+                (targetTxId, readTimestamp, key) -> direct
+                        ? doReadOnlyDirectSingleGet(marshalQuietly(key, kvMarshaller))
+                        : doReadOnlySingleGet(marshalQuietly(key, kvMarshaller), readTimestamp)
+        );
+    }
+
+    /**
+     * Common scenario for RO requests: optionally upserts the key first, allocates a transaction ID and a read
+     * timestamp, then makes the mocked catalog report the next schema version and expects the request future to fail
+     * with {@link InternalSchemaVersionMismatchException}.
+     *
+     * @param onExistingRow Whether a row for the key must exist before the request is issued.
+     * @param listenerInvocation Request to issue against the listener.
+     */
+    private void testRoOperationFailsIfSchemaVersionMismatchesTx(boolean onExistingRow, RoListenerInvocation listenerInvocation) {
+        TestKey testKey = nextKey();
+
+        if (onExistingRow) {
+            upsertInNewTxFor(testKey);
+        }
+
+        UUID roTxId = newTxId();
+        HybridTimestamp readTimestamp = clock.now();
+
+        // The schema is switched only after the transaction ID and the read timestamp have been taken.
+        makeSchemaBeNextVersion();
+
+        CompletableFuture<?> outcome = listenerInvocation.invoke(roTxId, readTimestamp, testKey);
+
+        assertThat(outcome, willThrow(InternalSchemaVersionMismatchException.class));
+    }
+
+    /**
+     * Checks that a read-only multi-row get (carrying a single key), issued either as a direct request or with an
+     * explicit read timestamp, fails when the table schema version differs from the expected one.
+     */
+    @CartesianTest
+    void multiRowRoGetFailsIfSchemaVersionMismatchesTx(
+            @Values(booleans = {false, true}) boolean direct,
+            @Values(booleans = {false, true}) boolean onExistingRow
+    ) {
+        // The direct variant takes no read timestamp; the regular one reads at the supplied timestamp.
+        testRoOperationFailsIfSchemaVersionMismatchesTx(
+                onExistingRow,
+                (targetTxId, readTimestamp, key) -> direct
+                        ? doReadOnlyDirectMultiGet(List.of(marshalQuietly(key, kvMarshaller)))
+                        : doReadOnlyMultiGet(List.of(marshalQuietly(key, kvMarshaller)), readTimestamp)
+        );
+    }
+
private UUID newTxId() {
return transactionIdFor(clock.now());
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index a4defff961..6377f9a7ee 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -142,6 +142,8 @@ import org.junit.jupiter.api.TestInfo;
* Class that allows to mock a cluster for transaction tests' purposes.
*/
public class ItTxTestCluster {
+ /** Schema version reported by the mocked table descriptor and passed to {@code ConstantSchemaVersions}. */
+ private static final int SCHEMA_VERSION = 1;
+
private final List<NetworkAddress> localAddresses;
private final NodeFinder nodeFinder;
@@ -407,7 +409,11 @@ public class ItTxTestCluster {
*/
public TableImpl startTable(String tableName, int tableId,
SchemaDescriptor schemaDescriptor) throws Exception {
CatalogService catalogService = mock(CatalogService.class);
- lenient().when(catalogService.table(anyInt(),
anyLong())).thenReturn(mock(CatalogTableDescriptor.class));
+
+ CatalogTableDescriptor tableDescriptor =
mock(CatalogTableDescriptor.class);
+ when(tableDescriptor.tableVersion()).thenReturn(SCHEMA_VERSION);
+
+ lenient().when(catalogService.table(anyInt(),
anyLong())).thenReturn(tableDescriptor);
List<Set<Assignment>> calculatedAssignments =
AffinityUtils.calculateAssignments(
cluster.stream().map(node ->
node.topologyService().localMember().name()).collect(toList()),
@@ -613,7 +619,7 @@ public class ItTxTestCluster {
),
new DummySchemaManagerImpl(schemaDescriptor),
clientTxManager.lockManager(),
- new ConstantSchemaVersions(1)
+ new ConstantSchemaVersions(SCHEMA_VERSION)
);
}