This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6e7f25d33c IGNITE-22569 Java thin: fix SqlBatchException propagation
(#3982)
6e7f25d33c is described below
commit 6e7f25d33cb0f9e9aa14b8ed73e17aed7fa82e1b
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Jun 27 07:56:14 2024 +0300
IGNITE-22569 Java thin: fix SqlBatchException propagation (#3982)
* Use existing `ErrorExtensions` mechanism to propagate additional
exception data to client
* Remove custom logic from `ClientSqlExecuteBatchRequest`
* User-facing API behavior is not affected
---
.../org/apache/ignite/sql/SqlBatchException.java | 22 ++-----
.../java/org/apache/ignite/sql/SqlException.java | 9 +--
.../internal/client/proto/ErrorExtensions.java | 2 +
.../handler/ClientInboundMessageHandler.java | 27 ++++++---
.../requests/sql/ClientSqlExecuteBatchRequest.java | 52 ++--------------
.../ignite/internal/client/TcpClientChannel.java | 32 ++++++----
.../ignite/internal/client/sql/ClientSql.java | 44 +-------------
modules/platforms/cpp/cmake/ignite_test.cmake | 4 ++
.../cpp/ignite/client/detail/table/table_impl.h | 13 ++--
modules/platforms/cpp/ignite/common/ignite_error.h | 44 ++++++++------
modules/platforms/cpp/ignite/odbc/odbc_error.h | 21 +++++++
.../platforms/cpp/ignite/odbc/query/data_query.cpp | 69 ++++++++++++----------
.../platforms/cpp/ignite/odbc/query/data_query.h | 7 +++
.../platforms/cpp/ignite/odbc/sql_connection.cpp | 15 ++++-
modules/platforms/cpp/ignite/odbc/sql_connection.h | 36 +++++++++++
modules/platforms/cpp/ignite/protocol/utils.cpp | 18 +++---
modules/platforms/cpp/ignite/protocol/utils.h | 9 +++
modules/platforms/cpp/ignite/tuple/tuple_test.cpp | 7 ++-
.../cpp/tests/odbc-test/odbc_connection.h | 6 +-
.../ignite/internal/sql/api/IgniteSqlImpl.java | 1 +
20 files changed, 240 insertions(+), 198 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
index 61b3e882ae..5a7fa853da 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java
@@ -20,6 +20,7 @@ package org.apache.ignite.sql;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import org.jetbrains.annotations.Nullable;
/**
* Subclass of {@link SqlException} is thrown when an error occurs during a
batch update operation. In addition to the
@@ -41,24 +42,11 @@ public class SqlBatchException extends SqlException {
* @param traceId Unique identifier of the exception.
* @param code Full error code.
* @param updCntrs Array that describes the outcome of a batch execution.
- * @param cause Non-null throwable cause.
- */
- public SqlBatchException(UUID traceId, int code, long[] updCntrs,
Throwable cause) {
- super(traceId, code, cause.getMessage(), cause);
-
- this.updCntrs = updCntrs != null ? updCntrs : LONG_EMPTY_ARRAY;
- }
-
- /**
- * Creates an exception with the given error message.
- *
- * @param traceId Unique identifier of the exception.
- * @param code Full error code.
- * @param updCntrs Array that describes the outcome of a batch execution.
* @param message Detailed message.
+ * @param cause Optional cause.
*/
- public SqlBatchException(UUID traceId, int code, long[] updCntrs, String
message) {
- super(traceId, code, message, null);
+ public SqlBatchException(UUID traceId, int code, long[] updCntrs, String
message, @Nullable Throwable cause) {
+ super(traceId, code, message, cause);
this.updCntrs = updCntrs != null ? updCntrs : LONG_EMPTY_ARRAY;
}
@@ -71,7 +59,7 @@ public class SqlBatchException extends SqlException {
* @param message Detailed message.
* @param cause Optional nested exception (can be {@code null}).
*/
- public SqlBatchException(UUID traceId, int code, String message, Throwable
cause) {
+ public SqlBatchException(UUID traceId, int code, String message, @Nullable
Throwable cause) {
super(traceId, code, message, cause);
while ((cause instanceof CompletionException || cause instanceof
ExecutionException) && cause.getCause() != null) {
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
b/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
index f1ca6e96b5..d61f1bd0b5 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java
@@ -19,6 +19,7 @@ package org.apache.ignite.sql;
import java.util.UUID;
import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
/**
* SQL exception base class.
@@ -60,7 +61,7 @@ public class SqlException extends IgniteException {
* @param code Full error code.
* @param cause Optional nested exception (can be {@code null}).
*/
- public SqlException(int code, Throwable cause) {
+ public SqlException(int code, @Nullable Throwable cause) {
super(code, cause);
}
@@ -71,7 +72,7 @@ public class SqlException extends IgniteException {
* @param code Full error code.
* @param cause Optional nested exception (can be {@code null}).
*/
- public SqlException(UUID traceId, int code, Throwable cause) {
+ public SqlException(UUID traceId, int code, @Nullable Throwable cause) {
super(traceId, code, cause);
}
@@ -82,7 +83,7 @@ public class SqlException extends IgniteException {
* @param message Detailed message.
* @param cause Optional nested exception (can be {@code null}).
*/
- public SqlException(int code, String message, Throwable cause) {
+ public SqlException(int code, String message, @Nullable Throwable cause) {
super(code, message, cause);
}
@@ -94,7 +95,7 @@ public class SqlException extends IgniteException {
* @param message Detailed message.
* @param cause Optional nested exception (can be {@code null}).
*/
- public SqlException(UUID traceId, int code, String message, Throwable
cause) {
+ public SqlException(UUID traceId, int code, String message, @Nullable
Throwable cause) {
super(traceId, code, message, cause);
}
}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
index f16e8b642e..3e4229ec18 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
@@ -22,4 +22,6 @@ package org.apache.ignite.internal.client.proto;
*/
public class ErrorExtensions {
public static final String EXPECTED_SCHEMA_VERSION = "expected-schema-ver";
+
+ public static final String SQL_UPDATE_COUNTERS = "sql-update-counters";
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index bf62df49f9..0a2c8154e1 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -18,6 +18,7 @@
package org.apache.ignite.client.handler;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
import static org.apache.ignite.lang.ErrorGroups.Client.HANDSHAKE_HEADER_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_COMPATIBILITY_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
@@ -138,6 +139,7 @@ import org.apache.ignite.lang.TraceableException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.security.AuthenticationType;
import
org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
+import org.apache.ignite.sql.SqlBatchException;
import org.jetbrains.annotations.Nullable;
/**
@@ -496,8 +498,14 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
}
private void writeErrorCore(Throwable err, ClientMessagePacker packer) {
- SchemaVersionMismatchException schemaVersionMismatchException =
schemaVersionMismatchException(err);
- err = schemaVersionMismatchException == null ?
ExceptionUtils.unwrapCause(err) : schemaVersionMismatchException;
+ SchemaVersionMismatchException schemaVersionMismatchException =
findException(err, SchemaVersionMismatchException.class);
+ SqlBatchException sqlBatchException = findException(err,
SqlBatchException.class);
+
+ err = firstNotNull(
+ schemaVersionMismatchException,
+ sqlBatchException,
+ ExceptionUtils.unwrapCause(err)
+ );
// Trace ID and error code.
if (err instanceof TraceableException) {
@@ -510,7 +518,8 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
}
// No need to send internal errors to client.
- Throwable pubErr =
IgniteExceptionMapperUtil.mapToPublicException(ExceptionUtils.unwrapCause(err));
+ assert err != null;
+ Throwable pubErr = IgniteExceptionMapperUtil.mapToPublicException(err);
// Class name and message.
packer.packString(pubErr.getClass().getName());
@@ -525,9 +534,13 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
// Extensions.
if (schemaVersionMismatchException != null) {
- packer.packInt(1);
+ packer.packInt(1); // 1 extension.
packer.packString(ErrorExtensions.EXPECTED_SCHEMA_VERSION);
packer.packInt(schemaVersionMismatchException.expectedVersion());
+ } else if (sqlBatchException != null) {
+ packer.packInt(1); // 1 extension.
+ packer.packString(ErrorExtensions.SQL_UPDATE_COUNTERS);
+ packer.packLongArray(sqlBatchException.updateCounters());
} else {
packer.packNil(); // No extensions.
}
@@ -861,10 +874,10 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
}
}
- private static @Nullable SchemaVersionMismatchException
schemaVersionMismatchException(Throwable e) {
+ private static <T> @Nullable T findException(Throwable e, Class<T> cls) {
while (e != null) {
- if (e instanceof SchemaVersionMismatchException) {
- return (SchemaVersionMismatchException) e;
+ if (cls.isInstance(e)) {
+ return (T) e;
}
e = e.getCause();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
index 6324a101fc..59af8c82ca 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.client.handler.requests.sql;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
@@ -30,9 +29,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.sql.BatchedArguments;
-import org.apache.ignite.sql.SqlBatchException;
/**
* Client SQL execute batch request.
@@ -79,55 +76,18 @@ public class ClientSqlExecuteBatchRequest {
() -> {},
cursor -> 0,
cursorId -> {})
- .handle((affectedRows, ex) -> {
+ .thenApply((affectedRows) -> {
out.meta(transactions.observableTimestamp());
- if (ex != null) {
- var cause = ExceptionUtils.unwrapCause(ex.getCause());
+ out.packNil(); // resourceId
- if (cause instanceof SqlBatchException) {
- var exBatch = ((SqlBatchException) cause);
+ out.packBoolean(false); // has row set
+ out.packBoolean(false); // has more pages
+ out.packBoolean(false); // was applied
- writeBatchResult(out, exBatch.updateCounters(),
exBatch.code(), exBatch.getMessage(), exBatch.traceId());
- return null;
- }
+ out.packLongArray(affectedRows); // affected rows
- affectedRows = ArrayUtils.LONG_EMPTY_ARRAY;
- }
-
- writeBatchResult(out, affectedRows);
return null;
});
}
-
- private static void writeBatchResult(
- ClientMessagePacker out,
- long[] affectedRows,
- int errorCode,
- String errorMessage,
- UUID traceId) {
- out.packNil(); // resourceId
-
- out.packBoolean(false); // has row set
- out.packBoolean(false); // has more pages
- out.packBoolean(false); // was applied
- out.packLongArray(affectedRows); // affected rows
- out.packInt(errorCode); // error code
- out.packString(errorMessage); // error message
- out.packUuid(traceId);
- }
-
- private static void writeBatchResult(
- ClientMessagePacker out,
- long[] affectedRows) {
- out.packNil(); // resourceId
-
- out.packBoolean(false); // has row set
- out.packBoolean(false); // has more pages
- out.packBoolean(false); // was applied
- out.packLongArray(affectedRows); // affected rows
- out.packNil(); // error code
- out.packNil(); // error message
- out.packNil(); // trace id
- }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 9d235c04ed..30e4faf85a 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.lang.ErrorGroups.Table;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.sql.SqlBatchException;
import org.jetbrains.annotations.Nullable;
/**
@@ -481,22 +482,24 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
IgniteException causeWithStackTrace = unpacker.tryUnpackNil() ? null :
new IgniteException(traceId, code, unpacker.unpackString());
- if (code == Table.SCHEMA_VERSION_MISMATCH_ERR) {
- int extSize;
- extSize = unpacker.tryUnpackNil() ? 0 : unpacker.unpackInt();
- int expectedSchemaVersion = -1;
+ int extSize = unpacker.tryUnpackNil() ? 0 : unpacker.unpackInt();
+ int expectedSchemaVersion = -1;
+ long[] sqlUpdateCounters = null;
- for (int i = 0; i < extSize; i++) {
- String key = unpacker.unpackString();
+ for (int i = 0; i < extSize; i++) {
+ String key = unpacker.unpackString();
- if (key.equals(ErrorExtensions.EXPECTED_SCHEMA_VERSION)) {
- expectedSchemaVersion = unpacker.unpackInt();
- } else {
- // Unknown extension - ignore.
- unpacker.skipValues(1);
- }
+ if (key.equals(ErrorExtensions.EXPECTED_SCHEMA_VERSION)) {
+ expectedSchemaVersion = unpacker.unpackInt();
+ } else if (key.equals(ErrorExtensions.SQL_UPDATE_COUNTERS)) {
+ sqlUpdateCounters = unpacker.unpackLongArray();
+ } else {
+ // Unknown extension - ignore.
+ unpacker.skipValues(1);
}
+ }
+ if (code == Table.SCHEMA_VERSION_MISMATCH_ERR) {
if (expectedSchemaVersion == -1) {
return new IgniteException(
traceId, PROTOCOL_ERR, "Expected schema version is not
specified in error extension map.", causeWithStackTrace);
@@ -505,6 +508,11 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
return new ClientSchemaVersionMismatchException(traceId, code,
errMsg, expectedSchemaVersion, causeWithStackTrace);
}
+ if (sqlUpdateCounters != null) {
+ errMsg = errMsg != null ? errMsg : "SQL batch execution error";
+ return new SqlBatchException(traceId, code, sqlUpdateCounters,
errMsg, causeWithStackTrace);
+ }
+
try {
Class<? extends Throwable> errCls = (Class<? extends Throwable>)
Class.forName(errClassName);
return copyExceptionWithCause(errCls, traceId, code, errMsg,
causeWithStackTrace);
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 67a6b05174..437b945c4d 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -24,7 +24,6 @@ import java.time.ZoneId;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
@@ -45,7 +44,6 @@ import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
-import org.apache.ignite.sql.SqlBatchException;
import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
@@ -280,7 +278,7 @@ public class ClientSql implements IgniteSql {
w.out().packLong(ch.observableTimestamp());
};
- PayloadReader<BatchResultInternal> payloadReader = r -> {
+ PayloadReader<long[]> payloadReader = r -> {
ClientMessageUnpacker unpacker = r.in();
// skipping currently unused values:
@@ -290,31 +288,10 @@ public class ClientSql implements IgniteSql {
// 4. was applied flag
unpacker.skipValues(4);
- long[] updateCounters = unpacker.unpackLongArray();
-
- if (unpacker.tryUnpackNil()) {
- // No error - skipping message string and trace id.
- unpacker.skipValues(2);
-
- return new BatchResultInternal(updateCounters);
- }
-
- int errCode = unpacker.unpackInt();
- String message = unpacker.tryUnpackNil() ? null :
unpacker.unpackString();
- UUID traceId = unpacker.unpackUuid();
-
- return new BatchResultInternal(new SqlBatchException(traceId,
errCode, updateCounters, message));
+ return unpacker.unpackLongArray(); // Update counters.
};
- return ch.serviceAsync(ClientOp.SQL_EXEC_BATCH, payloadWriter,
payloadReader)
- .thenApply((batchRes) -> {
- if (batchRes.exception != null) {
- throw batchRes.exception;
- }
-
- return batchRes.updCounters;
- })
- .exceptionally(ClientSql::handleException);
+ return ch.serviceAsync(ClientOp.SQL_EXEC_BATCH, payloadWriter,
payloadReader);
}
/** {@inheritDoc} */
@@ -370,19 +347,4 @@ public class ClientSql implements IgniteSql {
throw ExceptionUtils.sneakyThrow(ex);
}
-
- private static class BatchResultInternal {
- final long[] updCounters;
- final SqlBatchException exception;
-
- BatchResultInternal(long[] updCounters) {
- this.updCounters = updCounters;
- this.exception = null;
- }
-
- BatchResultInternal(SqlBatchException exception) {
- this.updCounters = null;
- this.exception = exception;
- }
- }
}
diff --git a/modules/platforms/cpp/cmake/ignite_test.cmake
b/modules/platforms/cpp/cmake/ignite_test.cmake
index b900141401..003ae7f656 100644
--- a/modules/platforms/cpp/cmake/ignite_test.cmake
+++ b/modules/platforms/cpp/cmake/ignite_test.cmake
@@ -23,6 +23,10 @@ function(ignite_test TEST_NAME)
return()
endif()
+ if (MSVC)
+ add_compile_options(/bigobj)
+ endif()
+
set(OPTIONAL_ARGUMENT_TAGS DISCOVER)
set(SINGLE_ARGUMENT_TAGS)
set(MULTI_ARGUMENT_TAGS LIBS SOURCES)
diff --git a/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
b/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
index 2b6f86e475..518054eb5f 100644
--- a/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
+++ b/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
@@ -142,12 +142,15 @@ public:
void with_proper_schema_async(
ignite_callback<T> user_callback, std::function<void(const schema &,
ignite_callback<T>)> callback) {
auto fail_over = [uc = std::move(user_callback), this,
callback](ignite_result<T> &&res) mutable {
- if (res.has_error() &&
res.error().get_schema_version().has_value()) {
- auto ver = *res.error().get_schema_version();
- with_schema_async<T>(ver, std::move(uc), callback);
- } else {
- uc(std::move(res));
+ if (res.has_error()) {
+ auto ver_opt = res.error().template get_extra<std::int32_t>(
+ protocol::error_extensions::EXPECTED_SCHEMA_VERSION);
+ if (ver_opt) {
+ with_schema_async<T>(*ver_opt, std::move(uc), callback);
+ return;
+ }
}
+ uc(std::move(res));
};
with_latest_schema_async<T>(std::move(fail_over), callback);
diff --git a/modules/platforms/cpp/ignite/common/ignite_error.h
b/modules/platforms/cpp/ignite/common/ignite_error.h
index d64f4d508a..1d15ce3f71 100644
--- a/modules/platforms/cpp/ignite/common/ignite_error.h
+++ b/modules/platforms/cpp/ignite/common/ignite_error.h
@@ -23,6 +23,8 @@
#include <exception>
#include <optional>
#include <string>
+#include <any>
+#include <map>
namespace ignite {
@@ -61,18 +63,6 @@ public:
: m_status_code(code)
, m_message(std::move(message)) {} //
NOLINT(bugprone-throw-keyword-missing)
- /**
- * Constructor.
- *
- * @param statusCode Status code.
- * @param message Message.
- * @param ver Version.
- */
- explicit ignite_error(error::code code, std::string message,
std::optional<std::int32_t> ver) noexcept
- : m_status_code(code)
- , m_message(std::move(message)) //
NOLINT(bugprone-throw-keyword-missing)
- , m_version(ver) {}
-
/**
* Constructor.
*
@@ -118,12 +108,30 @@ public:
[[nodiscard]] std::int32_t get_flags() const noexcept { return m_flags; }
/**
- * Get expected schema version.
- * Internal method.
+ * Add extra information.
*
- * @return Expected schema version.
+ * @tparam T Extra type.
+ * @param key Key.
+ * @param value Value.
*/
- [[nodiscard]] std::optional<std::int32_t> get_schema_version() const
noexcept { return m_version; }
+ template<typename T>
+ void add_extra(std::string key, T value) {
+ m_extras.emplace(std::pair{std::move(key),
std::any{std::move(value)}});
+ }
+
+ /**
+ * Get extra information by key.
+ *
+ * @return Extra value, or an empty optional if the key is not present.
+ */
+ template<typename T>
+ [[nodiscard]] std::optional<T> get_extra(const std::string &key) const
noexcept {
+ auto it = m_extras.find(key);
+ if (it == m_extras.end())
+ return {};
+
+ return std::any_cast<T>(it->second);
+ }
private:
/** Status code. */
@@ -138,8 +146,8 @@ private:
/** Flags. */
std::int32_t m_flags{0};
- /** Schema version. */
- std::optional<std::int32_t> m_version{};
+ /** Extras. */
+ std::map<std::string, std::any> m_extras;
};
} // namespace ignite
diff --git a/modules/platforms/cpp/ignite/odbc/odbc_error.h
b/modules/platforms/cpp/ignite/odbc/odbc_error.h
index 1d4e2f532e..83ba9f0f0d 100644
--- a/modules/platforms/cpp/ignite/odbc/odbc_error.h
+++ b/modules/platforms/cpp/ignite/odbc/odbc_error.h
@@ -22,6 +22,7 @@
#include <utility>
#include "common_types.h"
+#include "ignite/common/ignite_error.h"
namespace ignite {
@@ -43,6 +44,16 @@ public:
: m_state(state)
, m_message(std::move(message)) {}
+ /**
+ * Constructor.
+ *
+ * @param err Ignite error.
+ */
+ explicit odbc_error(ignite_error err) noexcept
+ : m_state(error_code_to_sql_state(err.get_status_code()))
+ , m_message(err.what_str())
+ , m_cause(std::move(err)) {}
+
/**
* Get state.
*
@@ -62,12 +73,22 @@ public:
*/
[[nodiscard]] char const *what() const noexcept override { return
m_message.c_str(); }
+ /**
+ * Get cause.
+ *
+ * @return Cause.
+ */
+ [[nodiscard]] const std::optional<ignite_error>& get_cause() const {
return m_cause; }
+
private:
/** Status. */
sql_state m_state{sql_state::UNKNOWN};
/** Error message. */
std::string m_message;
+
+ /** Cause. */
+ std::optional<ignite_error> m_cause;
};
} // namespace ignite
diff --git a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
index 8f55c5962d..6f80cead4d 100644
--- a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
+++ b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
@@ -251,6 +251,7 @@ sql_result data_query::next_result_set() {
sql_result data_query::make_request_execute() {
auto &schema = m_connection.get_schema();
+ bool single = m_params.get_param_set_size() <= 1;
auto success = m_diag.catch_errors([&] {
auto tx = m_connection.get_transaction_id();
if (!tx && !m_connection.is_auto_commit()) {
@@ -261,10 +262,9 @@ sql_result data_query::make_request_execute() {
assert(tx);
}
- bool single = m_params.get_param_set_size() <= 1;
auto client_op = single ? protocol::client_operation::SQL_EXEC :
protocol::client_operation::SQL_EXEC_BATCH;
- auto response = m_connection.sync_request(client_op,
[&](protocol::writer &writer) {
+ auto res = m_connection.sync_request_nothrow(client_op,
[&](protocol::writer &writer) {
if (tx)
writer.write(*tx);
else
@@ -301,8 +301,23 @@ sql_result data_query::make_request_execute() {
writer.write(m_connection.get_observable_timestamp());
});
+ // Check error
+ if (res.second) {
+ auto err = std::move(*res.second);
+ if (!single) {
+ auto affected_rows =
err.get_cause()->get_extra<std::vector<std::int64_t>>(
+ protocol::error_extensions::SQL_UPDATE_COUNTERS);
+ if (affected_rows) {
+ process_affected_rows(*affected_rows);
+ }
+ }
+
+ throw odbc_error{std::move(err)};
+ }
+
m_connection.mark_transaction_non_empty();
+ auto &response = res.first;
auto reader =
std::make_unique<protocol::reader>(response.get_bytes_view());
m_query_id = reader->read_object_nullable<std::int64_t>();
@@ -322,40 +337,34 @@ sql_result data_query::make_request_execute() {
m_executed = true;
} else {
auto affected_rows = reader->read_int64_array();
- auto status_ptr = m_params.get_params_status_ptr();
-
- m_rows_affected = 0;
- for (auto &ar : affected_rows) {
- m_rows_affected += ar;
- }
- m_params.set_params_processed(affected_rows.size());
-
- if (status_ptr) {
- for (auto i = 0; i < m_params.get_param_set_size(); i++) {
- status_ptr[i] = (std::size_t(i) < affected_rows.size()) ?
SQL_PARAM_SUCCESS : SQL_PARAM_ERROR;
- }
- }
+ process_affected_rows(affected_rows);
+ }
+ });
- // Batch query, set attribute if it's set
- if (auto affected = m_params.get_params_processed_ptr(); affected)
{
- *affected = m_rows_affected;
- }
+ return success ? sql_result::AI_SUCCESS : sql_result::AI_ERROR;
+}
- m_executed = true;
+void data_query::process_affected_rows(const std::vector<std::int64_t>
&affected_rows) {
+ auto status_ptr = m_params.get_params_status_ptr();
- // Check error if this is a batch query
- if (auto error_code = reader->read_int32_nullable(); error_code) {
- auto error_message = reader->read_string();
- throw
odbc_error(error_code_to_sql_state(error::code(error_code.value())),
error_message);
- } else {
- reader->skip(); // error message
- }
+ m_rows_affected = 0;
+ for (auto &ar : affected_rows) {
+ m_rows_affected += ar;
+ }
+ m_params.set_params_processed(affected_rows.size());
- reader->skip(); // trace id
+ if (status_ptr) {
+ for (auto i = 0; i < m_params.get_param_set_size(); i++) {
+ status_ptr[i] = (size_t(i) < affected_rows.size()) ?
SQL_PARAM_SUCCESS : SQL_PARAM_ERROR;
}
- });
+ }
- return success ? sql_result::AI_SUCCESS : sql_result::AI_ERROR;
+ // Batch query, set attribute if it's set
+ if (auto affected = m_params.get_params_processed_ptr(); affected) {
+ *affected = m_rows_affected;
+ }
+
+ m_executed = true;
}
sql_result data_query::make_request_close() {
diff --git a/modules/platforms/cpp/ignite/odbc/query/data_query.h
b/modules/platforms/cpp/ignite/odbc/query/data_query.h
index ca06503e24..396b14bca7 100644
--- a/modules/platforms/cpp/ignite/odbc/query/data_query.h
+++ b/modules/platforms/cpp/ignite/odbc/query/data_query.h
@@ -189,6 +189,13 @@ private:
*/
sql_result make_request_execute();
+ /**
+ * Process the affected rows array received from the server.
+ *
+ * @param affected_rows Affected rows.
+ */
+ void process_affected_rows(const std::vector<std::int64_t> &affected_rows);
+
/**
* Make query close request.
*
diff --git a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
index 89f4e6e7ec..55325aff55 100644
--- a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
+++ b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
@@ -324,6 +324,15 @@ void sql_connection::send_message(bytes_view req,
std::int32_t timeout) {
}
network::data_buffer_owning sql_connection::receive_message(std::int64_t id,
std::int32_t timeout) {
+ auto res = receive_message_nothrow(id, timeout);
+ if (res.second) {
+ throw std::move(*res.second);
+ }
+ return std::move(res.first);
+}
+
+std::pair<network::data_buffer_owning, std::optional<odbc_error>>
sql_connection::receive_message_nothrow(
+ std::int64_t id, std::int32_t timeout) {
ensure_connected();
std::vector<std::byte> res;
@@ -349,12 +358,12 @@ network::data_buffer_owning
sql_connection::receive_message(std::int64_t id, std
auto observable_timestamp = reader.read_int64();
on_observable_timestamp(observable_timestamp);
+ std::optional<odbc_error> err;
if (test_flag(flags, protocol::response_flag::ERROR_FLAG)) {
- auto err = protocol::read_error(reader);
- throw odbc_error(error_code_to_sql_state(err.get_status_code()),
err.what_str());
+ err = odbc_error(protocol::read_error(reader));
}
- return network::data_buffer_owning{std::move(res), reader.position()};
+ return {network::data_buffer_owning{std::move(res),
reader.position()}, err};
}
}
diff --git a/modules/platforms/cpp/ignite/odbc/sql_connection.h
b/modules/platforms/cpp/ignite/odbc/sql_connection.h
index 91b1f6470d..70afbf18be 100644
--- a/modules/platforms/cpp/ignite/odbc/sql_connection.h
+++ b/modules/platforms/cpp/ignite/odbc/sql_connection.h
@@ -179,6 +179,16 @@ public:
*/
network::data_buffer_owning receive_message(std::int64_t id, std::int32_t
timeout);
+ /**
+ * Receive message. An error response is returned, not thrown.
+ *
+ * @param id Expected message ID.
+ * @param timeout Timeout.
+ * @return Message and error.
+ */
+ std::pair<network::data_buffer_owning, std::optional<odbc_error>>
receive_message_nothrow(std::int64_t id,
+ std::int32_t timeout);
+
/**
* Receive message.
*
@@ -187,6 +197,16 @@ public:
*/
network::data_buffer_owning receive_message(std::int64_t id) { return
receive_message(id, m_timeout); }
+ /**
+ * Receive message. An error response is returned, not thrown.
+ *
+ * @param id Expected message ID.
+ * @return Message and error.
+ */
+ std::pair<network::data_buffer_owning, std::optional<odbc_error>>
receive_message_nothrow(std::int64_t id) {
+ return receive_message_nothrow(id, m_timeout);
+ }
+
/**
* Get configuration.
*
@@ -277,6 +297,22 @@ public:
return receive_message(req_id);
}
+ /**
+ * Make a synchronous request. An error response is returned, not thrown.
+ *
+ * @param op Operation.
+ * @param wr Payload writing function.
+ * @return Response and error.
+ */
+ std::pair<network::data_buffer_owning, std::optional<odbc_error>>
sync_request_nothrow(
+ protocol::client_operation op, const
std::function<void(protocol::writer &)> &wr) {
+ auto req_id = generate_next_req_id();
+ auto request = make_request(req_id, op, wr);
+
+ send_message(request);
+ return receive_message_nothrow(req_id);
+ }
+
/**
* Get transaction ID.
*
diff --git a/modules/platforms/cpp/ignite/protocol/utils.cpp
b/modules/platforms/cpp/ignite/protocol/utils.cpp
index 820a4f7b65..0dcd01d671 100644
--- a/modules/platforms/cpp/ignite/protocol/utils.cpp
+++ b/modules/platforms/cpp/ignite/protocol/utils.cpp
@@ -30,13 +30,6 @@
namespace ignite::protocol {
-/**
- * Error data extensions. When the server returns an error response, it may
contain additional data in a map.
- * Keys are defined here.
- */
-namespace error_extensions {
-const std::string EXPECTED_SCHEMA_VERSION{"expected-schema-ver"};
-}
/**
* Check if int value fits in @c T.
@@ -231,21 +224,26 @@ ignite_error read_error(reader &reader) {
err_msg_builder << ": " << *message;
err_msg_builder << " (" << code << ", " << trace_id << ")";
- std::optional<std::int32_t> ver{};
+ ignite_error res{error::code(code), err_msg_builder.str()};
+
if (!reader.try_read_nil()) {
// Reading extensions
auto num = reader.read_int32();
for (std::int32_t i = 0; i < num; ++i) {
auto key = reader.read_string();
if (key == error_extensions::EXPECTED_SCHEMA_VERSION) {
- ver = reader.read_int32();
+ auto ver = reader.read_int32();
+ res.add_extra<std::int32_t>(std::move(key), ver);
+ } else if (key == error_extensions::SQL_UPDATE_COUNTERS) {
+ auto affected_rows = reader.read_int64_array();
+ res.add_extra<std::vector<std::int64_t>>(std::move(key),
std::move(affected_rows));
} else {
reader.skip();
}
}
}
- return ignite_error{error::code(code), err_msg_builder.str(), ver};
+ return res;
}
void claim_primitive_with_type(binary_tuple_builder &builder, const primitive
&value) {
diff --git a/modules/platforms/cpp/ignite/protocol/utils.h
b/modules/platforms/cpp/ignite/protocol/utils.h
index 517e1e00c2..436864b9d6 100644
--- a/modules/platforms/cpp/ignite/protocol/utils.h
+++ b/modules/platforms/cpp/ignite/protocol/utils.h
@@ -39,6 +39,15 @@ namespace ignite::protocol {
class reader;
+/**
+ * Error data extensions. When the server returns an error response, it may
contain additional data in a map.
+ * Keys are defined here.
+ */
+namespace error_extensions {
+constexpr const char* EXPECTED_SCHEMA_VERSION = "expected-schema-ver";
+constexpr const char* SQL_UPDATE_COUNTERS = "sql-update-counters";
+};
+
/** Magic bytes. */
static constexpr std::array<std::byte, 4> MAGIC_BYTES = {
std::byte('I'), std::byte('G'), std::byte('N'), std::byte('I')};
diff --git a/modules/platforms/cpp/ignite/tuple/tuple_test.cpp
b/modules/platforms/cpp/ignite/tuple/tuple_test.cpp
index 465f14bb8a..b15fd450f2 100644
--- a/modules/platforms/cpp/ignite/tuple/tuple_test.cpp
+++ b/modules/platforms/cpp/ignite/tuple/tuple_test.cpp
@@ -15,6 +15,9 @@
* limitations under the License.
*/
+#define _USE_MATH_DEFINES
+#include <cmath>
+
#include "binary_tuple_builder.h"
#include "binary_tuple_parser.h"
@@ -317,7 +320,7 @@ TEST(tuple, AllTypesExtended) {
static constexpr std::size_t COUNT = 1000;
std::random_device rd;
- std::uniform_int_distribution<char> dist;
+ std::uniform_int_distribution<short> dist;
for (std::size_t i = 0; i < COUNT; i++) {
int sign = (i % 2) ? -1 : 1;
@@ -336,7 +339,7 @@ TEST(tuple, AllTypesExtended) {
std::string buffer;
buffer.resize(i % 100 + 1);
- std::generate(buffer.begin(), buffer.end(), [&]() { return dist(rd);
});
+ std::generate(buffer.begin(), buffer.end(), [&]() { return
char(dist(rd) % 128); });
bool v1 = (i % 2 == 0);
int8_t v2 = int8_t(i % std::numeric_limits<int8_t>::max()) * sign;
diff --git a/modules/platforms/cpp/tests/odbc-test/odbc_connection.h
b/modules/platforms/cpp/tests/odbc-test/odbc_connection.h
index a090d66fdc..69e67647ed 100644
--- a/modules/platforms/cpp/tests/odbc-test/odbc_connection.h
+++ b/modules/platforms/cpp/tests/odbc-test/odbc_connection.h
@@ -720,15 +720,15 @@ public:
insert_test_batch(split_at, records_num, records_num - split_at);
for (int i = 0; i < records_num - split_at; ++i)
- EXPECT_EQ(statuses[i], SQL_PARAM_SUCCESS);
+ EXPECT_EQ(statuses[i], SQL_PARAM_SUCCESS) << "index=" << i;
insert_test_batch_no_check(0, records_num, ret);
for (int i = 0; i < split_at; ++i)
- EXPECT_EQ(statuses[i], SQL_PARAM_SUCCESS);
+ EXPECT_EQ(statuses[i], SQL_PARAM_SUCCESS) << "index=" << i;
for (int i = split_at; i < records_num; ++i)
- EXPECT_EQ(statuses[i], SQL_PARAM_ERROR);
+ EXPECT_EQ(statuses[i], SQL_PARAM_ERROR) << "index=" << i;
SQLFreeStmt(m_statement, SQL_RESET_PARAMS);
SQLFreeStmt(m_statement, SQL_UNBIND);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index bb38cffd45..5a2da1d991 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -512,6 +512,7 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
((TraceableException) t).traceId(),
((TraceableException) t).code(),
counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY),
+ t.getMessage(),
t);
}