This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 677e0d32ea7 IGNITE-25023 Fix error handling on client tx requests
677e0d32ea7 is described below
commit 677e0d32ea71be8bbbfca1fb9df6ce571217c861
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Fri May 30 12:52:02 2025 +0300
IGNITE-25023 Fix error handling on client tx requests
---
.../internal/client/proto/ClientMessagePacker.java | 23 ++++
.../handler/ClientInboundMessageHandler.java | 4 +-
.../tx/ClientTransactionRollbackRequest.java | 5 -
.../ignite/internal/client/table/ClientTable.java | 142 +++++++++++----------
.../internal/client/tx/ClientTransaction.java | 66 ++++++----
...tKeyValueBinaryViewApiExplicitRunInTxnTest.java | 71 +++++++++++
.../ItKeyValueBinaryViewApiExplicitTxnTest.java | 75 +++++++++++
.../table/ItKeyValueBinaryViewApiTest.java | 33 +++--
8 files changed, 307 insertions(+), 112 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index 15e15b47c36..3461dc5a904 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -206,6 +206,29 @@ public class ClientMessagePacker implements AutoCloseable {
buf.setLong(index, v);
}
+ /**
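+ * Reserves space for an int value: writes the INT32 type code followed by a zero placeholder.
+ *
+ * @return Buffer index of the placeholder, to be passed to {@link #setInt(int, int)}.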
+ */
+ public int reserveInt() {
+ buf.writeByte(Code.INT32);
+ var index = buf.writerIndex();
+
+ buf.writeInt(0);
+ return index;
+ }
+
+ /**
+ * Sets the int value at a previously reserved index (see {@link #reserveInt()}).
+ *
+ * @param index Index.
+ * @param v Value.
+ */
+ public void setInt(int index, int v) {
+ buf.setInt(index, v);
+ }
+
/**
* Writes a long value.
*
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index d7cd60a0758..a7e2236ef7b 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -874,11 +874,11 @@ public class ClientInboundMessageHandler
case ClientOp.TX_COMMIT:
return ClientTransactionCommitRequest.process(in, resources,
metrics, clockService, igniteTables,
- clientContext.hasAllFeatures(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS), tsTracker);
+ clientContext.hasFeature(TX_PIGGYBACK), tsTracker);
case ClientOp.TX_ROLLBACK:
return ClientTransactionRollbackRequest.process(in, resources,
metrics, igniteTables,
- clientContext.hasAllFeatures(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS));
+ clientContext.hasFeature(TX_PIGGYBACK));
case ClientOp.COMPUTE_EXECUTE:
return ClientComputeExecuteRequest.process(in, compute,
clusterService, notificationSender(requestId),
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
index 19fc12960b0..1ea53aad501 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
@@ -61,11 +61,6 @@ public class ClientTransactionRollbackRequest {
for (int i = 0; i < cnt; i++) {
int tableId = in.unpackInt();
int partId = in.unpackInt();
-
- if (in.tryUnpackNil()) { // Incomplete mapping.
- continue;
- }
-
String consistentId = in.unpackString();
long token = in.unpackLong();
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index cb0ecb7a10c..72d5e61c180 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.client.table;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.function.Function.identity;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
@@ -564,75 +563,90 @@ public class ClientTable implements Table {
.thenCompose(t -> loadSchemaAndReadData(t,
reader))
.handle((ret, ex) -> {
if (ex != null) {
- // In case of direct mapping failure
try to roll back the transaction.
- if (ctx.enlistmentToken != null &&
!matchAny(unwrapCause(ex), TX_ALREADY_FINISHED_ERR,
-
TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR)) {
- assert tx0 != null &&
!tx0.isReadOnly() : "Invalid transaction for direct mapping " + tx;
-
- return
tx0.rollbackAsync().handle((ignored, err0) -> {
- if (err0 != null) {
- ex.addSuppressed(err0);
+ // Retry schema errors, if any.
+ Throwable cause = ex;
+
+ if (ctx.firstReqFut != null) {
+ // Create failed transaction.
+ ClientTransaction failed = new
ClientTransaction(ctx.channel, id, ctx.readOnly, null, ctx.pm,
+ null,
ch.observableTimestamp(), 0);
+ failed.fail();
+ ctx.firstReqFut.complete(failed);
+
fut.completeExceptionally(unwrapCause(ex));
+ return null;
+ }
+
+ // Don't attempt retrying in case of
direct mapping. This may be improved in the future.
+ if (ctx.enlistmentToken == null) {
+ while (cause != null) {
+ if (cause instanceof
ClientSchemaVersionMismatchException) {
+ // Retry with specific
schema version.
+ int expectedVersion =
((ClientSchemaVersionMismatchException) cause).expectedVersion();
+
+
doSchemaOutInOpAsync(opCode, writer, reader, defaultValue,
responseSchemaRequired,
+ provider,
+
retryPolicyOverride, expectedVersion, expectNotifications, tx)
+
.whenComplete((res0, err0) -> {
+ if (err0 !=
null) {
+
fut.completeExceptionally(err0);
+ } else {
+
fut.complete(res0);
+ }
+ });
+
+ return null;
+ } else if
(schemaVersionOverride == null && cause instanceof UnmappedColumnsException) {
+ // Force load latest
schema and revalidate user data against it.
+ // When
schemaVersionOverride is not null, we already tried to load the schema.
+
schemas.remove(UNKNOWN_SCHEMA_VERSION);
+
+
doSchemaOutInOpAsync(opCode, writer, reader, defaultValue,
responseSchemaRequired,
+ provider,
+
retryPolicyOverride, UNKNOWN_SCHEMA_VERSION, expectNotifications, tx)
+
.whenComplete((res0, err0) -> {
+ if (err0 !=
null) {
+
fut.completeExceptionally(err0);
+ } else {
+
fut.complete(res0);
+ }
+ });
+
+ return null;
}
- sneakyThrow(ex);
+ cause = cause.getCause();
+ }
- return (T) null;
- });
+ fut.completeExceptionally(ex);
} else {
- sneakyThrow(ex);
- }
- }
+ // In case of direct mapping
failure for any reason try to roll back the transaction.
+ if (!matchAny(unwrapCause(ex),
TX_ALREADY_FINISHED_ERR, TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR)) {
+ assert tx0 != null &&
!tx0.isReadOnly() : "Invalid transaction for direct mapping " + tx;
- return completedFuture(ret);
- }).thenCompose(identity());
- });
- }).whenComplete((res, err) -> {
- if (err == null) {
- fut.complete(res);
- return;
- }
-
- // Retry schema errors, if any.
- Throwable cause = err;
+
tx0.rollbackAsync().handle((ignored, err0) -> {
+ if (err0 != null) {
+ ex.addSuppressed(err0);
+ }
- while (cause != null) {
- if (cause instanceof
ClientSchemaVersionMismatchException) {
- // Retry with specific schema version.
- int expectedVersion =
((ClientSchemaVersionMismatchException) cause).expectedVersion();
+
fut.completeExceptionally(ex);
- doSchemaOutInOpAsync(opCode, writer, reader,
defaultValue, responseSchemaRequired, provider,
- retryPolicyOverride, expectedVersion,
expectNotifications, tx)
- .whenComplete((res0, err0) -> {
- if (err0 != null) {
- fut.completeExceptionally(err0);
- } else {
- fut.complete(res0);
- }
- });
-
- return;
- } else if (schemaVersionOverride == null && cause
instanceof UnmappedColumnsException) {
- // Force load latest schema and revalidate user
data against it.
- // When schemaVersionOverride is not null, we
already tried to load the schema.
- schemas.remove(UNKNOWN_SCHEMA_VERSION);
-
- doSchemaOutInOpAsync(opCode, writer, reader,
defaultValue, responseSchemaRequired, provider,
- retryPolicyOverride,
UNKNOWN_SCHEMA_VERSION, expectNotifications, tx)
- .whenComplete((res0, err0) -> {
- if (err0 != null) {
- fut.completeExceptionally(err0);
- } else {
- fut.complete(res0);
+ return (T) null;
+ });
+ } else {
+ fut.completeExceptionally(ex);
+ }
}
- });
-
- return;
- }
-
- cause = cause.getCause();
- }
+ } else {
+ fut.complete(ret);
+ }
- fut.completeExceptionally(err);
+ return null;
+ });
+ });
+ }).exceptionally(ex -> {
+ fut.completeExceptionally(ex);
+ sneakyThrow(ex);
+ return null;
});
return fut;
@@ -648,12 +662,6 @@ public class ClientTable implements Table {
}
}
- private static boolean isProxy(@Nullable Transaction tx, @Nullable
PartitionMapping pm, ClientChannel opChannel) {
- return tx == null || tx.isReadOnly() || pm == null
- ||
!opChannel.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS, TX_PIGGYBACK)
- ||
!pm.nodeConsistentId().equals(opChannel.protocolContext().clusterNode().name());
- }
-
private static CompletableFuture<Void> enlistDirect(
ClientTransaction tx,
ReliableChannel ch,
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index 7cb9fb508d4..c7a98a6003d 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -17,8 +17,7 @@
package org.apache.ignite.internal.client.tx;
-import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
-import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ViewUtils.sync;
@@ -35,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.PartitionMapping;
+import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
@@ -146,7 +145,7 @@ public class ClientTransaction implements Transaction {
this.coordId = coordId;
- assert this.coordId != null;
+ assert txId == null || coordId != null;
}
/**
@@ -219,23 +218,14 @@ public class ClientTransaction implements Transaction {
enlistPartitionLock.writeLock().unlock();
}
- boolean enabled =
ch.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, TX_DELAYED_ACKS);
+ boolean enabled =
ch.protocolContext().isFeatureSupported(TX_PIGGYBACK);
CompletableFuture<Void> finishFut = enabled ?
ch.inflights().finishFuture(txId()) : nullCompletedFuture();
CompletableFuture<Void> mainFinishFut = finishFut.thenCompose(ignored
-> ch.serviceAsync(ClientOp.TX_COMMIT, w -> {
w.out().packLong(id);
if (!isReadOnly && enabled) {
- w.out().packInt(enlisted.size());
- if (!enlisted.isEmpty()) {
- for (Entry<TablePartitionId,
CompletableFuture<IgniteBiTuple<String, Long>>> entry : enlisted.entrySet()) {
- w.out().packInt(entry.getKey().tableId());
- w.out().packInt(entry.getKey().partitionId());
-
w.out().packString(entry.getValue().getNow(null).get1());
- w.out().packLong(entry.getValue().getNow(null).get2());
- }
- w.out().packLong(tracker.get().longValue());
- }
+ packEnlisted(w);
}
}, r -> null));
@@ -272,19 +262,8 @@ public class ClientTransaction implements Transaction {
// Don't wait inflights on rollback.
CompletableFuture<Void> mainFinishFut =
ch.serviceAsync(ClientOp.TX_ROLLBACK, w -> {
w.out().packLong(id);
- if (!isReadOnly &&
w.clientChannel().protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS)) {
- w.out().packInt(enlisted.size());
- for (Entry<TablePartitionId,
CompletableFuture<IgniteBiTuple<String, Long>>> entry : enlisted.entrySet()) {
- w.out().packInt(entry.getKey().tableId());
- w.out().packInt(entry.getKey().partitionId());
- CompletableFuture<IgniteBiTuple<String, Long>> fut =
entry.getValue();
- if (CompletableFutures.isCompletedSuccessfully(fut)) {
- w.out().packString(fut.join().get1());
- w.out().packLong(fut.join().get2());
- } else {
- w.out().packNil();
- }
- }
+ if (!isReadOnly &&
w.clientChannel().protocolContext().isFeatureSupported(TX_PIGGYBACK)) {
+ packEnlisted(w);
}
}, r -> null);
@@ -299,6 +278,31 @@ public class ClientTransaction implements Transaction {
return mainFinishFut;
}
+ private void packEnlisted(PayloadOutputChannel w) {
+ int pos = w.out().reserveInt();
+ int cnt = 0;
+ for (Entry<TablePartitionId, CompletableFuture<IgniteBiTuple<String,
Long>>> entry : enlisted.entrySet()) {
+ IgniteBiTuple<String, Long> info = entry.getValue().getNow(null);
+
+ if (info == null) {
+ continue; // Ignore incomplete enlistments.
+ }
+
+ w.out().packInt(entry.getKey().tableId());
+ w.out().packInt(entry.getKey().partitionId());
+ w.out().packString(info.get1());
+ w.out().packLong(info.get2());
+
+ cnt++;
+ }
+
+ w.out().setInt(pos, cnt);
+
+ if (cnt > 0) {
+ w.out().packLong(tracker.get().longValue());
+ }
+ }
+
/** {@inheritDoc} */
@Override
public boolean isReadOnly() {
@@ -430,6 +434,12 @@ public class ClientTransaction implements Transaction {
return enlisted.size();
}
+ /** Marks the transaction as rolled back locally and installs an already completed finish future. */
+ public void fail() {
+ state.set(STATE_ROLLED_BACK);
+ finishFut.set(nullCompletedFuture());
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
new file mode 100644
index 00000000000..67c7642d103
--- /dev/null
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.function.Executable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Integration tests for binary {@link KeyValueView} API using an explicit transaction managed by {@code runInTransaction}.
+ */
+public class ItKeyValueBinaryViewApiExplicitRunInTxnTest extends
ItKeyValueBinaryViewApiTest {
+ @Override
+ TestCaseFactory getFactory(String name) {
+ return new TestCaseFactory(name) {
+ @Override
+ <K, V> BaseTestCase<K, V> create(boolean async, boolean thin,
Class<K> keyClass, Class<V> valueClass) {
+ assert keyClass == Tuple.class : keyClass;
+ assert valueClass == Tuple.class : valueClass;
+
+ KeyValueView<Tuple, Tuple> view = thin
+ ? client.tables().table(tableName).keyValueView()
+ :
CLUSTER.aliveNode().tables().table(tableName).keyValueView();
+
+ if (async) {
+ view = new AsyncApiKeyValueViewAdapter<>(view);
+ }
+
+ return (BaseTestCase<K, V>) new TxTestCase(async, thin, view,
createdTables.get(name), thin ? client : CLUSTER.aliveNode());
+ }
+ };
+ }
+
+ private static class TxTestCase extends TestCase {
+ @Override
+ protected Executable wrap(Consumer<Transaction> run) {
+ return () -> ignite.transactions().runInTransaction(run);
+ }
+
+ TxTestCase(boolean async, boolean thin, KeyValueView<Tuple, Tuple>
view, TestTableDefinition tableDefinition, Ignite ignite) {
+ super(async, thin, view, tableDefinition, ignite);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("compoundPkTestCases")
+ @Override
+ public void schemaMismatch(TestCase testCase) {
+ super.schemaMismatch(testCase);
+ }
+}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitTxnTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitTxnTest.java
new file mode 100644
index 00000000000..dc411831bed
--- /dev/null
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitTxnTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.function.Executable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Integration tests for binary {@link KeyValueView} API using an explicit transaction that is begun and committed manually.
+ */
+public class ItKeyValueBinaryViewApiExplicitTxnTest extends
ItKeyValueBinaryViewApiTest {
+ @Override
+ TestCaseFactory getFactory(String name) {
+ return new TestCaseFactory(name) {
+ @Override
+ <K, V> BaseTestCase<K, V> create(boolean async, boolean thin,
Class<K> keyClass, Class<V> valueClass) {
+ assert keyClass == Tuple.class : keyClass;
+ assert valueClass == Tuple.class : valueClass;
+
+ KeyValueView<Tuple, Tuple> view = thin
+ ? client.tables().table(tableName).keyValueView()
+ :
CLUSTER.aliveNode().tables().table(tableName).keyValueView();
+
+ if (async) {
+ view = new AsyncApiKeyValueViewAdapter<>(view);
+ }
+
+ return (BaseTestCase<K, V>) new TxTestCase(async, thin, view,
createdTables.get(name), thin ? client : CLUSTER.aliveNode());
+ }
+ };
+ }
+
+ private static class TxTestCase extends TestCase {
+ @Override
+ protected Executable wrap(Consumer<Transaction> run) {
+ return () -> {
+ Transaction tx = ignite.transactions().begin();
+ run.accept(tx);
+ tx.commit();
+ };
+ }
+
+ TxTestCase(boolean async, boolean thin, KeyValueView<Tuple, Tuple>
view, TestTableDefinition tableDefinition, Ignite ignite) {
+ super(async, thin, view, tableDefinition, ignite);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("compoundPkTestCases")
+ @Override
+ public void schemaMismatch(TestCase testCase) {
+ super.schemaMismatch(testCase);
+ }
+}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiTest.java
index 4e99a32c4e1..32986d57373 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiTest.java
@@ -28,9 +28,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.InvalidTypeException;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -42,6 +44,7 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.MarshallerException;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
@@ -61,7 +64,7 @@ public class ItKeyValueBinaryViewApiTest extends
ItKeyValueViewApiBaseTest {
private static final String TABLE_NAME_FOR_SCHEMA_VALIDATION =
"test_schema";
- private Map<String, TestTableDefinition> createdTables;
+ Map<String, TestTableDefinition> createdTables;
@BeforeAll
void createTables() {
@@ -542,24 +545,24 @@ public class ItKeyValueBinaryViewApiTest extends
ItKeyValueViewApiBaseTest {
KeyValueView<Tuple, Tuple> view = testCase.view();
testCase.checkSchemaMismatchError(
- () -> view.get(null, Tuple.create().set("id", 0L)),
+ tx -> view.get(null, Tuple.create().set("id", 0L)),
"Missed key column: AFFID"
);
// TODO https://issues.apache.org/jira/browse/IGNITE-21793 Thin client
must throw exception
if (!testCase.thin) {
testCase.checkSchemaMismatchError(
- () -> view.get(null, Tuple.create().set("id",
0L).set("affId", 1L).set("val", 0L)),
+ tx -> view.get(null, Tuple.create().set("id",
0L).set("affId", 1L).set("val", 0L)),
"Key tuple doesn't match schema: schemaVersion=1,
extraColumns=[VAL]"
);
}
testCase.checkSchemaMismatchError(
- () -> view.put(null, Tuple.create().set("id", 0L),
Tuple.create()),
+ tx -> view.put(tx, Tuple.create().set("id", 0L),
Tuple.create()),
"Missed key column: AFFID"
);
testCase.checkSchemaMismatchError(
- () -> view.put(null, Tuple.create().set("id", 0L).set("affId",
1L).set("val", 0L), Tuple.create()),
+ tx -> view.put(tx, Tuple.create().set("id", 0L).set("affId",
1L).set("val", 0L), Tuple.create()),
"Key tuple doesn't match schema: schemaVersion=1,
extraColumns=[VAL]"
);
@@ -611,7 +614,7 @@ public class ItKeyValueBinaryViewApiTest extends
ItKeyValueViewApiBaseTest {
view = new AsyncApiKeyValueViewAdapter<>(view);
}
- return (BaseTestCase<K, V>) new TestCase(async, thin, view,
createdTables.get(name));
+ return (BaseTestCase<K, V>) new TestCase(async, thin, view,
createdTables.get(name), thin ? client : CLUSTER.aliveNode());
}
};
}
@@ -623,6 +626,8 @@ public class ItKeyValueBinaryViewApiTest extends
ItKeyValueViewApiBaseTest {
final SchemaDescriptor schema;
+ final Ignite ignite;
+
String keyColumnName(int index) {
return keyColumns.get(index);
}
@@ -635,25 +640,33 @@ public class ItKeyValueBinaryViewApiTest extends
ItKeyValueViewApiBaseTest {
return schema;
}
- TestCase(boolean async, boolean thin, KeyValueView<Tuple, Tuple> view,
TestTableDefinition tableDefinition) {
+ TestCase(boolean async, boolean thin, KeyValueView<Tuple, Tuple> view,
TestTableDefinition tableDefinition, Ignite ignite) {
super(async, thin, view);
+ this.ignite = ignite;
+
this.keyColumns =
quoteOrLowercaseNames(tableDefinition.schemaDescriptor.keyColumns());
this.valueColumns =
quoteOrLowercaseNames(tableDefinition.schemaDescriptor.valueColumns());
this.schema = tableDefinition.schemaDescriptor;
}
+ protected Executable wrap(Consumer<Transaction> run) {
+ return () -> run.accept(null);
+ }
+
@SuppressWarnings("ThrowableNotThrown")
void checkNullValueError(Executable run) {
IgniteTestUtils.assertThrows(NullPointerException.class, run,
"val");
}
@SuppressWarnings("ThrowableNotThrown")
- void checkSchemaMismatchError(Executable run, String expectedMessage) {
+ void checkSchemaMismatchError(Consumer<Transaction> run, String
expectedMessage) {
+ Executable e = wrap(run);
+
if (thin) {
- IgniteTestUtils.assertThrows(IgniteException.class, run,
expectedMessage);
+ IgniteTestUtils.assertThrows(IgniteException.class, e,
expectedMessage);
} else {
- IgniteTestUtils.assertThrowsWithCause(run::execute,
SchemaMismatchException.class, expectedMessage);
+ IgniteTestUtils.assertThrowsWithCause(e::execute,
SchemaMismatchException.class, expectedMessage);
}
}