This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 197941f6f1 IGNITE-17929 Add read-only support to ClientTransactions (#1625)
197941f6f1 is described below
commit 197941f6f11a1321f3e1b06e9d1b7f002949e817
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Feb 3 14:05:31 2023 +0200
IGNITE-17929 Add read-only support to ClientTransactions (#1625)
* Add readOnly flag to the client protocol.
* Add support to Java thin API.
---
.../handler/ClientInboundMessageHandler.java | 2 +-
.../requests/tx/ClientTransactionBeginRequest.java | 7 +++
.../ignite/internal/client/TcpIgniteClient.java | 2 +-
.../internal/client/tx/ClientTransactions.java | 14 ++++--
.../client/detail/transaction/transactions_impl.h | 8 +++-
modules/platforms/cpp/ignite/protocol/writer.h | 13 ++++++
.../Internal/Transactions/Transactions.cs | 7 ++-
.../app/client/ItThinClientTransactionsTest.java | 53 ++++++++++++++++++++++
8 files changed, 97 insertions(+), 9 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index f24b460d29..10fecbb4fd 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -463,7 +463,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter {
return ClientJdbcQueryMetadataRequest.process(in, out,
jdbcQueryCursorHandler);
case ClientOp.TX_BEGIN:
- return ClientTransactionBeginRequest.process(out,
igniteTransactions, resources);
+ return ClientTransactionBeginRequest.process(in, out,
igniteTransactions, resources);
case ClientOp.TX_COMMIT:
return ClientTransactionCommitRequest.process(in, resources);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
index 08d4ff708f..469cc542df 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.tx.IgniteTransactions;
@@ -32,15 +33,21 @@ public class ClientTransactionBeginRequest {
/**
* Processes the request.
*
+ * @param in Unpacker.
* @param out Packer.
* @param transactions Transactions.
* @param resources Resources.
* @return Future.
*/
public static CompletableFuture<Void> process(
+ ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTransactions transactions,
ClientResourceRegistry resources) {
+ if (in.unpackBoolean()) {
+ transactions = transactions.readOnly();
+ }
+
return transactions.beginAsync().thenAccept(t -> {
try {
long resourceId = resources.put(new ClientResource(t,
t::rollbackAsync));
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index efc3227dde..3c67061057 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -88,7 +88,7 @@ public class TcpIgniteClient implements IgniteClient {
ch = new ReliableChannel(chFactory, cfg);
tables = new ClientTables(ch);
- transactions = new ClientTransactions(ch);
+ transactions = new ClientTransactions(ch, false);
compute = new ClientCompute(ch, tables);
sql = new ClientSql(ch);
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
index 6572c51a19..7830c0ccca 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
@@ -32,13 +32,17 @@ public class ClientTransactions implements
IgniteTransactions {
/** Channel. */
private final ReliableChannel ch;
+ /** Read only flag. */
+ private final boolean readOnly;
+
/**
* Constructor.
*
* @param ch Channel.
*/
- public ClientTransactions(ReliableChannel ch) {
+ public ClientTransactions(ReliableChannel ch, boolean readOnly) {
this.ch = ch;
+ this.readOnly = readOnly;
}
/** {@inheritDoc} */
@@ -57,12 +61,14 @@ public class ClientTransactions implements
IgniteTransactions {
/** {@inheritDoc} */
@Override
public CompletableFuture<Transaction> beginAsync() {
- return ch.serviceAsync(ClientOp.TX_BEGIN, w -> {}, r -> new
ClientTransaction(r.clientChannel(), r.in().unpackLong()));
+ return ch.serviceAsync(
+ ClientOp.TX_BEGIN,
+ w -> w.out().packBoolean(readOnly),
+ r -> new ClientTransaction(r.clientChannel(),
r.in().unpackLong()));
}
@Override
public IgniteTransactions readOnly() {
- // TODO: IGNITE-17929 Add read-only support to ClientTransactions
- return null;
+ return new ClientTransactions(ch, true);
}
}
diff --git
a/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h
b/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h
index 0262ff0928..530ec43096 100644
--- a/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h
+++ b/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h
@@ -56,14 +56,18 @@ public:
* @param callback Callback to be called with a new transaction or error
upon completion of asynchronous operation.
*/
IGNITE_API void begin_async(ignite_callback<transaction> callback) {
+ auto writer_func = [](protocol::writer &writer) {
+ writer.write_bool(false); // readOnly.
+ };
+
auto reader_func = [](protocol::reader &reader,
std::shared_ptr<node_connection> conn) mutable -> transaction {
auto id = reader.read_int64();
return transaction(std::make_shared<transaction_impl>(id,
std::move(conn)));
};
- m_connection->perform_request_rd<transaction>(
- client_operation::TX_BEGIN, std::move(reader_func),
std::move(callback));
+ m_connection->perform_request<transaction>(
+ client_operation::TX_BEGIN, std::move(writer_func),
std::move(reader_func), std::move(callback));
}
private:
diff --git a/modules/platforms/cpp/ignite/protocol/writer.h
b/modules/platforms/cpp/ignite/protocol/writer.h
index 2112e31f9c..3cb3125062 100644
--- a/modules/platforms/cpp/ignite/protocol/writer.h
+++ b/modules/platforms/cpp/ignite/protocol/writer.h
@@ -137,6 +137,19 @@ public:
msgpack_pack_ext_with_body(m_packer.get(), data.data(), data.size(),
std::int8_t(extension_type::BITMASK));
}
+ /**
+ * Write boolean.
+ *
+ * @param value Value to write.
+ */
+ void write_bool(bool value) {
+ if (value) {
+ msgpack_pack_true(m_packer.get());
+ } else {
+ msgpack_pack_false(m_packer.get());
+ }
+ }
+
private:
/**
* Write callback.
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
index d1863e1950..a096cdc7dc 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
@@ -41,8 +41,13 @@ namespace Apache.Ignite.Internal.Transactions
/// <inheritdoc/>
public async Task<ITransaction> BeginAsync()
{
+ using var writer = ProtoCommon.GetMessageWriter();
+
+ // TODO: IGNITE-18696
+ writer.MessageWriter.Write(false); // Read-only.
+
// Transaction and all corresponding operations must be performed
using the same connection.
- var (resBuf, socket) = await
_socket.DoOutInOpAndGetSocketAsync(ClientOp.TxBegin).ConfigureAwait(false);
+ var (resBuf, socket) = await
_socket.DoOutInOpAndGetSocketAsync(ClientOp.TxBegin, request:
writer).ConfigureAwait(false);
using (resBuf)
{
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index 0dbf9ef0c5..bd9c2835a2 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -31,6 +31,7 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
@@ -40,6 +41,8 @@ import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
/**
* Thin client transactions integration test.
@@ -269,6 +272,56 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
var ex = assertThrows(IgniteException.class, () ->
recordView.upsert(tx, Tuple.create()));
assertThat(ex.getMessage(), containsString("Transaction context
has been lost due to connection errors"));
+ assertEquals(ErrorGroups.Client.CONNECTION_ERR, ex.code());
+ }
+ }
+
+ @Test
+ void testReadOnlyTxSeesOldDataAfterUpdate() {
+ KeyValueView<Integer, String> kvView = kvView();
+ kvView.put(null, 1, "1");
+
+ Transaction tx = client().transactions().readOnly().begin();
+ assertEquals("1", kvView.get(tx, 1));
+
+ // Update data in a different tx.
+ Transaction tx2 = client().transactions().begin();
+ kvView.put(tx2, 1, "2");
+ tx2.commit();
+
+ // Old tx sees old data.
+ assertEquals("1", kvView.get(tx, 1));
+
+ // New tx sees new data
+ Transaction tx3 = client().transactions().readOnly().begin();
+ assertEquals("2", kvView.get(tx3, 1));
+ }
+
+ @Test
+ void testUpdateInReadOnlyTxThrows() {
+ KeyValueView<Integer, String> kvView = kvView();
+ kvView.put(null, 1, "1");
+
+ Transaction tx = client().transactions().readOnly().begin();
+ var ex = assertThrows(TransactionException.class, () -> kvView.put(tx,
1, "2"));
+
+ assertThat(ex.getMessage(), containsString("Failed to enlist
read-write operation into read-only transaction"));
+
assertEquals(ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
ex.code());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCommitRollbackReadOnlyTxDoesNothing(boolean commit) {
+ KeyValueView<Integer, String> kvView = kvView();
+ kvView.put(null, 10, "1");
+
+ Transaction tx = client().transactions().readOnly().begin();
+ assertEquals("1", kvView.get(tx, 10));
+
+ if (commit) {
+ tx.commit();
+ } else {
+ tx.rollback();
}
}