This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 28111df1a2 IGNITE-19839 Java client: Reload schema when unmapped 
columns are detected (#2410)
28111df1a2 is described below

commit 28111df1a22fcf7a493a4e340dec3064726bce8d
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Aug 7 12:49:51 2023 +0300

    IGNITE-19839 Java client: Reload schema when unmapped columns are detected 
(#2410)
    
    When unmapped columns are detected in a Tuple or a POJO, we must ensure 
that the latest schema is used before throwing an exception.
    
    For example, right after `ALTER TABLE ADD COLUMN`, the user expects to be 
able to insert a tuple with the new column. However, the client still has the 
old schema in the cache, and previously it would throw an unmapped column 
error. Now we retry the operation with a forced schema refresh.
---
 .../ignite/internal/client/table/ClientTable.java  | 187 +++++++++++----------
 .../client/table/ClientTupleSerializer.java        |   5 +-
 .../ignite/internal/util/ExceptionUtils.java       |  16 ++
 .../ignite/internal/marshaller/Marshaller.java     |   3 +-
 .../marshaller/UnmappedColumnsException.java       |  25 +++
 .../ItThinClientSchemaSynchronizationTest.java     | 158 +++++++++++++++++
 .../schema/marshaller/KvMarshallerTest.java        |  17 +-
 7 files changed, 307 insertions(+), 104 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index b31328a824..2be68fc16e 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -39,7 +39,9 @@ import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.ColumnTypeConverter;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.marshaller.UnmappedColumnsException;
 import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.table.KeyValueView;
@@ -269,6 +271,7 @@ public class ClientTable implements Table {
         } else {
             ClientTransaction clientTx = ClientTransaction.get(tx);
 
+            //noinspection resource
             if (clientTx.channel() != out.clientChannel()) {
                 // Do not throw IgniteClientConnectionException to avoid retry 
kicking in.
                 throw new IgniteException(CONNECTION_ERR, "Transaction context 
has been lost due to connection errors.");
@@ -278,68 +281,6 @@ public class ClientTable implements Table {
         }
     }
 
-    <T> CompletableFuture<T> doSchemaOutInOpAsync(
-            int opCode,
-            BiConsumer<ClientSchema, PayloadOutputChannel> writer,
-            BiFunction<ClientSchema, ClientMessageUnpacker, T> reader,
-            @Nullable T defaultValue,
-            @Nullable PartitionAwarenessProvider provider
-    ) {
-        return doSchemaOutInOpAsync(opCode, writer, reader, defaultValue, 
provider, null);
-    }
-
-    private <T> CompletableFuture<T> doSchemaOutInOpAsync(
-            int opCode,
-            BiConsumer<ClientSchema, PayloadOutputChannel> writer,
-            BiFunction<ClientSchema, ClientMessageUnpacker, T> reader,
-            @Nullable T defaultValue,
-            @Nullable PartitionAwarenessProvider provider,
-            @Nullable Integer schemaVersionOverride
-    ) {
-        CompletableFuture<T> fut = new CompletableFuture<>();
-
-        CompletableFuture<ClientSchema> schemaFut = 
getSchema(schemaVersionOverride == null ? latestSchemaVer : 
schemaVersionOverride);
-        CompletableFuture<List<String>> partitionsFut = provider == null || 
!provider.isPartitionAwarenessEnabled()
-                ? CompletableFuture.completedFuture(null)
-                : getPartitionAssignment();
-
-        CompletableFuture.allOf(schemaFut, partitionsFut)
-                .thenCompose(v -> {
-                    ClientSchema schema = schemaFut.getNow(null);
-                    String preferredNodeName = getPreferredNodeName(provider, 
partitionsFut.getNow(null), schema);
-
-                    return ch.serviceAsync(opCode,
-                            w -> writer.accept(schema, w),
-                            r -> readSchemaAndReadData(schema, r.in(), reader, 
defaultValue),
-                            preferredNodeName,
-                            null);
-                })
-                .thenCompose(t -> loadSchemaAndReadData(t, reader))
-                .whenComplete((res, err) -> {
-                    if (err != null) {
-                        if (err.getCause() instanceof 
ClientSchemaVersionMismatchException) {
-                            // Retry with specific schema version.
-                            int expectedVersion = 
((ClientSchemaVersionMismatchException) err.getCause()).expectedVersion();
-
-                            doSchemaOutInOpAsync(opCode, writer, reader, 
defaultValue, provider, expectedVersion)
-                                    .whenComplete((res0, err0) -> {
-                                        if (err0 != null) {
-                                            fut.completeExceptionally(err0);
-                                        } else {
-                                            fut.complete(res0);
-                                        }
-                                    });
-                        } else {
-                            fut.completeExceptionally(err);
-                        }
-                    } else {
-                        fut.complete(res);
-                    }
-                });
-
-        return fut;
-    }
-
     /**
      * Performs a schema-based operation.
      *
@@ -350,12 +291,21 @@ public class ClientTable implements Table {
      * @param <T> Result type.
      * @return Future representing pending completion of the operation.
      */
+    @SuppressWarnings("ClassEscapesDefinedScope")
     public <T> CompletableFuture<T> doSchemaOutOpAsync(
             int opCode,
             BiConsumer<ClientSchema, PayloadOutputChannel> writer,
             Function<ClientMessageUnpacker, T> reader,
             @Nullable PartitionAwarenessProvider provider) {
-        return doSchemaOutOpAsync(opCode, writer, reader, provider, null, 
null);
+        return doSchemaOutInOpAsync(
+                opCode,
+                writer,
+                (schema, unpacker) -> reader.apply(unpacker),
+                null,
+                false,
+                provider,
+                null,
+                null);
     }
 
     /**
@@ -374,7 +324,15 @@ public class ClientTable implements Table {
             Function<ClientMessageUnpacker, T> reader,
             @Nullable PartitionAwarenessProvider provider,
             @Nullable RetryPolicy retryPolicyOverride) {
-        return doSchemaOutOpAsync(opCode, writer, reader, provider, 
retryPolicyOverride, null);
+        return doSchemaOutInOpAsync(
+                opCode,
+                writer,
+                (schema, unpacker) -> reader.apply(unpacker),
+                null,
+                false,
+                provider,
+                retryPolicyOverride,
+                null);
     }
 
     /**
@@ -383,16 +341,41 @@ public class ClientTable implements Table {
      * @param opCode Op code.
      * @param writer Writer.
      * @param reader Reader.
+     * @param defaultValue Default value to use when server returns null.
+     * @param provider Partition awareness provider.
+     * @param <T> Result type.
+     * @return Future representing pending completion of the operation.
+     */
+    <T> CompletableFuture<T> doSchemaOutInOpAsync(
+            int opCode,
+            BiConsumer<ClientSchema, PayloadOutputChannel> writer,
+            BiFunction<ClientSchema, ClientMessageUnpacker, T> reader,
+            @Nullable T defaultValue,
+            @Nullable PartitionAwarenessProvider provider
+    ) {
+        return doSchemaOutInOpAsync(opCode, writer, reader, defaultValue, 
true, provider, null, null);
+    }
+
+    /**
+     * Performs a schema-based operation.
+     *
+     * @param opCode Op code.
+     * @param writer Writer.
+     * @param reader Reader.
+     * @param defaultValue Default value to use when server returns null.
+     * @param responseSchemaRequired Whether response schema is required to 
read the result.
      * @param provider Partition awareness provider.
      * @param retryPolicyOverride Retry policy override.
      * @param schemaVersionOverride Schema version override.
      * @param <T> Result type.
      * @return Future representing pending completion of the operation.
      */
-    private <T> CompletableFuture<T> doSchemaOutOpAsync(
+    private <T> CompletableFuture<T> doSchemaOutInOpAsync(
             int opCode,
             BiConsumer<ClientSchema, PayloadOutputChannel> writer,
-            Function<ClientMessageUnpacker, T> reader,
+            BiFunction<ClientSchema, ClientMessageUnpacker, T> reader,
+            @Nullable T defaultValue,
+            boolean responseSchemaRequired,
             @Nullable PartitionAwarenessProvider provider,
             @Nullable RetryPolicy retryPolicyOverride,
             @Nullable Integer schemaVersionOverride) {
@@ -403,40 +386,59 @@ public class ClientTable implements Table {
                 ? CompletableFuture.completedFuture(null)
                 : getPartitionAssignment();
 
+        // Wait for schema and partition assignment.
         CompletableFuture.allOf(schemaFut, partitionsFut)
                 .thenCompose(v -> {
                     ClientSchema schema = schemaFut.getNow(null);
                     String preferredNodeName = getPreferredNodeName(provider, 
partitionsFut.getNow(null), schema);
 
+                    // Perform the operation.
                     return ch.serviceAsync(opCode,
                             w -> writer.accept(schema, w),
-                            r -> {
-                                ensureSchemaLoadedAsync(r.in().unpackInt());
-
-                                return reader.apply(r.in());
-                            },
+                            r -> readSchemaAndReadData(schema, r.in(), reader, 
defaultValue, responseSchemaRequired),
                             preferredNodeName,
                             retryPolicyOverride);
                 })
+
+                // Read resulting schema and the rest of the response.
+                .thenCompose(t -> loadSchemaAndReadData(t, reader))
                 .whenComplete((res, err) -> {
-                    if (err != null) {
-                        if (err.getCause() instanceof 
ClientSchemaVersionMismatchException) {
-                            // Retry with specific schema version.
-                            int expectedVersion = 
((ClientSchemaVersionMismatchException) err.getCause()).expectedVersion();
-
-                            doSchemaOutOpAsync(opCode, writer, reader, 
provider, retryPolicyOverride, expectedVersion)
-                                    .whenComplete((res0, err0) -> {
-                                        if (err0 != null) {
-                                            fut.completeExceptionally(err0);
-                                        } else {
-                                            fut.complete(res0);
-                                        }
-                                    });
-                        } else {
-                            fut.completeExceptionally(err);
-                        }
-                    } else {
+                    if (err == null) {
                         fut.complete(res);
+                        return;
+                    }
+
+                    // Retry schema errors.
+                    Throwable cause = ExceptionUtils.unwrapRootCause(err);
+                    if (cause instanceof ClientSchemaVersionMismatchException) 
{
+                        // Retry with specific schema version.
+                        int expectedVersion = 
((ClientSchemaVersionMismatchException) cause).expectedVersion();
+
+                        doSchemaOutInOpAsync(opCode, writer, reader, 
defaultValue, responseSchemaRequired, provider, retryPolicyOverride,
+                                expectedVersion)
+                                .whenComplete((res0, err0) -> {
+                                    if (err0 != null) {
+                                        fut.completeExceptionally(err0);
+                                    } else {
+                                        fut.complete(res0);
+                                    }
+                                });
+                    } else if (schemaVersionOverride == null && cause 
instanceof UnmappedColumnsException) {
+                        // Force load latest schema and revalidate user data 
against it.
+                        // When schemaVersionOverride is not null, we already 
tried to load the schema.
+                        schemas.remove(UNKNOWN_SCHEMA_VERSION);
+
+                        doSchemaOutInOpAsync(opCode, writer, reader, 
defaultValue, responseSchemaRequired, provider, retryPolicyOverride,
+                                UNKNOWN_SCHEMA_VERSION)
+                                .whenComplete((res0, err0) -> {
+                                    if (err0 != null) {
+                                        fut.completeExceptionally(err0);
+                                    } else {
+                                        fut.complete(res0);
+                                    }
+                                });
+                    } else {
+                        fut.completeExceptionally(err);
                     }
                 });
 
@@ -447,10 +449,17 @@ public class ClientTable implements Table {
             ClientSchema knownSchema,
             ClientMessageUnpacker in,
             BiFunction<ClientSchema, ClientMessageUnpacker, T> fn,
-            @Nullable T defaultValue
+            @Nullable T defaultValue,
+            boolean responseSchemaRequired
     ) {
         int schemaVer = in.unpackInt();
 
+        if (!responseSchemaRequired) {
+            ensureSchemaLoadedAsync(schemaVer);
+
+            return fn.apply(null, in);
+        }
+
         if (in.tryUnpackNil()) {
             ensureSchemaLoadedAsync(schemaVer);
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
index 635ae227c4..b89acca239 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
@@ -36,6 +36,7 @@ import 
org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.TuplePart;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.marshaller.UnmappedColumnsException;
 import org.apache.ignite.internal.util.HashCalculator;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.table.Tuple;
@@ -212,7 +213,7 @@ public class ClientTupleSerializer {
         }
 
         if (val != null && val.columnCount() > usedValCols) {
-            throwSchemaMismatchException(key, schema, TuplePart.VAL);
+            throwSchemaMismatchException(val, schema, TuplePart.VAL);
         }
 
         out.out().packBinaryTuple(builder, noValueSet);
@@ -441,6 +442,6 @@ public class ClientTupleSerializer {
         }
 
         throw new IllegalArgumentException(String.format("%s doesn't match 
schema: schemaVersion=%s, extraColumns=%s",
-                prefix, schema.version(), extraColumns));
+                prefix, schema.version(), extraColumns), new 
UnmappedColumnsException());
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index cab9d6d8dd..30dec96249 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -346,6 +346,22 @@ public final class ExceptionUtils {
         return e;
     }
 
+    /**
+     * Unwraps the root cause of the provided {@code e}.
+     *
+     * @param e Throwable.
+     * @return Root cause.
+     */
+    public static <T extends Throwable> T unwrapRootCause(Throwable e) {
+        var cause = e;
+
+        while (cause.getCause() != null) {
+            cause = cause.getCause();
+        }
+
+        return (T) cause;
+    }
+
     /**
      * Creates a new exception, which type is defined by the provided {@code 
supplier}, with the specified {@code t} as a cause.
      * In the case when the provided cause {@code t} is an instance of {@link 
IgniteInternalException}
diff --git 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
index 9f086cd42e..d62dc211c7 100644
--- 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
+++ 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
@@ -120,7 +120,8 @@ public abstract class Marshaller {
                 }
 
                 throw new IllegalArgumentException(
-                        "Fields " + fieldSet + " of type " + 
mapper.targetType().getName() + " are not mapped to columns.");
+                        "Fields " + fieldSet + " of type " + 
mapper.targetType().getName() + " are not mapped to columns.",
+                        new UnmappedColumnsException());
             }
         }
 
diff --git 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/UnmappedColumnsException.java
 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/UnmappedColumnsException.java
new file mode 100644
index 0000000000..8d757990c8
--- /dev/null
+++ 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/UnmappedColumnsException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.marshaller;
+
+/**
+ * Unmapped columns exception.
+ */
+public class UnmappedColumnsException extends RuntimeException {
+    private static final long serialVersionUID = 6756761904316593515L;
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
index d47a4cd925..174502161f 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
@@ -20,12 +20,18 @@ package org.apache.ignite.internal.runner.app.client;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Tests for client schema synchronization.
@@ -97,4 +103,156 @@ public class ItThinClientSchemaSynchronizationTest extends 
ItAbstractThinClientT
         ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME 
VARCHAR NOT NULL");
         assertNull(recordView.get(null, rec).stringValue(1));
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testClientReloadsTupleSchemaOnUnmappedColumnException(boolean 
useGetAndUpsert) throws InterruptedException {
+        IgniteClient client = client();
+        Session ses = client.sql().createSession();
+
+        String tableName = 
"testClientReloadsTupleSchemaOnUnmappedColumnException_" + useGetAndUpsert;
+        ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL 
PRIMARY KEY)");
+
+        waitForTableOnAllNodes(tableName);
+        RecordView<Tuple> recordView = 
client.tables().table(tableName).recordView();
+
+        Tuple rec = Tuple.create().set("ID", 1).set("NAME", "name");
+        Runnable action = useGetAndUpsert
+                ? () -> recordView.getAndUpsert(null, rec)
+                : () -> recordView.insert(null, rec);
+
+        // Insert fails, because there is no NAME column.
+        var ex = assertThrows(IgniteException.class, action::run);
+        assertEquals("Tuple doesn't match schema: schemaVersion=1, 
extraColumns=[NAME]", ex.getMessage());
+
+        // Modify table, insert again - client will use old schema, throw 
UnmappedColumnsException,
+        // reload schema, retry with new schema and succeed.
+        ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME 
VARCHAR NOT NULL");
+        action.run();
+
+        assertEquals("name", recordView.get(null, rec).stringValue(1));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testClientReloadsKvTupleSchemaOnUnmappedColumnException(boolean 
useGetAndPut) throws InterruptedException {
+        IgniteClient client = client();
+        Session ses = client.sql().createSession();
+
+        String tableName = 
"testClientReloadsKvTupleSchemaOnUnmappedColumnException_" + useGetAndPut;
+        ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL 
PRIMARY KEY)");
+
+        waitForTableOnAllNodes(tableName);
+        KeyValueView<Tuple, Tuple> kvView = 
client.tables().table(tableName).keyValueView();
+
+        // Insert fails, because there is no NAME column.
+        Tuple key = Tuple.create().set("ID", 1);
+        Tuple val = Tuple.create().set("NAME", "name");
+
+        Runnable action = useGetAndPut
+                ? () -> kvView.getAndPut(null, key, val)
+                : () -> kvView.put(null, key, val);
+
+        var ex = assertThrows(IgniteException.class, action::run);
+        assertEquals("Value tuple doesn't match schema: schemaVersion=1, 
extraColumns=[NAME]", ex.getMessage());
+
+        // Modify table, insert again - client will use old schema, throw 
UnmappedColumnsException,
+        // reload schema, retry with new schema and succeed.
+        ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME 
VARCHAR NOT NULL");
+        action.run();
+
+        assertEquals("name", kvView.get(null, key).stringValue(0));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testClientReloadsPojoSchemaOnUnmappedColumnException(boolean 
useGetAndUpsert) throws InterruptedException {
+        IgniteClient client = client();
+        Session ses = client.sql().createSession();
+
+        String tableName = 
"testClientReloadsPojoSchemaOnUnmappedColumnException_" + useGetAndUpsert;
+        ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL 
PRIMARY KEY)");
+
+        waitForTableOnAllNodes(tableName);
+        RecordView<Pojo> recordView = 
client.tables().table(tableName).recordView(Mapper.of(Pojo.class));
+
+        // Insert fails, because there is no NAME column.
+        Pojo rec = new Pojo(1, "name");
+        Runnable action = useGetAndUpsert
+                ? () -> recordView.getAndUpsert(null, rec)
+                : () -> recordView.insert(null, rec);
+
+        var ex = assertThrows(IgniteException.class, action::run);
+        assertEquals(
+                "Fields [name] of type 
org.apache.ignite.internal.runner.app.client.ItThinClientSchemaSynchronizationTest$Pojo
 "
+                        + "are not mapped to columns.",
+                ex.getMessage());
+
+        // Modify table, insert again - client will use old schema, throw 
UnmappedColumnsException,
+        // reload schema, retry with new schema and succeed.
+        ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME 
VARCHAR NOT NULL");
+        action.run();
+
+        assertEquals("name", recordView.get(null, rec).name);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testClientReloadsKvPojoSchemaOnUnmappedColumnException(boolean 
useGetAndPut) throws InterruptedException {
+        IgniteClient client = client();
+        Session ses = client.sql().createSession();
+
+        String tableName = 
"testClientReloadsKvPojoSchemaOnUnmappedColumnException_" + useGetAndPut;
+        ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL 
PRIMARY KEY)");
+
+        waitForTableOnAllNodes(tableName);
+        KeyValueView<Integer, ValPojo> kvView = 
client.tables().table(tableName)
+                .keyValueView(Mapper.of(Integer.class), 
Mapper.of(ValPojo.class));
+
+        // Insert fails, because there is no NAME column.
+        Integer key = 1;
+        ValPojo val = new ValPojo("name");
+
+        Runnable action = useGetAndPut
+                ? () -> kvView.getAndPut(null, key, val)
+                : () -> kvView.put(null, key, val);
+
+        var ex = assertThrows(IgniteException.class, action::run);
+        assertEquals(
+                "Fields [name] of type "
+                        + 
"org.apache.ignite.internal.runner.app.client.ItThinClientSchemaSynchronizationTest$ValPojo
 "
+                        + "are not mapped to columns.",
+                ex.getMessage());
+
+        // Modify table, insert again - client will use old schema, throw 
UnmappedColumnsException,
+        // reload schema, retry with new schema and succeed.
+        ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME 
VARCHAR NOT NULL");
+        action.run();
+
+        assertEquals("name", kvView.get(null, key).name);
+    }
+
+    private static class Pojo {
+        public int id;
+        public String name;
+
+        public Pojo() {
+        }
+
+        public Pojo(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+    }
+
+    private static class ValPojo {
+        public String name;
+
+        public ValPojo() {
+        }
+
+        public ValPojo(String name) {
+            this.name = name;
+        }
+    }
 }
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
index 1ad524bfcd..31b4af28e7 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
@@ -82,6 +82,7 @@ import 
org.apache.ignite.internal.schema.testobjects.TestObjectWithPrivateConstr
 import org.apache.ignite.internal.schema.testobjects.TestSimpleObjectKey;
 import org.apache.ignite.internal.schema.testobjects.TestSimpleObjectVal;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.ObjectFactory;
 import org.apache.ignite.table.mapper.Mapper;
 import org.junit.jupiter.api.Assumptions;
@@ -205,13 +206,9 @@ public class KvMarshallerTest {
 
         SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
 
-        IllegalArgumentException ex = assertThrows(
+        IllegalArgumentException ex = 
ExceptionUtils.unwrapRootCause(assertThrows(
                 IllegalArgumentException.class,
-                () -> factory.create(schema, TestObjectWithAllTypes.class, 
TestObjectWithAllTypes.class));
-
-        while (ex.getCause() != null) {
-            ex = (IllegalArgumentException) ex.getCause();
-        }
+                () -> factory.create(schema, TestObjectWithAllTypes.class, 
TestObjectWithAllTypes.class)));
 
         assertEquals(
                 "Fields [bitmaskCol, booleanCol, byteCol, bytesCol, dateCol, 
dateTimeCol, decimalCol, doubleCol, floatCol, "
@@ -322,13 +319,9 @@ public class KvMarshallerTest {
         KvMarshaller<Integer, BitSet> marshaller =
                 factory.create(schema, Integer.class, BitSet.class);
 
-        Throwable ex = assertThrows(
+        Throwable ex = ExceptionUtils.unwrapRootCause(assertThrows(
                 MarshallerException.class,
-                () -> marshaller.marshal(1, IgniteTestUtils.randomBitSet(rnd, 
42)));
-
-        while (ex.getCause() != null) {
-            ex = ex.getCause();
-        }
+                () -> marshaller.marshal(1, IgniteTestUtils.randomBitSet(rnd, 
42))));
 
         assertThat(ex.getMessage(), startsWith("Failed to set bitmask for 
column 'BITMASKCOL' (mask size exceeds allocated size)"));
     }

Reply via email to