This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5d195d48d9 IGNITE-19837 Java client: Retry outdated schema error
(#2381)
5d195d48d9 is described below
commit 5d195d48d9c2a294526a5d5c5b2a63ea79b0dae3
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Jul 31 18:43:49 2023 +0300
IGNITE-19837 Java client: Retry outdated schema error (#2381)
* Handle `SCHEMA_VERSION_MISMATCH_ERR` error code in Java client as a
special case
* Rethrow as internal `ClientSchemaVersionMismatchException`
* Catch on Table level, retry operations with specified schema
* Fix `SchemaVersionMismatchException` detection in
`ClientInboundMessageHandler`
Additionally, fix IGNITE-20103 - a separate fix would be difficult to test.
When schema changes (due to ALTER TABLE), old rows are wrapped into
`UpgradingRowAdapter`, which was not expected in `ClientTableCommon.writeTuple`:
* Fix `UpgradingRowAdapter` to avoid returning incorrect `BinaryTuple`
* Handle null `BinaryTuple` in `ClientTableCommon.writeTuple`
---
.../org/apache/ignite/lang/IgniteException.java | 2 +-
.../client/proto/ClientBinaryTupleUtils.java | 95 +++++++++++++++++++
.../handler/ClientInboundMessageHandler.java | 19 +++-
.../handler/requests/table/ClientTableCommon.java | 25 ++++-
.../ClientSchemaVersionMismatchException.java | 57 ++++++++++++
.../ignite/internal/client/TcpClientChannel.java | 27 +++++-
.../ignite/internal/client/table/ClientTable.java | 103 +++++++++++++++++++--
.../client/table/ClientTupleSerializer.java | 94 +------------------
.../ignite/internal/util/ExceptionUtils.java | 2 +-
.../ignite/lang/IgniteInternalException.java | 2 +-
.../ItThinClientSchemaSynchronizationTest.java | 66 ++++++++++---
.../streamer/ItAbstractDataStreamerTest.java | 52 +++++++++--
.../schema/registry/UpgradingRowAdapter.java | 8 ++
13 files changed, 417 insertions(+), 135 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java
b/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java
index a6120acbe7..1a31df9d6d 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java
@@ -181,7 +181,7 @@ public class IgniteException extends RuntimeException
implements TraceableExcept
* @param message Detailed message.
* @param cause Optional nested exception (can be {@code null}).
*/
- public IgniteException(UUID traceId, int code, String message, Throwable
cause) {
+ public IgniteException(UUID traceId, int code, String message, @Nullable
Throwable cause) {
super(message, cause);
this.traceId = traceId;
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
index 9d464eb4c4..d3eaf81bc5 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
@@ -33,6 +33,7 @@ import
org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.ColumnType;
+import org.jetbrains.annotations.Nullable;
/**
* Client binary tuple utils.
@@ -192,6 +193,100 @@ public class ClientBinaryTupleUtils {
}
}
+ /**
+ * Writes a column value to the binary tuple.
+ *
+ * @param builder Builder.
+ * @param type Column type.
+ * @param name Column name.
+ * @param scale Scale.
+ * @param v Value.
+ */
+ public static void appendValue(BinaryTupleBuilder builder, ColumnType
type, String name, int scale, @Nullable Object v) {
+ if (v == null) {
+ builder.appendNull();
+ return;
+ }
+
+ try {
+ switch (type) {
+ case BOOLEAN:
+ builder.appendBoolean((boolean) v);
+ return;
+
+ case INT8:
+ builder.appendByte((byte) v);
+ return;
+
+ case INT16:
+ builder.appendShort((short) v);
+ return;
+
+ case INT32:
+ builder.appendInt((int) v);
+ return;
+
+ case INT64:
+ builder.appendLong((long) v);
+ return;
+
+ case FLOAT:
+ builder.appendFloat((float) v);
+ return;
+
+ case DOUBLE:
+ builder.appendDouble((double) v);
+ return;
+
+ case DECIMAL:
+ builder.appendDecimalNotNull((BigDecimal) v, scale);
+ return;
+
+ case UUID:
+ builder.appendUuidNotNull((UUID) v);
+ return;
+
+ case STRING:
+ builder.appendStringNotNull((String) v);
+ return;
+
+ case BYTE_ARRAY:
+ builder.appendBytesNotNull((byte[]) v);
+ return;
+
+ case BITMASK:
+ builder.appendBitmaskNotNull((BitSet) v);
+ return;
+
+ case DATE:
+ builder.appendDateNotNull((LocalDate) v);
+ return;
+
+ case TIME:
+ builder.appendTimeNotNull((LocalTime) v);
+ return;
+
+ case DATETIME:
+ builder.appendDateTimeNotNull((LocalDateTime) v);
+ return;
+
+ case TIMESTAMP:
+ builder.appendTimestampNotNull((Instant) v);
+ return;
+
+ case NUMBER:
+ builder.appendNumberNotNull((BigInteger) v);
+ return;
+
+ default:
+ throw new IllegalArgumentException("Unsupported type: " +
type);
+ }
+ } catch (ClassCastException e) {
+ throw new IgniteException(PROTOCOL_ERR, "Incorrect value type for
column '" + name + "': " + e.getMessage(), e);
+ }
+ }
+
+
private static void appendTypeAndScale(BinaryTupleBuilder builder,
ColumnType type, int scale) {
builder.appendInt(type.ordinal());
builder.appendInt(scale);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index f41ebf8e8c..ef195a296d 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -396,7 +396,8 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
}
private void writeErrorCore(Throwable err, ClientMessagePacker packer) {
- err = ExceptionUtils.unwrapCause(err);
+ SchemaVersionMismatchException schemaVersionMismatchException =
schemaVersionMismatchException(err);
+ err = schemaVersionMismatchException == null ?
ExceptionUtils.unwrapCause(err) : schemaVersionMismatchException;
// Trace ID and error code.
if (err instanceof TraceableException) {
@@ -420,10 +421,10 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
}
// Extensions.
- if (err instanceof SchemaVersionMismatchException) {
+ if (schemaVersionMismatchException != null) {
packer.packMapHeader(1);
packer.packString(ErrorExtensions.EXPECTED_SCHEMA_VERSION);
- packer.packInt(((SchemaVersionMismatchException)
err).expectedVersion());
+ packer.packInt(schemaVersionMismatchException.expectedVersion());
} else {
packer.packNil(); // No extensions.
}
@@ -711,4 +712,16 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
throw new IllegalArgumentException("Unsupported extension type: "
+ type.getName());
}
}
+
+ private static @Nullable SchemaVersionMismatchException
schemaVersionMismatchException(Throwable e) {
+ while (e != null) {
+ if (e instanceof SchemaVersionMismatchException) {
+ return (SchemaVersionMismatchException) e;
+ }
+
+ e = e.getCause();
+ }
+
+ return null;
+ }
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index d04e4e1ba7..0c05718307 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -17,6 +17,7 @@
package org.apache.ignite.client.handler.requests.table;
+import static
org.apache.ignite.internal.client.proto.ClientMessageCommon.NO_VALUE;
import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.TABLE_ID_NOT_FOUND_ERR;
@@ -24,8 +25,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTupleContainer;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.TuplePart;
@@ -132,13 +135,27 @@ public class ClientTableCommon {
assert tuple instanceof BinaryTupleContainer : "Tuple must be a
BinaryTupleContainer: " + tuple.getClass();
BinaryTupleReader binaryTuple = ((BinaryTupleContainer)
tuple).binaryTuple();
- assert binaryTuple != null : "Binary tuple must not be null: " +
tuple.getClass();
int elementCount = part == TuplePart.KEY ?
schema.keyColumns().length() : schema.length();
- assert elementCount == binaryTuple.elementCount() :
- "Tuple element count mismatch: " + elementCount + " != " +
binaryTuple.elementCount();
- packer.packBinaryTuple(binaryTuple);
+ if (binaryTuple != null) {
+ assert elementCount == binaryTuple.elementCount() :
+ "Tuple element count mismatch: " + elementCount + " != " +
binaryTuple.elementCount() + " (" + tuple.getClass() + ")";
+
+ packer.packBinaryTuple(binaryTuple);
+ } else {
+ // Underlying binary tuple is not available or can't be used as
is, pack columns one by one.
+ var builder = new BinaryTupleBuilder(elementCount);
+
+ for (var i = 0; i < elementCount; i++) {
+ var col = schema.column(i);
+ Object v = tuple.valueOrDefault(col.name(), NO_VALUE);
+
+ ClientBinaryTupleUtils.appendValue(builder,
getColumnType(col.type().spec()), col.name(), getDecimalScale(col.type()), v);
+ }
+
+ packer.packBinaryTuple(builder);
+ }
}
/**
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientSchemaVersionMismatchException.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientSchemaVersionMismatchException.java
new file mode 100644
index 0000000000..ff9bb0ec7f
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientSchemaVersionMismatchException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Indicates incompatible schema version.
+ */
+public class ClientSchemaVersionMismatchException extends
IgniteInternalException {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Expected schema version. */
+ private final int expectedVersion;
+
+ /**
+ * Constructor.
+ *
+ * @param traceId Trace ID.
+ * @param code Error code.
+ * @param message String message.
+ * @param expectedVersion Expected schema version.
+ * @param cause Cause.
+ */
+ ClientSchemaVersionMismatchException(UUID traceId, int code, @Nullable
String message, int expectedVersion, @Nullable Throwable cause) {
+ super(traceId, code, message, cause);
+
+ this.expectedVersion = expectedVersion;
+ }
+
+ /**
+ * Gets expected schema version.
+ *
+ * @return Expected schema version.
+ */
+ public int expectedVersion() {
+ return expectedVersion;
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 599e888332..61f72bbebb 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -51,12 +51,14 @@ import
org.apache.ignite.internal.client.proto.ClientMessageCommon;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.client.proto.ErrorExtensions;
import org.apache.ignite.internal.client.proto.HandshakeExtension;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ResponseFlags;
import org.apache.ignite.internal.client.proto.ServerMessageType;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.ErrorGroups.Table;
import org.apache.ignite.lang.IgniteCheckedException;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteExceptionUtils;
@@ -427,13 +429,34 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
private static <T extends Throwable> T readError(ClientMessageUnpacker
unpacker) {
var traceId = unpacker.unpackUuid();
var code = unpacker.unpackInt();
+
var errClassName = unpacker.unpackString();
var errMsg = unpacker.tryUnpackNil() ? null : unpacker.unpackString();
IgniteException causeWithStackTrace = unpacker.tryUnpackNil() ? null :
new IgniteException(traceId, code, unpacker.unpackString());
- // TODO IGNITE-19837 Retry outdated schema error
- unpacker.skipValues(1); // Error extensions.
+ if (code == Table.SCHEMA_VERSION_MISMATCH_ERR) {
+ int extSize = unpacker.tryUnpackNil() ? 0 :
unpacker.unpackMapHeader();
+ int expectedSchemaVersion = -1;
+
+ for (int i = 0; i < extSize; i++) {
+ String key = unpacker.unpackString();
+
+ if (key.equals(ErrorExtensions.EXPECTED_SCHEMA_VERSION)) {
+ expectedSchemaVersion = unpacker.unpackInt();
+ } else {
+ // Unknown extension - ignore.
+ unpacker.skipValues(1);
+ }
+ }
+
+ if (expectedSchemaVersion == -1) {
+ return (T) new IgniteException(
+ traceId, PROTOCOL_ERR, "Expected schema version is not
specified in error extension map.", causeWithStackTrace);
+ }
+
+ return (T) new ClientSchemaVersionMismatchException(traceId, code,
errMsg, expectedSchemaVersion, causeWithStackTrace);
+ }
try {
// TODO https://issues.apache.org/jira/browse/IGNITE-19539
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index e71e6cf786..b31328a824 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -30,6 +30,7 @@ import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.ignite.client.RetryPolicy;
+import org.apache.ignite.internal.client.ClientSchemaVersionMismatchException;
import org.apache.ignite.internal.client.ClientUtils;
import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
@@ -79,8 +80,8 @@ public class ClientTable implements Table {
/**
* Constructor.
*
- * @param ch Channel.
- * @param id Table id.
+ * @param ch Channel.
+ * @param id Table id.
* @param name Table name.
*/
public ClientTable(ReliableChannel ch, int id, String name) {
@@ -259,7 +260,7 @@ public class ClientTable implements Table {
/**
* Writes transaction, if present.
*
- * @param tx Transaction.
+ * @param tx Transaction.
* @param out Packer.
*/
public static void writeTx(@Nullable Transaction tx, PayloadOutputChannel
out) {
@@ -284,12 +285,25 @@ public class ClientTable implements Table {
@Nullable T defaultValue,
@Nullable PartitionAwarenessProvider provider
) {
- CompletableFuture<ClientSchema> schemaFut = getLatestSchema();
+ return doSchemaOutInOpAsync(opCode, writer, reader, defaultValue,
provider, null);
+ }
+
+ private <T> CompletableFuture<T> doSchemaOutInOpAsync(
+ int opCode,
+ BiConsumer<ClientSchema, PayloadOutputChannel> writer,
+ BiFunction<ClientSchema, ClientMessageUnpacker, T> reader,
+ @Nullable T defaultValue,
+ @Nullable PartitionAwarenessProvider provider,
+ @Nullable Integer schemaVersionOverride
+ ) {
+ CompletableFuture<T> fut = new CompletableFuture<>();
+
+ CompletableFuture<ClientSchema> schemaFut =
getSchema(schemaVersionOverride == null ? latestSchemaVer :
schemaVersionOverride);
CompletableFuture<List<String>> partitionsFut = provider == null ||
!provider.isPartitionAwarenessEnabled()
? CompletableFuture.completedFuture(null)
: getPartitionAssignment();
- return CompletableFuture.allOf(schemaFut, partitionsFut)
+ CompletableFuture.allOf(schemaFut, partitionsFut)
.thenCompose(v -> {
ClientSchema schema = schemaFut.getNow(null);
String preferredNodeName = getPreferredNodeName(provider,
partitionsFut.getNow(null), schema);
@@ -300,7 +314,30 @@ public class ClientTable implements Table {
preferredNodeName,
null);
})
- .thenCompose(t -> loadSchemaAndReadData(t, reader));
+ .thenCompose(t -> loadSchemaAndReadData(t, reader))
+ .whenComplete((res, err) -> {
+ if (err != null) {
+ if (err.getCause() instanceof
ClientSchemaVersionMismatchException) {
+ // Retry with specific schema version.
+ int expectedVersion =
((ClientSchemaVersionMismatchException) err.getCause()).expectedVersion();
+
+ doSchemaOutInOpAsync(opCode, writer, reader,
defaultValue, provider, expectedVersion)
+ .whenComplete((res0, err0) -> {
+ if (err0 != null) {
+ fut.completeExceptionally(err0);
+ } else {
+ fut.complete(res0);
+ }
+ });
+ } else {
+ fut.completeExceptionally(err);
+ }
+ } else {
+ fut.complete(res);
+ }
+ });
+
+ return fut;
}
/**
@@ -318,7 +355,7 @@ public class ClientTable implements Table {
BiConsumer<ClientSchema, PayloadOutputChannel> writer,
Function<ClientMessageUnpacker, T> reader,
@Nullable PartitionAwarenessProvider provider) {
- return doSchemaOutOpAsync(opCode, writer, reader, provider, null);
+ return doSchemaOutOpAsync(opCode, writer, reader, provider, null,
null);
}
/**
@@ -331,19 +368,42 @@ public class ClientTable implements Table {
* @param <T> Result type.
* @return Future representing pending completion of the operation.
*/
- public <T> CompletableFuture<T> doSchemaOutOpAsync(
+ <T> CompletableFuture<T> doSchemaOutOpAsync(
int opCode,
BiConsumer<ClientSchema, PayloadOutputChannel> writer,
Function<ClientMessageUnpacker, T> reader,
@Nullable PartitionAwarenessProvider provider,
@Nullable RetryPolicy retryPolicyOverride) {
+ return doSchemaOutOpAsync(opCode, writer, reader, provider,
retryPolicyOverride, null);
+ }
+
+ /**
+ * Performs a schema-based operation.
+ *
+ * @param opCode Op code.
+ * @param writer Writer.
+ * @param reader Reader.
+ * @param provider Partition awareness provider.
+ * @param retryPolicyOverride Retry policy override.
+ * @param schemaVersionOverride Schema version override.
+ * @param <T> Result type.
+ * @return Future representing pending completion of the operation.
+ */
+ private <T> CompletableFuture<T> doSchemaOutOpAsync(
+ int opCode,
+ BiConsumer<ClientSchema, PayloadOutputChannel> writer,
+ Function<ClientMessageUnpacker, T> reader,
+ @Nullable PartitionAwarenessProvider provider,
+ @Nullable RetryPolicy retryPolicyOverride,
+ @Nullable Integer schemaVersionOverride) {
+ CompletableFuture<T> fut = new CompletableFuture<>();
- CompletableFuture<ClientSchema> schemaFut = getLatestSchema();
+ CompletableFuture<ClientSchema> schemaFut =
getSchema(schemaVersionOverride == null ? latestSchemaVer :
schemaVersionOverride);
CompletableFuture<List<String>> partitionsFut = provider == null ||
!provider.isPartitionAwarenessEnabled()
? CompletableFuture.completedFuture(null)
: getPartitionAssignment();
- return CompletableFuture.allOf(schemaFut, partitionsFut)
+ CompletableFuture.allOf(schemaFut, partitionsFut)
.thenCompose(v -> {
ClientSchema schema = schemaFut.getNow(null);
String preferredNodeName = getPreferredNodeName(provider,
partitionsFut.getNow(null), schema);
@@ -357,7 +417,30 @@ public class ClientTable implements Table {
},
preferredNodeName,
retryPolicyOverride);
+ })
+ .whenComplete((res, err) -> {
+ if (err != null) {
+ if (err.getCause() instanceof
ClientSchemaVersionMismatchException) {
+ // Retry with specific schema version.
+ int expectedVersion =
((ClientSchemaVersionMismatchException) err.getCause()).expectedVersion();
+
+ doSchemaOutOpAsync(opCode, writer, reader,
provider, retryPolicyOverride, expectedVersion)
+ .whenComplete((res0, err0) -> {
+ if (err0 != null) {
+ fut.completeExceptionally(err0);
+ } else {
+ fut.complete(res0);
+ }
+ });
+ } else {
+ fut.completeExceptionally(err);
+ }
+ } else {
+ fut.complete(res);
+ }
});
+
+ return fut;
}
private <T> @Nullable Object readSchemaAndReadData(
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
index 23341d96f6..f8199d31b3 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
@@ -19,14 +19,7 @@ package org.apache.ignite.internal.client.table;
import static
org.apache.ignite.internal.client.proto.ClientMessageCommon.NO_VALUE;
import static org.apache.ignite.internal.client.table.ClientTable.writeTx;
-import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
+
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
@@ -34,16 +27,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.UUID;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.client.PayloadOutputChannel;
+import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.TuplePart;
import org.apache.ignite.internal.client.tx.ClientTransaction;
import org.apache.ignite.internal.util.HashCalculator;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
@@ -316,93 +308,13 @@ public class ClientTupleSerializer {
}
private static void appendValue(BinaryTupleBuilder builder, BitSet
noValueSet, ClientColumn col, Object v) {
- if (v == null) {
- builder.appendNull();
- return;
- }
-
if (v == NO_VALUE) {
noValueSet.set(col.schemaIndex());
builder.appendNull();
return;
}
- try {
- switch (col.type()) {
- case BOOLEAN:
- builder.appendBoolean((boolean) v);
- return;
-
- case INT8:
- builder.appendByte((byte) v);
- return;
-
- case INT16:
- builder.appendShort((short) v);
- return;
-
- case INT32:
- builder.appendInt((int) v);
- return;
-
- case INT64:
- builder.appendLong((long) v);
- return;
-
- case FLOAT:
- builder.appendFloat((float) v);
- return;
-
- case DOUBLE:
- builder.appendDouble((double) v);
- return;
-
- case DECIMAL:
- builder.appendDecimalNotNull((BigDecimal) v, col.scale());
- return;
-
- case UUID:
- builder.appendUuidNotNull((UUID) v);
- return;
-
- case STRING:
- builder.appendStringNotNull((String) v);
- return;
-
- case BYTE_ARRAY:
- builder.appendBytesNotNull((byte[]) v);
- return;
-
- case BITMASK:
- builder.appendBitmaskNotNull((BitSet) v);
- return;
-
- case DATE:
- builder.appendDateNotNull((LocalDate) v);
- return;
-
- case TIME:
- builder.appendTimeNotNull((LocalTime) v);
- return;
-
- case DATETIME:
- builder.appendDateTimeNotNull((LocalDateTime) v);
- return;
-
- case TIMESTAMP:
- builder.appendTimestampNotNull((Instant) v);
- return;
-
- case NUMBER:
- builder.appendNumberNotNull((BigInteger) v);
- return;
-
- default:
- throw new IllegalArgumentException("Unsupported type: " +
col.type());
- }
- } catch (ClassCastException e) {
- throw new IgniteException(PROTOCOL_ERR, "Incorrect value type for
column '" + col.name() + "': " + e.getMessage(), e);
- }
+ ClientBinaryTupleUtils.appendValue(builder, col.type(), col.name(),
col.scale(), v);
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index 3c6f88f80e..c722f505d7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -449,7 +449,7 @@ public final class ExceptionUtils {
* @param t Throwable to extract a trace identifier.
* @return Returns trace identifier.
*/
- public static UUID getOrCreateTraceId(Throwable t) {
+ public static UUID getOrCreateTraceId(@Nullable Throwable t) {
Throwable e = t;
// This collection is used to avoid infinite loops in case of cyclic
dependencies.
diff --git
a/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java
b/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java
index 974b3ff2a1..73f5b04b6d 100644
---
a/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java
+++
b/modules/core/src/main/java/org/apache/ignite/lang/IgniteInternalException.java
@@ -142,7 +142,7 @@ public class IgniteInternalException extends
RuntimeException implements Traceab
* @param message Detail message.
* @param cause Optional nested exception (can be {@code null}).
*/
- public IgniteInternalException(UUID traceId, int code, String message,
@Nullable Throwable cause) {
+ public IgniteInternalException(UUID traceId, int code, @Nullable String
message, @Nullable Throwable cause) {
super(message, cause);
this.traceId = traceId;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
index 1be0a64acd..e50b50e944 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
@@ -17,12 +17,10 @@
package org.apache.ignite.internal.runner.app.client;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import org.apache.ignite.client.IgniteClient;
-import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.Session;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
@@ -31,15 +29,15 @@ import org.junit.jupiter.api.Test;
/**
* Tests for client schema synchronization.
*/
+@SuppressWarnings("resource")
public class ItThinClientSchemaSynchronizationTest extends
ItAbstractThinClientTest {
- @SuppressWarnings("resource")
@Test
- void testOutdatedSchemaFromClientThrowsExceptionOnServer() throws
InterruptedException {
+ void testClientUsesLatestSchemaOnWrite() throws InterruptedException {
IgniteClient client = client();
Session ses = client.sql().createSession();
// Create table, insert data.
- String tableName =
"testOutdatedSchemaFromClientThrowsExceptionOnServer";
+ String tableName = "testClientUsesLatestSchemaOnWrite";
ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL
PRIMARY KEY)");
waitForTableOnAllNodes(tableName);
@@ -48,11 +46,55 @@ public class ItThinClientSchemaSynchronizationTest extends
ItAbstractThinClientT
Tuple rec = Tuple.create().set("ID", 1);
recordView.insert(null, rec);
- // Modify table, get data - client will use old schema.
- ses.execute(null, "ALTER TABLE
testOutdatedSchemaFromClientThrowsExceptionOnServer ADD COLUMN NAME VARCHAR");
+ // Modify table, insert data - client will use old schema, receive
error, retry with new schema.
+ // The process is transparent for the user: updated schema is in
effect immediately.
+ ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME
VARCHAR NOT NULL");
- // TODO IGNITE-19837 Retry outdated schema error
- IgniteException ex = assertThrows(IgniteException.class, () ->
recordView.insert(null, rec));
- assertThat(ex.getMessage(), containsString("Schema version mismatch
[expectedVer=2, actualVer=1]"));
+ Tuple rec2 = Tuple.create().set("ID", 1).set("NAME", "name");
+ recordView.upsert(null, rec2);
+
+ assertEquals("name", recordView.get(null, rec).stringValue(1));
+ }
+
+ @Test
+ void testClientUsesLatestSchemaOnRead() throws InterruptedException {
+ IgniteClient client = client();
+ Session ses = client.sql().createSession();
+
+ // Create table, insert data.
+ String tableName = "testClientUsesLatestSchemaOnRead";
+ ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL
PRIMARY KEY)");
+
+ waitForTableOnAllNodes(tableName);
+ RecordView<Tuple> recordView =
client.tables().table(tableName).recordView();
+
+ Tuple rec = Tuple.create().set("ID", 1);
+ recordView.insert(null, rec);
+
+ // Modify table, get data - client will use old schema, receive
error, retry with new schema.
+ // The process is transparent for the user: updated schema is in
effect immediately.
+ ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME
VARCHAR DEFAULT 'def_name'");
+ assertEquals("def_name", recordView.get(null, rec).stringValue(1));
+ }
+
+ @Test
+ void testClientUsesLatestSchemaOnReadWithNotNullColumn() throws
InterruptedException {
+ IgniteClient client = client();
+ Session ses = client.sql().createSession();
+
+ // Create table, insert data.
+ String tableName = "testClientUsesLatestSchemaOnReadWithNotNullColumn";
+ ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL
PRIMARY KEY)");
+
+ waitForTableOnAllNodes(tableName);
+ RecordView<Tuple> recordView =
client.tables().table(tableName).recordView();
+
+ Tuple rec = Tuple.create().set("ID", 1);
+ recordView.insert(null, rec);
+
+ // Modify table and get old row.
+ // It still has a null value in the new column, even though it is not
allowed by the new schema.
+ ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME
VARCHAR NOT NULL");
+ assertNull(recordView.get(null, rec).stringValue(1));
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index b4e67b24ea..dda034d88c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
+import org.apache.ignite.sql.Session;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
@@ -144,16 +145,7 @@ public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrat
view.streamData(publisher, options);
publisher.submit(tuple(1, "foo"));
- assertTrue(waitForCondition(() -> {
- @SuppressWarnings("resource")
- var tx = ignite().transactions().begin(new
TransactionOptions().readOnly(true));
-
- try {
- return view.get(tx, tupleKey(1)) != null;
- } finally {
- tx.rollback();
- }
- }, 50, 5000));
+ waitForKey(view, tupleKey(1));
}
}
@@ -213,6 +205,46 @@ public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrat
assertNull(view.get(null, tupleKey(10_000)));
}
+ @SuppressWarnings("resource")
+ @Test
+ public void testSchemaUpdateWhileStreaming() throws InterruptedException {
+ Session ses = ignite().sql().createSession();
+
+ String tableName = "testSchemaUpdateWhileStreaming";
+ ses.execute(null, "CREATE TABLE " + tableName + "(ID INT NOT NULL
PRIMARY KEY)");
+ RecordView<Tuple> view =
ignite().tables().table(tableName).recordView();
+
+ CompletableFuture<Void> streamerFut;
+
+ try (var publisher = new SubmissionPublisher<Tuple>()) {
+ var options = DataStreamerOptions.builder().batchSize(1).build();
+ streamerFut = view.streamData(publisher, options);
+
+ publisher.submit(tupleKey(1));
+ waitForKey(view, tupleKey(1));
+
+ ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME
VARCHAR NOT NULL");
+ publisher.submit(tuple(2, "bar"));
+ }
+
+ streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+ assertEquals("bar", view.get(null, tupleKey(2)).stringValue("name"));
+ }
+
+ private void waitForKey(RecordView<Tuple> view, Tuple key) throws
InterruptedException {
+ assertTrue(waitForCondition(() -> {
+ @SuppressWarnings("resource")
+ var tx = ignite().transactions().begin(new
TransactionOptions().readOnly(true));
+
+ try {
+ return view.get(tx, key) != null;
+ } finally {
+ tx.rollback();
+ }
+ }, 50, 5000));
+ }
+
private Table defaultTable() {
//noinspection resource
return ignite().tables().table(TABLE_NAME);
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
index dff0f63f1a..51924510a0 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
@@ -26,6 +26,7 @@ import java.time.LocalTime;
import java.util.BitSet;
import java.util.UUID;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.InvalidTypeException;
@@ -418,4 +419,11 @@ class UpgradingRowAdapter extends Row {
return mappedId < 0 ? (Instant) column.defaultValue() :
super.timestampValue(mappedId);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public BinaryTuple binaryTuple() {
+ // Underlying binary tuple cannot be used directly.
+ return null;
+ }
}