This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 677e0d32ea7 IGNITE-25023 Fix error handling on client tx requests
677e0d32ea7 is described below

commit 677e0d32ea71be8bbbfca1fb9df6ce571217c861
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Fri May 30 12:52:02 2025 +0300

    IGNITE-25023 Fix error handling on client tx requests
---
 .../internal/client/proto/ClientMessagePacker.java |  23 ++++
 .../handler/ClientInboundMessageHandler.java       |   4 +-
 .../tx/ClientTransactionRollbackRequest.java       |   5 -
 .../ignite/internal/client/table/ClientTable.java  | 142 +++++++++++----------
 .../internal/client/tx/ClientTransaction.java      |  66 ++++++----
 ...tKeyValueBinaryViewApiExplicitRunInTxnTest.java |  71 +++++++++++
 .../ItKeyValueBinaryViewApiExplicitTxnTest.java    |  75 +++++++++++
 .../table/ItKeyValueBinaryViewApiTest.java         |  33 +++--
 8 files changed, 307 insertions(+), 112 deletions(-)

diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index 15e15b47c36..3461dc5a904 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -206,6 +206,29 @@ public class ClientMessagePacker implements AutoCloseable {
         buf.setLong(index, v);
     }
 
+    /**
+     * Reserve space for int value.
+     *
+     * @return Index of reserved space.
+     */
+    public int reserveInt() {
+        buf.writeByte(Code.INT32);
+        var index = buf.writerIndex();
+
+        buf.writeInt(0);
+        return index;
+    }
+
+    /**
+     * Set int value at reserved index (see {@link #reserveInt()}).
+     *
+     * @param index Index.
+     * @param v Value.
+     */
+    public void setInt(int index, int v) {
+        buf.setInt(index, v);
+    }
+
     /**
      * Writes a long value.
      *
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index d7cd60a0758..a7e2236ef7b 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -874,11 +874,11 @@ public class ClientInboundMessageHandler
 
             case ClientOp.TX_COMMIT:
                 return ClientTransactionCommitRequest.process(in, resources, 
metrics, clockService, igniteTables,
-                        clientContext.hasAllFeatures(TX_DIRECT_MAPPING, 
TX_DELAYED_ACKS), tsTracker);
+                        clientContext.hasFeature(TX_PIGGYBACK), tsTracker);
 
             case ClientOp.TX_ROLLBACK:
                 return ClientTransactionRollbackRequest.process(in, resources, 
metrics, igniteTables,
-                        clientContext.hasAllFeatures(TX_DIRECT_MAPPING, 
TX_DELAYED_ACKS));
+                        clientContext.hasFeature(TX_PIGGYBACK));
 
             case ClientOp.COMPUTE_EXECUTE:
                 return ClientComputeExecuteRequest.process(in, compute, 
clusterService, notificationSender(requestId),
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
index 19fc12960b0..1ea53aad501 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
@@ -61,11 +61,6 @@ public class ClientTransactionRollbackRequest {
             for (int i = 0; i < cnt; i++) {
                 int tableId = in.unpackInt();
                 int partId = in.unpackInt();
-
-                if (in.tryUnpackNil()) { // Incomplete mapping.
-                    continue;
-                }
-
                 String consistentId = in.unpackString();
                 long token = in.unpackLong();
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index cb0ecb7a10c..72d5e61c180 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.client.table;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.function.Function.identity;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
@@ -564,75 +563,90 @@ public class ClientTable implements Table {
                                 .thenCompose(t -> loadSchemaAndReadData(t, 
reader))
                                 .handle((ret, ex) -> {
                                     if (ex != null) {
-                                        // In case of direct mapping failure 
try to roll back the transaction.
-                                        if (ctx.enlistmentToken != null && 
!matchAny(unwrapCause(ex), TX_ALREADY_FINISHED_ERR,
-                                                
TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR)) {
-                                            assert tx0 != null && 
!tx0.isReadOnly() : "Invalid transaction for direct mapping " + tx;
-
-                                            return 
tx0.rollbackAsync().handle((ignored, err0) -> {
-                                                if (err0 != null) {
-                                                    ex.addSuppressed(err0);
+                                        // Retry schema errors, if any.
+                                        Throwable cause = ex;
+
+                                        if (ctx.firstReqFut != null) {
+                                            // Create failed transaction.
+                                            ClientTransaction failed = new 
ClientTransaction(ctx.channel, id, ctx.readOnly, null, ctx.pm,
+                                                    null, 
ch.observableTimestamp(), 0);
+                                            failed.fail();
+                                            ctx.firstReqFut.complete(failed);
+                                            
fut.completeExceptionally(unwrapCause(ex));
+                                            return null;
+                                        }
+
+                                        // Don't attempt retrying in case of 
direct mapping. This may be improved in the future.
+                                        if (ctx.enlistmentToken == null) {
+                                            while (cause != null) {
+                                                if (cause instanceof 
ClientSchemaVersionMismatchException) {
+                                                    // Retry with specific 
schema version.
+                                                    int expectedVersion = 
((ClientSchemaVersionMismatchException) cause).expectedVersion();
+
+                                                    
doSchemaOutInOpAsync(opCode, writer, reader, defaultValue, 
responseSchemaRequired,
+                                                            provider,
+                                                            
retryPolicyOverride, expectedVersion, expectNotifications, tx)
+                                                            
.whenComplete((res0, err0) -> {
+                                                                if (err0 != 
null) {
+                                                                    
fut.completeExceptionally(err0);
+                                                                } else {
+                                                                    
fut.complete(res0);
+                                                                }
+                                                            });
+
+                                                    return null;
+                                                } else if 
(schemaVersionOverride == null && cause instanceof UnmappedColumnsException) {
+                                                    // Force load latest 
schema and revalidate user data against it.
+                                                    // When 
schemaVersionOverride is not null, we already tried to load the schema.
+                                                    
schemas.remove(UNKNOWN_SCHEMA_VERSION);
+
+                                                    
doSchemaOutInOpAsync(opCode, writer, reader, defaultValue, 
responseSchemaRequired,
+                                                            provider,
+                                                            
retryPolicyOverride, UNKNOWN_SCHEMA_VERSION, expectNotifications, tx)
+                                                            
.whenComplete((res0, err0) -> {
+                                                                if (err0 != 
null) {
+                                                                    
fut.completeExceptionally(err0);
+                                                                } else {
+                                                                    
fut.complete(res0);
+                                                                }
+                                                            });
+
+                                                    return null;
                                                 }
 
-                                                sneakyThrow(ex);
+                                                cause = cause.getCause();
+                                            }
 
-                                                return (T) null;
-                                            });
+                                            fut.completeExceptionally(ex);
                                         } else {
-                                            sneakyThrow(ex);
-                                        }
-                                    }
+                                            // In case of direct mapping 
failure for any reason try to roll back the transaction.
+                                            if (!matchAny(unwrapCause(ex), 
TX_ALREADY_FINISHED_ERR, TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR)) {
+                                                assert tx0 != null && 
!tx0.isReadOnly() : "Invalid transaction for direct mapping " + tx;
 
-                                    return completedFuture(ret);
-                                }).thenCompose(identity());
-                    });
-                }).whenComplete((res, err) -> {
-                    if (err == null) {
-                        fut.complete(res);
-                        return;
-                    }
-
-                    // Retry schema errors, if any.
-                    Throwable cause = err;
+                                                
tx0.rollbackAsync().handle((ignored, err0) -> {
+                                                    if (err0 != null) {
+                                                        ex.addSuppressed(err0);
+                                                    }
 
-                    while (cause != null) {
-                        if (cause instanceof 
ClientSchemaVersionMismatchException) {
-                            // Retry with specific schema version.
-                            int expectedVersion = 
((ClientSchemaVersionMismatchException) cause).expectedVersion();
+                                                    
fut.completeExceptionally(ex);
 
-                            doSchemaOutInOpAsync(opCode, writer, reader, 
defaultValue, responseSchemaRequired, provider,
-                                    retryPolicyOverride, expectedVersion, 
expectNotifications, tx)
-                                    .whenComplete((res0, err0) -> {
-                                        if (err0 != null) {
-                                            fut.completeExceptionally(err0);
-                                        } else {
-                                            fut.complete(res0);
-                                        }
-                                    });
-
-                            return;
-                        } else if (schemaVersionOverride == null && cause 
instanceof UnmappedColumnsException) {
-                            // Force load latest schema and revalidate user 
data against it.
-                            // When schemaVersionOverride is not null, we 
already tried to load the schema.
-                            schemas.remove(UNKNOWN_SCHEMA_VERSION);
-
-                            doSchemaOutInOpAsync(opCode, writer, reader, 
defaultValue, responseSchemaRequired, provider,
-                                    retryPolicyOverride, 
UNKNOWN_SCHEMA_VERSION, expectNotifications, tx)
-                                    .whenComplete((res0, err0) -> {
-                                        if (err0 != null) {
-                                            fut.completeExceptionally(err0);
-                                        } else {
-                                            fut.complete(res0);
+                                                    return (T) null;
+                                                });
+                                            } else {
+                                                fut.completeExceptionally(ex);
+                                            }
                                         }
-                                    });
-
-                            return;
-                        }
-
-                        cause = cause.getCause();
-                    }
+                                    } else {
+                                        fut.complete(ret);
+                                    }
 
-                    fut.completeExceptionally(err);
+                                    return null;
+                                });
+                    });
+                }).exceptionally(ex -> {
+                    fut.completeExceptionally(ex);
+                    sneakyThrow(ex);
+                    return null;
                 });
 
         return fut;
@@ -648,12 +662,6 @@ public class ClientTable implements Table {
         }
     }
 
-    private static boolean isProxy(@Nullable Transaction tx, @Nullable 
PartitionMapping pm, ClientChannel opChannel) {
-        return tx == null || tx.isReadOnly() || pm == null
-                || 
!opChannel.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, 
TX_DELAYED_ACKS, TX_PIGGYBACK)
-                || 
!pm.nodeConsistentId().equals(opChannel.protocolContext().clusterNode().name());
-    }
-
     private static CompletableFuture<Void> enlistDirect(
             ClientTransaction tx,
             ReliableChannel ch,
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index 7cb9fb508d4..c7a98a6003d 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -17,8 +17,7 @@
 
 package org.apache.ignite.internal.client.tx;
 
-import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
-import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ViewUtils.sync;
@@ -35,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.PartitionMapping;
+import org.apache.ignite.internal.client.PayloadOutputChannel;
 import org.apache.ignite.internal.client.ReliableChannel;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
@@ -146,7 +145,7 @@ public class ClientTransaction implements Transaction {
 
         this.coordId = coordId;
 
-        assert this.coordId != null;
+        assert txId == null || coordId != null;
     }
 
     /**
@@ -219,23 +218,14 @@ public class ClientTransaction implements Transaction {
             enlistPartitionLock.writeLock().unlock();
         }
 
-        boolean enabled = 
ch.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, TX_DELAYED_ACKS);
+        boolean enabled = 
ch.protocolContext().isFeatureSupported(TX_PIGGYBACK);
         CompletableFuture<Void> finishFut = enabled ? 
ch.inflights().finishFuture(txId()) : nullCompletedFuture();
 
         CompletableFuture<Void> mainFinishFut = finishFut.thenCompose(ignored 
-> ch.serviceAsync(ClientOp.TX_COMMIT, w -> {
             w.out().packLong(id);
 
             if (!isReadOnly && enabled) {
-                w.out().packInt(enlisted.size());
-                if (!enlisted.isEmpty()) {
-                    for (Entry<TablePartitionId, 
CompletableFuture<IgniteBiTuple<String, Long>>> entry : enlisted.entrySet()) {
-                        w.out().packInt(entry.getKey().tableId());
-                        w.out().packInt(entry.getKey().partitionId());
-                        
w.out().packString(entry.getValue().getNow(null).get1());
-                        w.out().packLong(entry.getValue().getNow(null).get2());
-                    }
-                    w.out().packLong(tracker.get().longValue());
-                }
+                packEnlisted(w);
             }
         }, r -> null));
 
@@ -272,19 +262,8 @@ public class ClientTransaction implements Transaction {
         // Don't wait inflights on rollback.
         CompletableFuture<Void> mainFinishFut = 
ch.serviceAsync(ClientOp.TX_ROLLBACK, w -> {
             w.out().packLong(id);
-            if (!isReadOnly && 
w.clientChannel().protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, 
TX_DELAYED_ACKS)) {
-                w.out().packInt(enlisted.size());
-                for (Entry<TablePartitionId, 
CompletableFuture<IgniteBiTuple<String, Long>>> entry : enlisted.entrySet()) {
-                    w.out().packInt(entry.getKey().tableId());
-                    w.out().packInt(entry.getKey().partitionId());
-                    CompletableFuture<IgniteBiTuple<String, Long>> fut = 
entry.getValue();
-                    if (CompletableFutures.isCompletedSuccessfully(fut)) {
-                        w.out().packString(fut.join().get1());
-                        w.out().packLong(fut.join().get2());
-                    } else {
-                        w.out().packNil();
-                    }
-                }
+            if (!isReadOnly && 
w.clientChannel().protocolContext().isFeatureSupported(TX_PIGGYBACK)) {
+                packEnlisted(w);
             }
         }, r -> null);
 
@@ -299,6 +278,31 @@ public class ClientTransaction implements Transaction {
         return mainFinishFut;
     }
 
+    private void packEnlisted(PayloadOutputChannel w) {
+        int pos = w.out().reserveInt();
+        int cnt = 0;
+        for (Entry<TablePartitionId, CompletableFuture<IgniteBiTuple<String, 
Long>>> entry : enlisted.entrySet()) {
+            IgniteBiTuple<String, Long> info = entry.getValue().getNow(null);
+
+            if (info == null) {
+                continue; // Ignore incomplete enlistments.
+            }
+
+            w.out().packInt(entry.getKey().tableId());
+            w.out().packInt(entry.getKey().partitionId());
+            w.out().packString(info.get1());
+            w.out().packLong(info.get2());
+
+            cnt++;
+        }
+
+        w.out().setInt(pos, cnt);
+
+        if (cnt > 0) {
+            w.out().packLong(tracker.get().longValue());
+        }
+    }
+
     /** {@inheritDoc} */
     @Override
     public boolean isReadOnly() {
@@ -430,6 +434,12 @@ public class ClientTransaction implements Transaction {
         return enlisted.size();
     }
 
+    /** Fail the transaction. */
+    public void fail() {
+        state.set(STATE_ROLLED_BACK);
+        finishFut.set(nullCompletedFuture());
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
new file mode 100644
index 00000000000..67c7642d103
--- /dev/null
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.function.Executable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Integration tests for binary {@link KeyValueView} API using an explicit 
transaction executed via {@code runInTransaction}.
+ */
+public class ItKeyValueBinaryViewApiExplicitRunInTxnTest extends 
ItKeyValueBinaryViewApiTest {
+    @Override
+    TestCaseFactory getFactory(String name) {
+        return new TestCaseFactory(name) {
+            @Override
+            <K, V> BaseTestCase<K, V> create(boolean async, boolean thin, 
Class<K> keyClass, Class<V> valueClass) {
+                assert keyClass == Tuple.class : keyClass;
+                assert valueClass == Tuple.class : valueClass;
+
+                KeyValueView<Tuple, Tuple> view = thin
+                        ? client.tables().table(tableName).keyValueView()
+                        : 
CLUSTER.aliveNode().tables().table(tableName).keyValueView();
+
+                if (async) {
+                    view = new AsyncApiKeyValueViewAdapter<>(view);
+                }
+
+                return (BaseTestCase<K, V>) new TxTestCase(async, thin, view, 
createdTables.get(name), thin ? client : CLUSTER.aliveNode());
+            }
+        };
+    }
+
+    private static class TxTestCase extends TestCase {
+        @Override
+        protected Executable wrap(Consumer<Transaction> run) {
+            return () -> ignite.transactions().runInTransaction(run);
+        }
+
+        TxTestCase(boolean async, boolean thin, KeyValueView<Tuple, Tuple> 
view, TestTableDefinition tableDefinition, Ignite ignite) {
+            super(async, thin, view, tableDefinition, ignite);
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("compoundPkTestCases")
+    @Override
+    public void schemaMismatch(TestCase testCase) {
+        super.schemaMismatch(testCase);
+    }
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitTxnTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitTxnTest.java
new file mode 100644
index 00000000000..dc411831bed
--- /dev/null
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitTxnTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.function.Executable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Integration tests for binary {@link KeyValueView} API using an explicit 
transaction that is begun and committed manually.
+ */
+public class ItKeyValueBinaryViewApiExplicitTxnTest extends 
ItKeyValueBinaryViewApiTest {
+    @Override
+    TestCaseFactory getFactory(String name) {
+        return new TestCaseFactory(name) {
+            @Override
+            <K, V> BaseTestCase<K, V> create(boolean async, boolean thin, 
Class<K> keyClass, Class<V> valueClass) {
+                assert keyClass == Tuple.class : keyClass;
+                assert valueClass == Tuple.class : valueClass;
+
+                KeyValueView<Tuple, Tuple> view = thin
+                        ? client.tables().table(tableName).keyValueView()
+                        : 
CLUSTER.aliveNode().tables().table(tableName).keyValueView();
+
+                if (async) {
+                    view = new AsyncApiKeyValueViewAdapter<>(view);
+                }
+
+                return (BaseTestCase<K, V>) new TxTestCase(async, thin, view, 
createdTables.get(name), thin ? client : CLUSTER.aliveNode());
+            }
+        };
+    }
+
+    private static class TxTestCase extends TestCase {
+        @Override
+        protected Executable wrap(Consumer<Transaction> run) {
+            return () -> {
+                Transaction tx = ignite.transactions().begin();
+                run.accept(tx);
+                tx.commit();
+            };
+        }
+
+        TxTestCase(boolean async, boolean thin, KeyValueView<Tuple, Tuple> 
view, TestTableDefinition tableDefinition, Ignite ignite) {
+            super(async, thin, view, tableDefinition, ignite);
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("compoundPkTestCases")
+    @Override
+    public void schemaMismatch(TestCase testCase) {
+        super.schemaMismatch(testCase);
+    }
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiTest.java
index 4e99a32c4e1..32986d57373 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiTest.java
@@ -28,9 +28,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.InvalidTypeException;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -42,6 +44,7 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.MarshallerException;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.function.Executable;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -61,7 +64,7 @@ public class ItKeyValueBinaryViewApiTest extends 
ItKeyValueViewApiBaseTest {
 
     private static final String TABLE_NAME_FOR_SCHEMA_VALIDATION = 
"test_schema";
 
-    private Map<String, TestTableDefinition> createdTables;
+    Map<String, TestTableDefinition> createdTables;
 
     @BeforeAll
     void createTables() {
@@ -542,24 +545,24 @@ public class ItKeyValueBinaryViewApiTest extends 
ItKeyValueViewApiBaseTest {
         KeyValueView<Tuple, Tuple> view = testCase.view();
 
         testCase.checkSchemaMismatchError(
-                () -> view.get(null, Tuple.create().set("id", 0L)),
+                tx -> view.get(null, Tuple.create().set("id", 0L)),
                 "Missed key column: AFFID"
         );
 
         // TODO https://issues.apache.org/jira/browse/IGNITE-21793 Thin client 
must throw exception
         if (!testCase.thin) {
             testCase.checkSchemaMismatchError(
-                    () -> view.get(null, Tuple.create().set("id", 
0L).set("affId", 1L).set("val", 0L)),
+                    tx -> view.get(null, Tuple.create().set("id", 
0L).set("affId", 1L).set("val", 0L)),
                     "Key tuple doesn't match schema: schemaVersion=1, 
extraColumns=[VAL]"
             );
         }
 
         testCase.checkSchemaMismatchError(
-                () -> view.put(null, Tuple.create().set("id", 0L), 
Tuple.create()),
+                tx -> view.put(tx, Tuple.create().set("id", 0L), 
Tuple.create()),
                 "Missed key column: AFFID"
         );
         testCase.checkSchemaMismatchError(
-                () -> view.put(null, Tuple.create().set("id", 0L).set("affId", 
1L).set("val", 0L), Tuple.create()),
+                tx -> view.put(tx, Tuple.create().set("id", 0L).set("affId", 
1L).set("val", 0L), Tuple.create()),
                 "Key tuple doesn't match schema: schemaVersion=1, 
extraColumns=[VAL]"
         );
 
@@ -611,7 +614,7 @@ public class ItKeyValueBinaryViewApiTest extends 
ItKeyValueViewApiBaseTest {
                     view = new AsyncApiKeyValueViewAdapter<>(view);
                 }
 
-                return (BaseTestCase<K, V>) new TestCase(async, thin, view, 
createdTables.get(name));
+                return (BaseTestCase<K, V>) new TestCase(async, thin, view, 
createdTables.get(name), thin ? client : CLUSTER.aliveNode());
             }
         };
     }
@@ -623,6 +626,8 @@ public class ItKeyValueBinaryViewApiTest extends 
ItKeyValueViewApiBaseTest {
 
         final SchemaDescriptor schema;
 
+        final Ignite ignite;
+
         String keyColumnName(int index) {
             return keyColumns.get(index);
         }
@@ -635,25 +640,33 @@ public class ItKeyValueBinaryViewApiTest extends 
ItKeyValueViewApiBaseTest {
             return schema;
         }
 
-        TestCase(boolean async, boolean thin, KeyValueView<Tuple, Tuple> view, 
TestTableDefinition tableDefinition) {
+        TestCase(boolean async, boolean thin, KeyValueView<Tuple, Tuple> view, 
TestTableDefinition tableDefinition, Ignite ignite) {
             super(async, thin, view);
 
+            this.ignite = ignite;
+
             this.keyColumns = 
quoteOrLowercaseNames(tableDefinition.schemaDescriptor.keyColumns());
             this.valueColumns = 
quoteOrLowercaseNames(tableDefinition.schemaDescriptor.valueColumns());
             this.schema = tableDefinition.schemaDescriptor;
         }
 
+        protected Executable wrap(Consumer<Transaction> run) {
+            return () -> run.accept(null);
+        }
+
         @SuppressWarnings("ThrowableNotThrown")
         void checkNullValueError(Executable run) {
             IgniteTestUtils.assertThrows(NullPointerException.class, run, 
"val");
         }
 
         @SuppressWarnings("ThrowableNotThrown")
-        void checkSchemaMismatchError(Executable run, String expectedMessage) {
+        void checkSchemaMismatchError(Consumer<Transaction> run, String 
expectedMessage) {
+            Executable e = wrap(run);
+
             if (thin) {
-                IgniteTestUtils.assertThrows(IgniteException.class, run, 
expectedMessage);
+                IgniteTestUtils.assertThrows(IgniteException.class, e, 
expectedMessage);
             } else {
-                IgniteTestUtils.assertThrowsWithCause(run::execute, 
SchemaMismatchException.class, expectedMessage);
+                IgniteTestUtils.assertThrowsWithCause(e::execute, 
SchemaMismatchException.class, expectedMessage);
             }
         }
 

Reply via email to