This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 917f6cf07a IGNITE-17060 Implement script SQL API for java thin client
(#2907)
917f6cf07a is described below
commit 917f6cf07a7366dca945c3e3e38099e0b2233bb4
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Nov 30 14:01:13 2023 +0200
IGNITE-17060 Implement script SQL API for java thin client (#2907)
* Add `ClientSqlExecuteScriptRequest` on the server
* Implement `executeScriptAsync` on the client
* Enable existing tests
---
.../ignite/internal/client/proto/ClientOp.java | 3 ++
.../handler/ClientInboundMessageHandler.java | 4 ++
.../requests/sql/ClientSqlExecuteRequest.java | 2 +-
.../sql/ClientSqlExecuteScriptRequest.java | 62 ++++++++++++++++++++++
.../apache/ignite/client/ClientOperationType.java | 5 ++
.../org/apache/ignite/client/RetryReadPolicy.java | 1 +
.../ignite/internal/client/ClientChannel.java | 5 +-
.../apache/ignite/internal/client/ClientUtils.java | 4 ++
.../ignite/internal/client/ReliableChannel.java | 11 ++--
.../ignite/internal/client/TcpClientChannel.java | 8 +--
.../ignite/internal/client/sql/ClientSession.java | 18 +++----
.../sql/api/ItSqlClientAsynchronousApiTest.java | 2 -
.../sql/api/ItSqlClientSynchronousApiTest.java | 2 -
13 files changed, 101 insertions(+), 26 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index 1aa1f222d2..21331f8cd4 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -146,4 +146,7 @@ public class ClientOp {
/** JDBC command to commit/rollback transaction. */
public static final int JDBC_TX_FINISH = 55;
+
+ /** Execute SQL script. */
+ public static final int SQL_EXEC_SCRIPT = 56;
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index dafd1e0fbe..855ab192b6 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -55,6 +55,7 @@ import
org.apache.ignite.client.handler.requests.jdbc.JdbcMetadataCatalog;
import
org.apache.ignite.client.handler.requests.sql.ClientSqlCursorCloseRequest;
import
org.apache.ignite.client.handler.requests.sql.ClientSqlCursorNextPageRequest;
import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteRequest;
+import
org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteScriptRequest;
import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
import
org.apache.ignite.client.handler.requests.table.ClientTablePartitionPrimaryReplicasGetRequest;
@@ -687,6 +688,9 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
case ClientOp.JDBC_TX_FINISH:
return ClientJdbcFinishTxRequest.process(in, out,
jdbcQueryEventHandler);
+ case ClientOp.SQL_EXEC_SCRIPT:
+ return ClientSqlExecuteScriptRequest.process(in, sql,
igniteTransactions);
+
default:
throw new IgniteException(PROTOCOL_ERR, "Unexpected operation
code: " + opCode);
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index f36c7c1dd0..5219c8c6d5 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -156,7 +156,7 @@ public class ClientSqlExecuteRequest {
return statementBuilder.build();
}
- private static Session readSession(ClientMessageUnpacker in, IgniteSql
sql, IgniteTransactions transactions) {
+ static Session readSession(ClientMessageUnpacker in, IgniteSql sql,
@Nullable IgniteTransactions transactions) {
SessionBuilder sessionBuilder = sql.sessionBuilder();
if (transactions != null && sessionBuilder instanceof
SessionBuilderImpl) {
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
new file mode 100644
index 0000000000..0bdfe3dcbe
--- /dev/null
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.sql;
+
+import static
org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteRequest.readSession;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
+import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.Session;
+
+/**
+ * Client SQL execute script request.
+ */
+public class ClientSqlExecuteScriptRequest {
+ /**
+ * Processes the request.
+ *
+ * @param in Unpacker.
+ * @param sql SQL API.
+ * @return Future representing result of operation.
+ */
+ public static CompletableFuture<Void> process(
+ ClientMessageUnpacker in,
+ IgniteSql sql,
+ IgniteTransactionsImpl transactions
+ ) {
+ Session session = readSession(in, sql, null);
+ String script = in.unpackString();
+ Object[] arguments = in.unpackObjectArrayFromBinaryTuple();
+
+ if (arguments == null) {
+ // SQL engine requires non-null arguments, but we don't want to
complicate the protocol with this requirement.
+ arguments = ArrayUtils.OBJECT_EMPTY_ARRAY;
+ }
+
+ // TODO IGNITE-20232 Propagate observable timestamp to sql engine
using internal API.
+ HybridTimestamp clientTs =
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
+
+ transactions.updateObservableTimestamp(clientTs);
+
+ return session.executeScriptAsync(script, arguments);
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
index a2d688ff3c..76ab308314 100644
---
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
+++
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -134,6 +134,11 @@ public enum ClientOperationType {
*/
SQL_EXECUTE,
+ /**
+ * SQL Execute ({@link
org.apache.ignite.sql.Session#executeScriptAsync(String, Object...)}).
+ */
+ SQL_EXECUTE_SCRIPT,
+
/**
* SQL Cursor Next Page ({@link AsyncResultSet#fetchNextPage()}).
*/
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
index d882dfc2cb..ea15466407 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
@@ -53,6 +53,7 @@ public class RetryReadPolicy extends RetryLimitPolicy {
case TUPLE_UPSERT_ALL:
case SQL_EXECUTE:
case SQL_CURSOR_NEXT_PAGE:
+ case SQL_EXECUTE_SCRIPT:
return false;
default:
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
index 2cc819a997..b7a95e9c6c 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
+import org.jetbrains.annotations.Nullable;
/**
* Processing thin client requests and responses.
@@ -35,8 +36,8 @@ public interface ClientChannel extends AutoCloseable {
*/
<T> CompletableFuture<T> serviceAsync(
int opCode,
- PayloadWriter payloadWriter,
- PayloadReader<T> payloadReader
+ @Nullable PayloadWriter payloadWriter,
+ @Nullable PayloadReader<T> payloadReader
);
/**
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index 27c4871527..d3a981b7e7 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -218,6 +218,9 @@ public class ClientUtils {
case ClientOp.SQL_EXEC:
return ClientOperationType.SQL_EXECUTE;
+ case ClientOp.SQL_EXEC_SCRIPT:
+ return ClientOperationType.SQL_EXECUTE_SCRIPT;
+
case ClientOp.SQL_CURSOR_NEXT_PAGE:
return ClientOperationType.SQL_CURSOR_NEXT_PAGE;
@@ -230,6 +233,7 @@ public class ClientUtils {
case ClientOp.JDBC_TX_FINISH:
return null;
+
// Do not return null from default arm intentionally, so we don't
forget to update this when new ClientOp values are added.
default:
throw new UnsupportedOperationException("Invalid op code: " +
opCode);
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index b65aac75d9..a7ba04d227 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -200,8 +200,8 @@ public final class ReliableChannel implements AutoCloseable
{
*/
public <T> CompletableFuture<T> serviceAsync(
int opCode,
- PayloadWriter payloadWriter,
- PayloadReader<T> payloadReader,
+ @Nullable PayloadWriter payloadWriter,
+ @Nullable PayloadReader<T> payloadReader,
@Nullable String preferredNodeName,
@Nullable RetryPolicy retryPolicyOverride
) {
@@ -224,7 +224,7 @@ public final class ReliableChannel implements AutoCloseable
{
public <T> CompletableFuture<T> serviceAsync(
int opCode,
PayloadWriter payloadWriter,
- PayloadReader<T> payloadReader
+ @Nullable PayloadReader<T> payloadReader
) {
return serviceAsync(opCode, payloadWriter, payloadReader, null, null);
}
@@ -243,8 +243,8 @@ public final class ReliableChannel implements AutoCloseable
{
private <T> CompletableFuture<T> serviceAsyncInternal(
int opCode,
- PayloadWriter payloadWriter,
- PayloadReader<T> payloadReader,
+ @Nullable PayloadWriter payloadWriter,
+ @Nullable PayloadReader<T> payloadReader,
ClientChannel ch) {
return ch.serviceAsync(opCode, payloadWriter,
payloadReader).whenComplete((res, err) -> {
if (err != null && unwrapConnectionException(err) != null) {
@@ -386,7 +386,6 @@ public final class ReliableChannel implements AutoCloseable
{
String[] hostAddrs = clientCfg.addressesFinder().getAddresses();
if (hostAddrs.length == 0) {
- //noinspection NonPrivateFieldAccessedInSynchronizedContext
throw new IgniteException(CONFIGURATION_ERR, "Empty
addresses");
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index f24c78f8d7..c44a93b1ec 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -247,8 +247,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
@Override
public <T> CompletableFuture<T> serviceAsync(
int opCode,
- PayloadWriter payloadWriter,
- PayloadReader<T> payloadReader
+ @Nullable PayloadWriter payloadWriter,
+ @Nullable PayloadReader<T> payloadReader
) {
try {
if (log.isTraceEnabled()) {
@@ -273,7 +273,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
* @param payloadWriter Payload writer to stream or {@code null} if
request has no payload.
* @return Request future.
*/
- private ClientRequestFuture send(int opCode, PayloadWriter payloadWriter) {
+ private ClientRequestFuture send(int opCode, @Nullable PayloadWriter
payloadWriter) {
long id = reqId.getAndIncrement();
if (closed()) {
@@ -336,7 +336,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
* @param payloadReader Payload reader from stream.
* @return Future for the operation.
*/
- private <T> CompletableFuture<T> receiveAsync(ClientRequestFuture
pendingReq, PayloadReader<T> payloadReader) {
+ private <T> CompletableFuture<T> receiveAsync(ClientRequestFuture
pendingReq, @Nullable PayloadReader<T> payloadReader) {
return pendingReq.thenApplyAsync(payload -> {
if (payload == null) {
return null;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
index 1fc07cf2de..78aef7e541 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
@@ -221,18 +221,18 @@ public class ClientSession implements AbstractSession {
throw new UnsupportedOperationException("Not implemented yet.");
}
- /** {@inheritDoc} */
- @Override
- public void executeScript(String query, @Nullable Object... arguments) {
- // TODO IGNITE-17060.
- throw new UnsupportedOperationException("Not implemented yet.");
- }
-
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> executeScriptAsync(String query, @Nullable
Object... arguments) {
- // TODO IGNITE-17060.
- throw new UnsupportedOperationException("Not implemented yet.");
+ Objects.requireNonNull(query);
+
+ PayloadWriter payloadWriter = w -> {
+ w.out().packString(query);
+ w.out().packObjectArrayAsBinaryTuple(arguments);
+ w.out().packLong(ch.observableTimestamp());
+ };
+
+ return ch.serviceAsync(ClientOp.SQL_EXEC_SCRIPT, payloadWriter, null);
}
/** {@inheritDoc} */
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
index 73a33a1233..eec113abe1 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
@@ -72,13 +72,11 @@ public class ItSqlClientAsynchronousApiTest extends
ItSqlAsynchronousApiTest {
}
@Override
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060")
public void runScriptThatCompletesSuccessfully() {
super.runScriptThatCompletesSuccessfully();
}
@Override
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060")
public void runScriptThatFails() {
super.runScriptThatFails();
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
index 04e189a0c8..435ca957e3 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
@@ -74,13 +74,11 @@ public class ItSqlClientSynchronousApiTest extends
ItSqlSynchronousApiTest {
}
@Override
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060")
public void runScriptThatCompletesSuccessfully() {
super.runScriptThatCompletesSuccessfully();
}
@Override
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17060")
public void runScriptThatFails() {
super.runScriptThatFails();
}