This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a5ac6f720 IGNITE-16221 Thin 3.0: Add RetryPolicy (#761)
a5ac6f720 is described below
commit a5ac6f720d7d5da76d57e6b421a068076ee507d1
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Apr 5 15:27:20 2022 +0300
IGNITE-16221 Thin 3.0: Add RetryPolicy (#761)
* Add `RetryPolicy` interface.
* Add predefined policies: `RetryReadPolicy`, `RetryLimitPolicy` (default
limit is 16).
* Add `IgniteClientConfiguration.retryPolicy` property, defaults to
`RetryReadPolicy`.
* Remove `IgniteClientConfiguration.retryLimit` - limit can be handled by
the policy.
* .NET:
* Remove singletons from policies (does not make sense for mutable
classes).
* Remove `RetryAllPolicy` - it is the same as `RetryLimitPolicy` with a
zero limit.
* Set `RetryLimitPolicy.RetryLimit` to 16 by default.
* Use `RetryReadPolicy` as default.
https://cwiki.apache.org/confluence/display/IGNITE/IEP-82+Thin+Client+Retry+Policy
---
.../ignite/internal/client/proto/ClientOp.java | 3 -
.../handler/ClientInboundMessageHandler.java | 4 -
.../apache/ignite/client/ClientOperationType.java | 124 +++++++++++
.../org/apache/ignite/client/IgniteClient.java | 23 +-
.../ignite/client/IgniteClientConfiguration.java | 7 +-
.../org/apache/ignite/client/RetryLimitPolicy.java | 63 ++++++
.../org/apache/ignite/client/RetryPolicy.java} | 23 +-
.../apache/ignite/client/RetryPolicyContext.java} | 41 ++--
.../org/apache/ignite/client/RetryReadPolicy.java | 59 ++++++
.../apache/ignite/internal/client/ClientUtils.java | 122 +++++++++++
.../client/IgniteClientConfigurationImpl.java | 27 +--
.../ignite/internal/client/ReliableChannel.java | 80 ++++---
.../internal/client/RetryPolicyContextImpl.java | 83 ++++++++
.../io/netty/NettyClientConnectionMultiplexer.java | 11 +-
.../ignite/internal/client/table/ClientTables.java | 2 +-
.../ignite/internal/jdbc/ConnectionProperties.java | 15 --
.../internal/jdbc/ConnectionPropertiesImpl.java | 19 --
.../ignite/internal/jdbc/JdbcConnection.java | 2 -
.../apache/ignite/client/AbstractClientTest.java | 9 +-
.../org/apache/ignite/client/ClientTablesTest.java | 14 --
.../apache/ignite/client/ConfigurationTest.java | 4 -
.../org/apache/ignite/client/HeartbeatTest.java | 18 +-
.../org/apache/ignite/client/ReconnectTest.java | 3 +-
.../org/apache/ignite/client/RetryPolicyTest.java | 236 +++++++++++++++++++++
.../ignite/client/TestClientHandlerModule.java | 210 ++++++++++++++++++
.../org/apache/ignite/client/TestRetryPolicy.java} | 33 +--
.../java/org/apache/ignite/client/TestServer.java | 65 ++++--
.../org/apache/ignite/client/fakes/FakeIgnite.java | 23 +-
.../ignite/client/fakes/FakeIgniteTables.java | 8 +
.../dotnet/Apache.Ignite.Tests/FakeServerTests.cs | 7 +-
.../dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs | 54 ++++-
.../Apache.Ignite.Tests/RetryReadPolicyTests.cs | 2 +-
.../Apache.Ignite/IgniteClientConfiguration.cs | 6 +-
.../Apache.Ignite/Internal/ClientFailoverSocket.cs | 2 +-
.../dotnet/Apache.Ignite/RetryLimitPolicy.cs | 11 +-
.../dotnet/Apache.Ignite/RetryReadPolicy.cs | 5 -
36 files changed, 1176 insertions(+), 242 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index 3dad93fc2..f1e9487ff 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -24,9 +24,6 @@ public class ClientOp {
/** Heartbeat. */
public static final int HEARTBEAT = 1;
- /** Drop table. */
- public static final int TABLE_DROP = 2;
-
/** Get tables. */
public static final int TABLES_GET = 3;
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 455405772..09a121df0 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -38,7 +38,6 @@ import
org.apache.ignite.client.handler.requests.sql.ClientSqlSchemasMetadataReq
import
org.apache.ignite.client.handler.requests.sql.ClientSqlTableMetadataRequest;
import org.apache.ignite.client.handler.requests.sql.JdbcMetadataCatalog;
import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
-import org.apache.ignite.client.handler.requests.table.ClientTableDropRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTablesGetRequest;
import
org.apache.ignite.client.handler.requests.table.ClientTupleContainsKeyRequest;
@@ -312,9 +311,6 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter {
case ClientOp.HEARTBEAT:
return null;
- case ClientOp.TABLE_DROP:
- return ClientTableDropRequest.process(in, igniteTables);
-
case ClientOp.TABLES_GET:
return ClientTablesGetRequest.process(out, igniteTables);
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
new file mode 100644
index 000000000..69dee583a
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.util.Collection;
+import java.util.Set;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.apache.ignite.tx.Transaction;
+
+/**
+ * Client operation type.
+ */
+public enum ClientOperationType {
+ /**
+ * Get tables ({@link IgniteTables#tables()}).
+ */
+ TABLES_GET,
+
+ /**
+ * Get table ({@link IgniteTables#table(String)}).
+ */
+ TABLE_GET,
+
+ /**
+ * Upsert ({@link RecordView#upsert(Transaction, Object)}).
+ */
+ TUPLE_UPSERT,
+
+ /**
+ * Get ({@link RecordView#get(Transaction, Object)}).
+ */
+ TUPLE_GET,
+
+ /**
+ * Upsert All ({@link RecordView#upsertAll(Transaction, Collection)}).
+ */
+ TUPLE_UPSERT_ALL,
+
+ /**
+ * Get All ({@link RecordView#getAll(Transaction, Collection)}).
+ */
+ TUPLE_GET_ALL,
+
+ /**
+ * Get and Upsert ({@link RecordView#getAndUpsert(Transaction, Object)}).
+ */
+ TUPLE_GET_AND_UPSERT,
+
+ /**
+ * Insert ({@link RecordView#insert(Transaction, Object)}).
+ */
+ TUPLE_INSERT,
+
+ /**
+ * Insert All ({@link RecordView#insertAll(Transaction, Collection)}).
+ */
+ TUPLE_INSERT_ALL,
+
+ /**
+ * Replace ({@link RecordView#replace(Transaction, Object)}).
+ */
+ TUPLE_REPLACE,
+
+ /**
+ * Replace Exact ({@link RecordView#replace(Transaction, Object, Object)}).
+ */
+ TUPLE_REPLACE_EXACT,
+
+ /**
+ * Get and Replace ({@link RecordView#getAndReplace(Transaction, Object)}).
+ */
+ TUPLE_GET_AND_REPLACE,
+
+ /**
+ * Delete ({@link RecordView#delete(Transaction, Object)}).
+ */
+ TUPLE_DELETE,
+
+ /**
+ * Delete All ({@link RecordView#deleteAll(Transaction, Collection)}).
+ */
+ TUPLE_DELETE_ALL,
+
+ /**
+ * Delete Exact ({@link RecordView#deleteExact(Transaction, Object)}).
+ */
+ TUPLE_DELETE_EXACT,
+
+ /**
+ * Delete All Exact ({@link RecordView#deleteAllExact(Transaction,
Collection)}).
+ */
+ TUPLE_DELETE_ALL_EXACT,
+
+ /**
+ * Get and Delete ({@link RecordView#getAndDelete(Transaction, Object)}).
+ */
+ TUPLE_GET_AND_DELETE,
+
+ /**
+ * Contains Key ({@link
org.apache.ignite.table.KeyValueView#contains(Transaction, Object)}).
+ */
+ TUPLE_CONTAINS_KEY,
+
+ /**
+ * Compute Execute ({@link
org.apache.ignite.compute.IgniteCompute#execute(Set, String, Object...)}).
+ */
+ COMPUTE_EXECUTE
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
index 604b8bb05..f7c0908f1 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_CONNECT_TI
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_INTERVAL;
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_PERIOD;
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_RETRIES;
-import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_RETRY_LIMIT;
import static org.apache.ignite.internal.client.ClientUtils.sync;
import java.util.Objects;
@@ -62,9 +61,6 @@ public interface IgniteClient extends Ignite {
/** Address finder. */
private IgniteClientAddressFinder addressFinder;
- /** Retry limit. */
- private int retryLimit = DFLT_RETRY_LIMIT;
-
/** Connect timeout. */
private long connectTimeout = DFLT_CONNECT_TIMEOUT;
@@ -80,6 +76,9 @@ public interface IgniteClient extends Ignite {
/** Heartbeat interval. */
private long heartbeatInterval = DFLT_HEARTBEAT_INTERVAL;
+ /** Retry policy. */
+ private RetryPolicy retryPolicy = new RetryReadPolicy();
+
/**
* Sets the addresses of Ignite server nodes within a cluster. An
address can be an IP address or a hostname, with or without port.
* If port is not set then Ignite will generate multiple addresses for
default port range. See {@link
@@ -97,16 +96,16 @@ public interface IgniteClient extends Ignite {
}
/**
- * Sets the retry limit. When a request fails due to a connection
error, and multiple server connections are available, Ignite will
- * retry the request on every connection. When this property is
greater than zero, Ignite will limit the number of retries.
+ * Sets the retry policy. When a request fails due to a connection
error, and multiple server connections
+ * are available, Ignite will retry the request if the specified
policy allows it.
*
- * <p>Default is {@link IgniteClientConfiguration#DFLT_RETRY_LIMIT}.
+ * <p>Default is {@link RetryReadPolicy}.
*
- * @param retryLimit Retry limit.
+ * @param retryPolicy Retry policy.
* @return This instance.
*/
- public Builder retryLimit(int retryLimit) {
- this.retryLimit = retryLimit;
+ public Builder retryPolicy(RetryPolicy retryPolicy) {
+ this.retryPolicy = retryPolicy;
return this;
}
@@ -233,12 +232,12 @@ public interface IgniteClient extends Ignite {
var cfg = new IgniteClientConfigurationImpl(
addressFinder,
addresses,
- retryLimit,
connectTimeout,
reconnectThrottlingPeriod,
reconnectThrottlingRetries,
asyncContinuationExecutor,
- heartbeatInterval);
+ heartbeatInterval,
+ retryPolicy);
return TcpIgniteClient.startAsync(cfg);
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
index 4697c6629..81161e7a0 100644
---
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
+++
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -64,11 +64,12 @@ public interface IgniteClientConfiguration {
String[] addresses();
/**
- * Gets the retry limit.
+ * Gets the retry policy. When a request fails due to a connection error,
and multiple server connections
+ * are available, Ignite will retry the request if the specified policy
allows it.
*
- * @return Retry limit.
+ * @return Retry policy.
*/
- int retryLimit();
+ @Nullable RetryPolicy retryPolicy();
/**
* Gets the socket connect timeout, in milliseconds.
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/RetryLimitPolicy.java
b/modules/client/src/main/java/org/apache/ignite/client/RetryLimitPolicy.java
new file mode 100644
index 000000000..6983ad401
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/client/RetryLimitPolicy.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.util.Objects;
+
+/**
+ * Retry policy that returns true when {@link RetryPolicyContext#iteration()}
is less than the specified {@link #retryLimit()}.
+ */
+public class RetryLimitPolicy implements RetryPolicy {
+ /** Default retry limit. */
+ public static final int DFLT_RETRY_LIMIT = 16;
+
+ /** Retry limit. */
+ private int retryLimit = DFLT_RETRY_LIMIT;
+
+ /**
+ * Gets the retry limit. 0 or less for no limit.
+ *
+ * @return Retry limit.
+ */
+ public int retryLimit() {
+ return retryLimit;
+ }
+
+ /**
+ * Sets the retry limit. 0 or less for no limit.
+ *
+ * @return this instance.
+ */
+ public RetryLimitPolicy retryLimit(int retryLimit) {
+ this.retryLimit = retryLimit;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean shouldRetry(RetryPolicyContext context) {
+ Objects.requireNonNull(context);
+
+ if (retryLimit <= 0) {
+ return true;
+ }
+
+ return context.iteration() < retryLimit;
+ }
+}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
b/modules/client/src/main/java/org/apache/ignite/client/RetryPolicy.java
similarity index 57%
copy from
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
copy to modules/client/src/main/java/org/apache/ignite/client/RetryPolicy.java
index cf1111d7c..21a5fcfff 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/RetryPolicy.java
@@ -15,26 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.client.handler.requests.table;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
-import org.apache.ignite.table.manager.IgniteTables;
+package org.apache.ignite.client;
/**
- * Client table drop request.
+ * Client retry policy determines whether client operations that have failed
due to a connection issue should be retried.
*/
-public class ClientTableDropRequest {
+public interface RetryPolicy {
/**
- * Processes the request.
+ * Gets a value indicating whether a client operation that has failed due
to a connection issue should be retried.
*
- * @param in Unpacker.
- * @param tables Ignite tables.
- * @return Future.
+ * @param context Context.
+ * @return {@code true} if the operation should be retried on another
connection, {@code false} otherwise.
*/
- public static CompletableFuture<Void> process(ClientMessageUnpacker in,
IgniteTables tables) {
- var tableName = in.unpackString();
-
- return tables.dropTableAsync(tableName);
- }
+ public boolean shouldRetry(RetryPolicyContext context);
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
b/modules/client/src/main/java/org/apache/ignite/client/RetryPolicyContext.java
similarity index 53%
rename from
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
rename to
modules/client/src/main/java/org/apache/ignite/client/RetryPolicyContext.java
index cf1111d7c..0b6c5ee02 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
+++
b/modules/client/src/main/java/org/apache/ignite/client/RetryPolicyContext.java
@@ -15,26 +15,37 @@
* limitations under the License.
*/
-package org.apache.ignite.client.handler.requests.table;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
-import org.apache.ignite.table.manager.IgniteTables;
+package org.apache.ignite.client;
/**
- * Client table drop request.
+ * Retry policy context. See {@link RetryPolicy#shouldRetry}.
*/
-public class ClientTableDropRequest {
+public interface RetryPolicyContext {
+ /**
+ * Gets the client configuration.
+ *
+ * @return Client configuration.
+ */
+ public IgniteClientConfiguration configuration();
+
+ /**
+ * Gets the operation type.
+ *
+ * @return Operation type.
+ */
+ public ClientOperationType operation();
+
/**
- * Processes the request.
+ * Gets the current iteration number (zero-based).
*
- * @param in Unpacker.
- * @param tables Ignite tables.
- * @return Future.
+ * @return Zero-based iteration counter.
*/
- public static CompletableFuture<Void> process(ClientMessageUnpacker in,
IgniteTables tables) {
- var tableName = in.unpackString();
+ public int iteration();
- return tables.dropTableAsync(tableName);
- }
+ /**
+ * Gets the connection exception that caused the current retry iteration.
+ *
+ * @return Exception.
+ */
+ public IgniteClientConnectionException exception();
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
new file mode 100644
index 000000000..ea646ac8a
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+/**
+ * Retry policy that returns true for all read-only operations that do not
modify data.
+ */
+public class RetryReadPolicy extends RetryLimitPolicy {
+ /** {@inheritDoc} */
+ @Override
+ public boolean shouldRetry(RetryPolicyContext context) {
+ if (!super.shouldRetry(context)) {
+ return false;
+ }
+
+ switch (context.operation()) {
+ case TABLES_GET:
+ case TUPLE_CONTAINS_KEY:
+ case TUPLE_GET_ALL:
+ case TUPLE_GET:
+ case TABLE_GET:
+ return true;
+
+ case TUPLE_UPSERT:
+ case COMPUTE_EXECUTE:
+ case TUPLE_GET_AND_DELETE:
+ case TUPLE_DELETE_ALL_EXACT:
+ case TUPLE_DELETE_EXACT:
+ case TUPLE_DELETE_ALL:
+ case TUPLE_DELETE:
+ case TUPLE_GET_AND_REPLACE:
+ case TUPLE_REPLACE_EXACT:
+ case TUPLE_REPLACE:
+ case TUPLE_INSERT_ALL:
+ case TUPLE_INSERT:
+ case TUPLE_GET_AND_UPSERT:
+ case TUPLE_UPSERT_ALL:
+ return false;
+
+ default:
+ throw new UnsupportedOperationException("Unsupported operation
type: " + context.operation());
+ }
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index 562c9bab1..3e78b9c05 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.client;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import org.apache.ignite.client.ClientOperationType;
import org.apache.ignite.client.IgniteClientException;
+import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.lang.IgniteException;
/**
@@ -59,4 +61,124 @@ public class ClientUtils {
//TODO: IGNITE-14500 Replace with public exception with an error code
(or unwrap?).
return new IgniteClientException(e.getMessage(), e);
}
+
+ /**
+ * Converts internal op code to public {@link ClientOperationType}.
+ *
+ * @param opCode Op code.
+ * @return Operation type, or null for system operations.
+ */
+ public static ClientOperationType opCodeToClientOperationType(int opCode) {
+ if (opCode < 0) {
+ // No-op.
+ return null;
+ }
+
+ switch (opCode) {
+ case ClientOp.HEARTBEAT:
+ return null;
+
+ case ClientOp.TABLES_GET:
+ return ClientOperationType.TABLES_GET;
+
+ case ClientOp.TABLE_GET:
+ return ClientOperationType.TABLE_GET;
+
+ case ClientOp.SCHEMAS_GET:
+ return null;
+
+ case ClientOp.TUPLE_UPSERT:
+ return ClientOperationType.TUPLE_UPSERT;
+
+ case ClientOp.TUPLE_GET:
+ return ClientOperationType.TUPLE_GET;
+
+ case ClientOp.TUPLE_UPSERT_ALL:
+ return ClientOperationType.TUPLE_UPSERT_ALL;
+
+ case ClientOp.TUPLE_GET_ALL:
+ return ClientOperationType.TUPLE_GET_ALL;
+
+ case ClientOp.TUPLE_GET_AND_UPSERT:
+ return ClientOperationType.TUPLE_GET_AND_UPSERT;
+
+ case ClientOp.TUPLE_INSERT:
+ return ClientOperationType.TUPLE_INSERT;
+
+ case ClientOp.TUPLE_INSERT_ALL:
+ return ClientOperationType.TUPLE_INSERT_ALL;
+
+ case ClientOp.TUPLE_REPLACE:
+ return ClientOperationType.TUPLE_REPLACE;
+
+ case ClientOp.TUPLE_REPLACE_EXACT:
+ return ClientOperationType.TUPLE_REPLACE_EXACT;
+
+ case ClientOp.TUPLE_GET_AND_REPLACE:
+ return ClientOperationType.TUPLE_GET_AND_REPLACE;
+
+ case ClientOp.TUPLE_DELETE:
+ return ClientOperationType.TUPLE_DELETE;
+
+ case ClientOp.TUPLE_DELETE_ALL:
+ return ClientOperationType.TUPLE_DELETE_ALL;
+
+ case ClientOp.TUPLE_DELETE_EXACT:
+ return ClientOperationType.TUPLE_DELETE_EXACT;
+
+ case ClientOp.TUPLE_DELETE_ALL_EXACT:
+ return ClientOperationType.TUPLE_DELETE_ALL_EXACT;
+
+ case ClientOp.TUPLE_GET_AND_DELETE:
+ return ClientOperationType.TUPLE_GET_AND_DELETE;
+
+ case ClientOp.TUPLE_CONTAINS_KEY:
+ return ClientOperationType.TUPLE_CONTAINS_KEY;
+
+ case ClientOp.SQL_EXEC:
+ return null;
+
+ case ClientOp.SQL_NEXT:
+ return null;
+
+ case ClientOp.SQL_EXEC_BATCH:
+ return null;
+
+ case ClientOp.SQL_CURSOR_CLOSE:
+ return null;
+
+ case ClientOp.SQL_TABLE_META:
+ return null;
+
+ case ClientOp.SQL_COLUMN_META:
+ return null;
+
+ case ClientOp.SQL_SCHEMAS_META:
+ return null;
+
+ case ClientOp.SQL_PK_META:
+ return null;
+
+ case ClientOp.SQL_QUERY_META:
+ return null;
+
+ case ClientOp.TX_BEGIN:
+ case ClientOp.TX_COMMIT:
+ case ClientOp.TX_ROLLBACK:
+ return null; // Commit/rollback use owning connection and
bypass retry mechanism.
+
+ case ClientOp.SQL_EXEC_PS_BATCH:
+ return null;
+
+ case ClientOp.COMPUTE_EXECUTE:
+ return ClientOperationType.COMPUTE_EXECUTE;
+
+ case ClientOp.CLUSTER_GET_NODES:
+ return null;
+
+ // The default arm intentionally throws instead of returning null, so we don't
forget to update this when new ClientOp values are added.
+ default:
+ throw new UnsupportedOperationException("Invalid op code: " +
opCode);
+ }
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
index 40f091936..ed3be61c3 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client;
import java.util.concurrent.Executor;
import org.apache.ignite.client.IgniteClientAddressFinder;
import org.apache.ignite.client.IgniteClientConfiguration;
+import org.apache.ignite.client.RetryPolicy;
import org.jetbrains.annotations.Nullable;
/**
@@ -32,9 +33,6 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
/** Addresses. */
private final String[] addresses;
- /** Retry limit. */
- private final int retryLimit;
-
/** Connect timeout, in milliseconds. */
private final long connectTimeout;
@@ -50,36 +48,39 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
/** Heartbeat interval. */
private final long heartbeatInterval;
+ /** Retry policy. */
+ private final RetryPolicy retryPolicy;
+
/**
* Constructor.
*
* @param addressFinder Address finder.
* @param addresses Addresses.
- * @param retryLimit Retry limit.
* @param connectTimeout Socket connect timeout.
* @param asyncContinuationExecutor Async continuation executor.
* @param heartbeatInterval Heartbeat message interval.
+ * @param retryPolicy Retry policy.
*/
public IgniteClientConfigurationImpl(
IgniteClientAddressFinder addressFinder,
String[] addresses,
- int retryLimit,
long connectTimeout,
long reconnectThrottlingPeriod,
int reconnectThrottlingRetries,
Executor asyncContinuationExecutor,
- long heartbeatInterval) {
+ long heartbeatInterval,
+ RetryPolicy retryPolicy) {
this.addressFinder = addressFinder;
//noinspection AssignmentOrReturnOfFieldWithMutableType (cloned in
Builder).
this.addresses = addresses;
- this.retryLimit = retryLimit;
this.connectTimeout = connectTimeout;
this.reconnectThrottlingPeriod = reconnectThrottlingPeriod;
this.reconnectThrottlingRetries = reconnectThrottlingRetries;
this.asyncContinuationExecutor = asyncContinuationExecutor;
this.heartbeatInterval = heartbeatInterval;
+ this.retryPolicy = retryPolicy;
}
/** {@inheritDoc} */
@@ -94,12 +95,6 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
return addresses == null ? null : addresses.clone();
}
- /** {@inheritDoc} */
- @Override
- public int retryLimit() {
- return retryLimit;
- }
-
/** {@inheritDoc} */
@Override
public long connectTimeout() {
@@ -129,4 +124,10 @@ public final class IgniteClientConfigurationImpl
implements IgniteClientConfigur
public long heartbeatInterval() {
return heartbeatInterval;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable RetryPolicy retryPolicy() {
+ return retryPolicy;
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 149d47960..a668560d7 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -28,19 +28,21 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientOperationType;
import org.apache.ignite.client.IgniteClientAuthenticationException;
import org.apache.ignite.client.IgniteClientConfiguration;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.client.IgniteClientException;
+import org.apache.ignite.client.RetryPolicy;
+import org.apache.ignite.client.RetryPolicyContext;
import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
import
org.apache.ignite.internal.client.io.netty.NettyClientConnectionMultiplexer;
@@ -48,10 +50,6 @@ import
org.apache.ignite.internal.client.io.netty.NettyClientConnectionMultiplex
* Communication channel with failover and partition awareness.
*/
public final class ReliableChannel implements AutoCloseable {
- /** Do nothing helper function. */
- private static final Consumer<Integer> DO_NOTHING = (v) -> {
- };
-
/** Channel factory. */
private final BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, ClientChannel> chFactory;
@@ -141,7 +139,7 @@ public final class ReliableChannel implements AutoCloseable
{
CompletableFuture<T> fut = new CompletableFuture<>();
// Use the only one attempt to avoid blocking async method.
- handleServiceAsync(fut, opCode, payloadWriter, payloadReader, 1, null);
+ handleServiceAsync(fut, opCode, payloadWriter, payloadReader, null, 0);
return fut;
}
@@ -162,14 +160,11 @@ public final class ReliableChannel implements
AutoCloseable {
int opCode,
PayloadWriter payloadWriter,
PayloadReader<T> payloadReader,
- int attemptsLimit,
- IgniteClientConnectionException failure) {
+ IgniteClientConnectionException failure,
+ int attempt) {
ClientChannel ch;
- // Workaround to store used attempts value within lambda body.
- var attemptsCnt = new int[1];
-
try {
- ch = applyOnDefaultChannel(channel -> channel, attemptsLimit, v ->
attemptsCnt[0] = v);
+ ch = getDefaultChannel();
} catch (Throwable ex) {
if (failure != null) {
failure.addSuppressed(ex);
@@ -193,9 +188,15 @@ public final class ReliableChannel implements
AutoCloseable {
return null;
}
+ while (err instanceof CompletionException &&
err.getCause() != null) {
+ err = err.getCause();
+ }
+
IgniteClientConnectionException failure0 = failure;
if (err instanceof IgniteClientConnectionException) {
+ var connectionErr = (IgniteClientConnectionException)
err;
+
try {
// Will try to reinit channels if topology changed.
onChannelFailure(ch);
@@ -206,20 +207,13 @@ public final class ReliableChannel implements
AutoCloseable {
}
if (failure0 == null) {
- failure0 = (IgniteClientConnectionException) err;
+ failure0 = connectionErr;
} else {
failure0.addSuppressed(err);
}
- int leftAttempts = attemptsLimit - attemptsCnt[0];
-
- // If it is a first retry then reset attempts (as for
initialization we use only 1 attempt).
- if (failure == null) {
- leftAttempts = getRetryLimit() - 1;
- }
-
- if (leftAttempts > 0) {
- handleServiceAsync(fut, opCode, payloadWriter,
payloadReader, leftAttempts, failure0);
+ if (shouldRetry(opCode, attempt, connectionErr)) {
+ handleServiceAsync(fut, opCode, payloadWriter,
payloadReader, failure0, attempt + 1);
return null;
}
@@ -469,25 +463,19 @@ public final class ReliableChannel implements
AutoCloseable {
}
// Apply no-op function. Establish default channel connection.
- applyOnDefaultChannel(channel -> null);
+ getDefaultChannel();
// TODO: Async startup IGNITE-15357.
return CompletableFuture.completedFuture(null);
}
- private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function) {
- return applyOnDefaultChannel(function, getRetryLimit(), DO_NOTHING);
- }
-
/**
* Apply specified {@code function} on any of available channel.
*/
- private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function,
- int attemptsLimit,
- Consumer<Integer> attemptsCallback) {
- Throwable failure = null;
+ private ClientChannel getDefaultChannel() {
+ IgniteClientConnectionException failure = null;
- for (int attempt = 0; attempt < attemptsLimit; attempt++) {
+ for (int attempt = 0; attempt < channels.size(); attempt++) {
ClientChannelHolder hld = null;
ClientChannel c = null;
@@ -507,11 +495,9 @@ public final class ReliableChannel implements
AutoCloseable {
c = hld.getOrCreateChannel();
if (c != null) {
- attemptsCallback.accept(attempt + 1);
-
- return function.apply(c);
+ return c;
}
- } catch (Throwable e) {
+ } catch (IgniteClientConnectionException e) {
if (failure == null) {
failure = e;
} else {
@@ -525,17 +511,23 @@ public final class ReliableChannel implements
AutoCloseable {
throw new IgniteClientConnectionException("Failed to connect",
failure);
}
- /** Get retry limit. */
- private int getRetryLimit() {
- List<ClientChannelHolder> holders = channels;
+ /** Determines whether specified operation should be retried. */
+ private boolean shouldRetry(int opCode, int iteration,
IgniteClientConnectionException exception) {
+ ClientOperationType opType =
ClientUtils.opCodeToClientOperationType(opCode);
+
+ if (opType == null) {
+ return true; // System operation.
+ }
+
+ RetryPolicy plc = clientCfg.retryPolicy();
- if (holders == null) {
- throw new IgniteClientException("Connections to nodes aren't
initialized.");
+ if (plc == null) {
+ return false;
}
- int size = holders.size();
+ RetryPolicyContext ctx = new RetryPolicyContextImpl(clientCfg, opType,
iteration, exception);
- return clientCfg.retryLimit() > 0 ? Math.min(clientCfg.retryLimit(),
size) : size;
+ return plc.shouldRetry(ctx);
}
/**
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/RetryPolicyContextImpl.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/RetryPolicyContextImpl.java
new file mode 100644
index 000000000..b1b519025
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/RetryPolicyContextImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.client.ClientOperationType;
+import org.apache.ignite.client.IgniteClientConfiguration;
+import org.apache.ignite.client.IgniteClientConnectionException;
+import org.apache.ignite.client.RetryPolicyContext;
+
+/**
+ * {@link RetryPolicyContext} implementation that keeps the values passed to its constructor in final fields.
+ */
+public class RetryPolicyContextImpl implements RetryPolicyContext {
+ /** Client configuration, returned as-is by {@link #configuration()}. */
+ private final IgniteClientConfiguration configuration;
+
+ /** Type of the operation this context describes, returned by {@link #operation()}. */
+ private final ClientOperationType operation;
+
+ /** Iteration number supplied by the caller, returned by {@link #iteration()}. */
+ private final int iteration;
+
+ /** Connection exception that caused current iteration, returned by {@link #exception()}. */
+ private final IgniteClientConnectionException exception;
+
+ /**
+ * Creates a context; the arguments are stored without validation or copying.
+ *
+ * @param configuration Client configuration.
+ * @param operation Operation type.
+ * @param iteration Iteration number.
+ * @param exception Connection exception that caused current iteration.
+ */
+ public RetryPolicyContextImpl(
+ IgniteClientConfiguration configuration,
+ ClientOperationType operation,
+ int iteration,
+ IgniteClientConnectionException exception) {
+ this.configuration = configuration;
+ this.operation = operation;
+ this.iteration = iteration;
+ this.exception = exception;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteClientConfiguration configuration() {
+ return configuration;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ClientOperationType operation() {
+ return operation;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int iteration() {
+ return iteration;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteClientConnectionException exception() {
+ return exception;
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
index 2ff27f3b9..2a76c3c17 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
@@ -85,10 +85,13 @@ public class NettyClientConnectionMultiplexer implements
ClientConnectionMultipl
ClientMessageHandler msgHnd,
ClientConnectionStateHandler stateHnd)
throws IgniteClientConnectionException {
+ try {
+ // TODO: Async startup IGNITE-15357.
+ ChannelFuture f = bootstrap.connect(addr).syncUninterruptibly();
- // TODO: Async startup IGNITE-15357.
- ChannelFuture f = bootstrap.connect(addr).syncUninterruptibly();
-
- return new NettyClientConnection(f.channel(), msgHnd, stateHnd);
+ return new NettyClientConnection(f.channel(), msgHnd, stateHnd);
+ } catch (Throwable t) {
+ throw new IgniteClientConnectionException(t.getMessage(), t);
+ }
}
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
index 143698518..8c5b95dce 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
@@ -86,7 +86,7 @@ public class ClientTables implements IgniteTables {
public CompletableFuture<Void> dropTableAsync(String name) {
Objects.requireNonNull(name);
- return ch.requestAsync(ClientOp.TABLE_DROP, w ->
w.out().packString(name));
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/ConnectionProperties.java
b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/ConnectionProperties.java
index 6149d7da9..058d0957b 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/ConnectionProperties.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/ConnectionProperties.java
@@ -82,21 +82,6 @@ public interface ConnectionProperties {
*/
void setQueryTimeout(Integer qryTimeout) throws SQLException;
- /**
- * Note: zero value means there is no limits.
- *
- * @return Retry limit.
- */
- Integer getRetryLimit();
-
- /**
- * Note: zero value means there is no limits.
- *
- * @param retryLimit Connection retry limit.
- * @throws SQLException On error.
- */
- void setRetryLimit(Integer retryLimit) throws SQLException;
-
/**
* Note: zero value means there is no limits.
*
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/ConnectionPropertiesImpl.java
b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/ConnectionPropertiesImpl.java
index 14df69e13..b39e1e652 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/ConnectionPropertiesImpl.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/ConnectionPropertiesImpl.java
@@ -68,13 +68,6 @@ public class ConnectionPropertiesImpl implements
ConnectionProperties, Serializa
+ " Zero means there is no limits.",
0L, false, 0, Integer.MAX_VALUE);
- /** JDBC retry limit. */
- private final IntegerProperty retryLimit = new
IntegerProperty("retryLimit",
- "Sets the retry limit. When a request fails due to a connection
error, and multiple server connections "
- + "are available, Ignite will retry the request on every
connection. When this property is greater than "
- + "zero, Ignite will limit the number of retries.",
- IgniteClientConfiguration.DFLT_RETRY_LIMIT, false, 0,
Integer.MAX_VALUE);
-
/** JDBC reconnect throttling period. */
private final LongProperty reconnectThrottlingPeriod = new
LongProperty("reconnectThrottlingPeriod",
"Sets the reconnect throttling period, in milliseconds. Zero means
there is no limits.",
@@ -162,18 +155,6 @@ public class ConnectionPropertiesImpl implements
ConnectionProperties, Serializa
qryTimeout.setValue(timeout);
}
- /** {@inheritDoc} */
- @Override
- public Integer getRetryLimit() {
- return retryLimit.value();
- }
-
- /** {@inheritDoc} */
- @Override
- public void setRetryLimit(Integer limit) throws SQLException {
- retryLimit.setValue(limit);
- }
-
/** {@inheritDoc} */
@Override
public Long getReconnectThrottlingPeriod() {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
index 7fcdfc142..634a23d80 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
@@ -145,7 +145,6 @@ public class JdbcConnection implements Connection {
netTimeout = connProps.getConnectionTimeout();
qryTimeout = connProps.getQueryTimeout();
- int retryLimit = connProps.getRetryLimit();
long reconnectThrottlingPeriod =
connProps.getReconnectThrottlingPeriod();
int reconnectThrottlingRetries =
connProps.getReconnectThrottlingRetries();
@@ -154,7 +153,6 @@ public class JdbcConnection implements Connection {
.builder()
.addresses(addrs)
.connectTimeout(netTimeout)
- .retryLimit(retryLimit)
.reconnectThrottlingPeriod(reconnectThrottlingPeriod)
.reconnectThrottlingRetries(reconnectThrottlingRetries)
.build());
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index d796c798d..39ee09e6b 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -21,11 +21,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import io.netty.util.ResourceLeakDetector;
-import java.net.InetSocketAddress;
-import java.util.Objects;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.fakes.FakeIgnite;
-import org.apache.ignite.client.handler.ClientHandlerModule;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -56,7 +53,7 @@ public abstract class AbstractClientTest {
testServer = startServer(10800, 10, 0, server);
- serverPort = getPort(testServer.module());
+ serverPort = testServer.port();
client = startClient();
}
@@ -139,8 +136,4 @@ public abstract class AbstractClientTest {
assertEquals((Object) x.value(i), y.value(i));
}
}
-
- public static int getPort(ClientHandlerModule hnd) {
- return ((InetSocketAddress)
Objects.requireNonNull(hnd.localAddress())).getPort();
- }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientTablesTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientTablesTest.java
index 60c6c6b45..308466394 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientTablesTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientTablesTest.java
@@ -101,18 +101,4 @@ public class ClientTablesTest extends AbstractClientTest {
var serverTables = server.tables().tables();
assertEquals(1, serverTables.size());
}
-
- @Test
- public void testDropTable() {
- server.tables().createTable("t", t -> t.changeReplicas(0));
-
- client.tables().dropTable("t");
-
- assertEquals(0, server.tables().tables().size());
- }
-
- @Test
- public void testDropTableInvalidName() {
- client.tables().dropTable("foo");
- }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
index 4dd488ef6..ab734ae39 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
@@ -81,7 +81,6 @@ public class ConfigurationTest extends AbstractClientTest {
IgniteClient client = builder
.addresses(addr)
.connectTimeout(1234)
- .retryLimit(7)
.reconnectThrottlingPeriod(123)
.reconnectThrottlingRetries(8)
.addressFinder(() -> new String[]{addr})
@@ -90,7 +89,6 @@ public class ConfigurationTest extends AbstractClientTest {
// Builder can be reused and it won't affect already created clients.
IgniteClient client2 = builder
.connectTimeout(2345)
- .retryLimit(8)
.reconnectThrottlingPeriod(1234)
.reconnectThrottlingRetries(88)
.build();
@@ -102,7 +100,6 @@ public class ConfigurationTest extends AbstractClientTest {
// Check config values.
assertEquals("thin-client", client.name());
assertEquals(1234, client.configuration().connectTimeout());
- assertEquals(7, client.configuration().retryLimit());
assertEquals(123,
client.configuration().reconnectThrottlingPeriod());
assertEquals(8,
client.configuration().reconnectThrottlingRetries());
assertArrayEquals(new String[]{addr},
client.configuration().addresses());
@@ -115,7 +112,6 @@ public class ConfigurationTest extends AbstractClientTest {
// Check config values.
assertEquals(2345, client2.configuration().connectTimeout());
- assertEquals(8, client2.configuration().retryLimit());
assertEquals(1234,
client2.configuration().reconnectThrottlingPeriod());
assertEquals(88,
client2.configuration().reconnectThrottlingRetries());
assertArrayEquals(new String[]{addr},
client.configuration().addresses());
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
index c8682c404..d08501b48 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client;
-import static org.apache.ignite.client.AbstractClientTest.getPort;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -32,11 +31,11 @@ public class HeartbeatTest {
@Test
public void testHeartbeatLongerThanIdleTimeoutCausesDisconnect() throws
Exception {
try (var srv = new TestServer(10800, 10, 50, new FakeIgnite())) {
- int srvPort = getPort(srv.module());
+ int srvPort = srv.port();
Builder builder = IgniteClient.builder()
.addresses("127.0.0.1:" + srvPort)
- .retryLimit(0);
+ .retryPolicy(null);
try (var client = builder.build()) {
Thread.sleep(300);
@@ -49,12 +48,11 @@ public class HeartbeatTest {
@Test
public void testHeartbeatShorterThanIdleTimeoutKeepsConnectionAlive()
throws Exception {
try (var srv = new TestServer(10800, 10, 300, new FakeIgnite())) {
- int srvPort = getPort(srv.module());
+ int srvPort = srv.port();
Builder builder = IgniteClient.builder()
.addresses("127.0.0.1:" + srvPort)
- .heartbeatInterval(50)
- .retryLimit(0);
+ .heartbeatInterval(50);
try (var client = builder.build()) {
Thread.sleep(900);
@@ -69,14 +67,10 @@ public class HeartbeatTest {
try (var srv = new TestServer(10800, 10, 300, new FakeIgnite())) {
Builder builder = IgniteClient.builder()
- .addresses("127.0.0.1:" + getPort(srv.module()))
+ .addresses("127.0.0.1:" + srv.port())
.heartbeatInterval(-50);
- Throwable ex = assertThrows(IgniteClientException.class,
builder::build);
-
- while (ex.getCause() != null) {
- ex = ex.getCause();
- }
+ Throwable ex = assertThrows(IllegalArgumentException.class,
builder::build);
assertEquals("Negative delay.", ex.getMessage());
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
index f1f27cd1a..968506ca2 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
@@ -53,7 +53,7 @@ public class ReconnectTest {
var client = IgniteClient.builder()
.addresses("127.0.0.1:10900..10910", "127.0.0.1:10950..10960")
- .retryLimit(100)
+ .retryPolicy(new RetryLimitPolicy().retryLimit(1))
.build();
assertEquals("t", client.tables().tables().get(0).name());
@@ -86,7 +86,6 @@ public class ReconnectTest {
var client = IgniteClient.builder()
.addresses("127.0.0.1:10900..10910", "127.0.0.1:10950..10960")
- .retryLimit(100)
.build();
assertEquals("t", client.tables().tables().get(0).name());
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
new file mode 100644
index 000000000..9842983a0
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.ArrayList;
+import java.util.function.Function;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.client.fakes.FakeIgniteTables;
+import org.apache.ignite.internal.client.ClientUtils;
+import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests thin client retry behavior using a {@link TestServer} created with a per-request predicate (see {@code initServer}).
+ */
+public class RetryPolicyTest {
+ private static final int ITER = 100;
+
+ private TestServer server;
+
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(server);
+ }
+
+ @Test
+ public void testNoRetryPolicySecondRequestFails() throws Exception {
+ initServer(reqId -> reqId % 3 == 0);
+
+ try (var client = getClient(null)) {
+ assertEquals("t", client.tables().tables().get(0).name());
+ assertThrows(IgniteClientException.class, () ->
client.tables().tables().get(0).name());
+ }
+ }
+
+ @Test
+ public void testRetryPolicyCompletesOperationWithoutException() throws
Exception {
+ // Every 4th network message fails (reqId % 4 == 0), including handshake.
+ initServer(reqId -> reqId % 4 == 0);
+
+ var plc = new TestRetryPolicy();
+ plc.retryLimit(1);
+
+ try (var client = getClient(plc)) {
+ for (int i = 0; i < ITER; i++) {
+ assertEquals("t", client.tables().tables().get(0).name());
+ }
+
+ assertEquals(ITER / 2 - 1, plc.invocations.size());
+ }
+ }
+
+ @Test
+ public void testRetryPolicyDoesNotRetryUnrelatedErrors() throws Exception {
+ initServer(reqId -> reqId % 33 == 0);
+ var plc = new TestRetryPolicy();
+
+ try (var client = getClient(plc)) {
+ assertThrows(IgniteClientException.class, () ->
client.tables().table(FakeIgniteTables.BAD_TABLE));
+ assertEquals(0, plc.invocations.size());
+ }
+ }
+
+ @Test
+ public void testRetryPolicyDoesNotRetryTxCommit() throws Exception {
+ initServer(reqId -> reqId % 3 == 0);
+ var plc = new TestRetryPolicy();
+
+ try (var client = getClient(plc)) {
+ Transaction tx = client.transactions().begin();
+
+ assertThrows(IgniteClientConnectionException.class, tx::commit);
+ assertEquals(0, plc.invocations.size());
+ }
+ }
+
+ @Test
+ public void testRetryLimitPolicyThrowsOnLimitExceeded() throws Exception {
+ initServer(reqId -> reqId % 2 == 0);
+ var plc = new TestRetryPolicy();
+ plc.retryLimit(5);
+
+ try (var client = getClient(plc)) {
+ assertThrows(IgniteClientException.class, () ->
client.tables().tables());
+ }
+
+ assertEquals(6, plc.invocations.size());
+ assertEquals(5, plc.invocations.get(5).iteration());
+ }
+
+ @Test
+ public void testCustomRetryPolicyIsInvokedWithCorrectContext() throws
Exception {
+ initServer(reqId -> reqId % 2 == 0);
+ var plc = new TestRetryPolicy();
+ plc.retryLimit(2);
+
+ try (var client = getClient(plc)) {
+ assertThrows(IgniteClientException.class, () ->
client.tables().tables());
+ }
+
+ assertEquals(3, plc.invocations.size());
+
+ RetryPolicyContext ctx = plc.invocations.get(1);
+
+ assertEquals(1, ctx.iteration());
+ assertEquals(ClientOperationType.TABLES_GET, ctx.operation());
+ assertSame(plc, ctx.configuration().retryPolicy());
+ assertEquals("Channel is closed", ctx.exception().getMessage());
+ }
+
+ @Test
+ public void testTableOperationWithoutTxIsRetried() throws Exception {
+ initServer(reqId -> reqId % 4 == 0);
+ var plc = new TestRetryPolicy();
+
+ try (var client = getClient(plc)) {
+ RecordView<Tuple> recView =
client.tables().table("t").recordView();
+ recView.get(null, Tuple.create().set("id", 1));
+ recView.get(null, Tuple.create().set("id", 1));
+
+ assertEquals(1, plc.invocations.size());
+ }
+ }
+
+ @Test
+ public void testTableOperationWithTxIsNotRetried() throws Exception {
+ initServer(reqId -> reqId % 4 == 0);
+ var plc = new TestRetryPolicy();
+
+ try (var client = getClient(plc)) {
+ RecordView<Tuple> recView =
client.tables().table("t").recordView();
+ Transaction tx = client.transactions().begin();
+
+ var ex = assertThrows(IgniteClientException.class, () ->
recView.get(tx, Tuple.create().set("id", 1)));
+ assertEquals("Transaction context has been lost due to connection
errors.", ex.getMessage());
+
+ assertEquals(0, plc.invocations.size());
+ }
+ }
+
+ @Test
+ public void testRetryReadPolicyRetriesReadOperations() throws Exception {
+ initServer(reqId -> reqId % 3 == 0);
+
+ try (var client = getClient(new RetryReadPolicy())) {
+ RecordView<Tuple> recView =
client.tables().table("t").recordView();
+ recView.get(null, Tuple.create().set("id", 1));
+ recView.get(null, Tuple.create().set("id", 1));
+ }
+ }
+
+ @Test
+ public void testRetryReadPolicyDoesNotRetryWriteOperations() throws
Exception {
+ initServer(reqId -> reqId % 5 == 0);
+
+ try (var client = getClient(new RetryReadPolicy())) {
+ RecordView<Tuple> recView =
client.tables().table("t").recordView();
+ recView.upsert(null, Tuple.create().set("id", 1));
+ assertThrows(IgniteClientConnectionException.class, () ->
recView.upsert(null, Tuple.create().set("id", 1)));
+ }
+ }
+
+ @Test
+ public void testRetryPolicyConvertOpAllOperationsSupported() throws
IllegalAccessException {
+ var nullOpFields = new ArrayList<String>();
+
+ for (var field : ClientOp.class.getDeclaredFields()) {
+ var opCode = (int) field.get(null);
+ var publicOp = ClientUtils.opCodeToClientOperationType(opCode);
+
+ if (publicOp == null) {
+ nullOpFields.add(field.getName());
+ }
+ }
+
+ long expectedNullCount = 16;
+
+ String msg = nullOpFields.size()
+ + " operation codes do not have public equivalent. When adding
new codes, update ClientOperationType too. Missing ops: "
+ + String.join(", ", nullOpFields);
+
+ assertEquals(expectedNullCount, nullOpFields.size(), msg);
+ }
+
+ @Test
+ public void testDefaultRetryPolicyIsRetryReadPolicyWithLimit() throws
Exception {
+ initServer(reqId -> false);
+
+ try (var client = IgniteClient.builder().addresses("127.0.0.1:" +
server.port()).build()) {
+ var plc = client.configuration().retryPolicy();
+
+ var readPlc = assertInstanceOf(RetryReadPolicy.class, plc);
+ assertEquals(RetryReadPolicy.DFLT_RETRY_LIMIT,
readPlc.retryLimit());
+ }
+ }
+
+ private IgniteClient getClient(RetryPolicy retryPolicy) {
+ return IgniteClient.builder()
+ .addresses("127.0.0.1:" + server.port())
+ .retryPolicy(retryPolicy)
+ .reconnectThrottlingPeriod(0)
+ .build();
+ }
+
+ private void initServer(Function<Integer, Boolean> integerBooleanFunction)
{
+ FakeIgnite ign = new FakeIgnite();
+ ign.tables().createTable("t", c -> c.changeName("t"));
+
+ server = new TestServer(10900, 10, 0, ign, integerBooleanFunction);
+ }
+}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
new file mode 100644
index 000000000..79e85c8f9
--- /dev/null
+++
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static org.mockito.Mockito.mock;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.util.ReferenceCounted;
+import java.net.BindException;
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.handler.ClientInboundMessageHandler;
+import org.apache.ignite.compute.IgniteCompute;
+import
org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
+import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
+import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NettyBootstrapFactory;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client handler module for tests: binds a Netty server endpoint whose pipeline can drop connections on selected requests.
+ */
+public class TestClientHandlerModule implements IgniteComponent {
+ /** Configuration registry; {@link ClientConnectorConfiguration} is read from it when the endpoint starts. */
+ private final ConfigurationRegistry registry;
+
+ /** Ignite instance whose tables and transactions are passed to the inbound message handler. */
+ private final Ignite ignite;
+
+ /** Connection drop condition: applied to the incremented request counter; {@code true} closes the connection. */
+ private final Function<Integer, Boolean> shouldDropConnection;
+
+ /** Netty channel of the bound endpoint; {@code null} before {@link #start()} and after {@link #stop()}. */
+ private volatile Channel channel;
+
+ /** Netty bootstrap factory used to create the server bootstrap. */
+ private final NettyBootstrapFactory bootstrapFactory;
+
+ /**
+ * Constructor.
+ *
+ * @param ignite Ignite.
+ * @param registry Configuration registry.
+ * @param bootstrapFactory Bootstrap factory.
+ * @param shouldDropConnection Connection drop condition (not null-checked here).
+ */
+ public TestClientHandlerModule(
+ Ignite ignite,
+ ConfigurationRegistry registry,
+ NettyBootstrapFactory bootstrapFactory,
+ Function<Integer, Boolean> shouldDropConnection) {
+ assert ignite != null;
+ assert registry != null;
+ assert bootstrapFactory != null;
+
+ this.ignite = ignite;
+ this.registry = registry;
+ this.bootstrapFactory = bootstrapFactory;
+ this.shouldDropConnection = shouldDropConnection;
+ }
+
+ /** {@inheritDoc} NOTE(review): InterruptedException is wrapped below without re-setting the thread's interrupt flag. */
+ @Override
+ public void start() {
+ if (channel != null) {
+ throw new IgniteException("ClientHandlerModule is already
started.");
+ }
+
+ try {
+ channel = startEndpoint().channel();
+ } catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ if (channel != null) {
+ channel.close().await();
+
+ channel = null;
+ }
+ }
+
+ /**
+ * Returns the local address where this handler is bound to.
+ *
+ * @return the local address of this module, or {@code null} if this
module is not started.
+ */
+ @Nullable
+ public SocketAddress localAddress() {
+ return channel == null ? null : channel.localAddress();
+ }
+
+ /**
+ * Starts the endpoint: binds to the first available port in [port, port + portRange] from the connector configuration.
+ *
+ * @return Close future of the bound channel.
+ * @throws InterruptedException If thread has been interrupted during the
start.
+ * @throws IgniteException When binding fails with a non-BindException cause or every port in the range is in use.
+ */
+ private ChannelFuture startEndpoint() throws InterruptedException {
+ var configuration =
registry.getConfiguration(ClientConnectorConfiguration.KEY).value();
+
+ int desiredPort = configuration.port();
+ int portRange = configuration.portRange();
+
+ Channel ch = null;
+
+ var requestCounter = new AtomicInteger();
+
+ ServerBootstrap bootstrap = bootstrapFactory.createServerBootstrap();
+
+ bootstrap.childHandler(new ChannelInitializer<>() {
+ @Override
+ protected void initChannel(Channel ch) {
+ ch.pipeline().addLast(
+ new ClientMessageDecoder(),
+ new ConnectionDropHandler(requestCounter,
shouldDropConnection),
+ new ClientInboundMessageHandler(
+ ignite.tables(),
+ ignite.transactions(),
+ mock(QueryProcessor.class),
+ configuration,
+ mock(IgniteCompute.class),
+ mock(ClusterService.class)));
+ }
+ })
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
configuration.connectTimeout());
+
+ for (int portCandidate = desiredPort; portCandidate <= desiredPort +
portRange; portCandidate++) {
+ ChannelFuture bindRes = bootstrap.bind(portCandidate).await();
+
+ if (bindRes.isSuccess()) {
+ ch = bindRes.channel();
+
+ break;
+ } else if (!(bindRes.cause() instanceof BindException)) {
+ throw new IgniteException(bindRes.cause());
+ }
+ }
+
+ if (ch == null) {
+ String msg = "Cannot start thin client connector endpoint. "
+ + "All ports in range [" + desiredPort + ", " +
(desiredPort + portRange) + "] are in use.";
+
+ throw new IgniteException(msg);
+ }
+
+ return ch.closeFuture();
+ }
+
+ private static class ConnectionDropHandler extends
ChannelInboundHandlerAdapter {
+ /** Request counter; the same instance is passed to the handler of every accepted channel. */
+ private final AtomicInteger cnt;
+
+ /** Connection drop condition, evaluated on each inbound message with the incremented counter value. */
+ private final Function<Integer, Boolean> shouldDropConnection;
+
+ /**
+ * Constructor.
+ *
+ * @param cnt Request counter.
+ * @param shouldDropConnection Connection drop condition.
+ */
+ private ConnectionDropHandler(AtomicInteger cnt, Function<Integer,
Boolean> shouldDropConnection) {
+ this.cnt = cnt;
+ this.shouldDropConnection = shouldDropConnection;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
+ if (shouldDropConnection.apply(cnt.incrementAndGet())) {
+ ((ReferenceCounted) msg).release();
+
+ ctx.close();
+ } else {
+ super.channelRead(ctx, msg);
+ }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryAllPolicy.cs
b/modules/client/src/test/java/org/apache/ignite/client/TestRetryPolicy.java
similarity index 53%
rename from modules/platforms/dotnet/Apache.Ignite/RetryAllPolicy.cs
rename to
modules/client/src/test/java/org/apache/ignite/client/TestRetryPolicy.java
index 88416f104..288b3b9a4 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryAllPolicy.cs
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestRetryPolicy.java
@@ -1,10 +1,10 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -15,16 +15,23 @@
* limitations under the License.
*/
-namespace Apache.Ignite
-{
- /// <summary>
- /// Retry policy that always returns <c>true</c>.
- /// </summary>
- public sealed class RetryAllPolicy : RetryLimitPolicy
- {
- /// <summary>
- /// Singleton instance.
- /// </summary>
- public static readonly RetryAllPolicy Instance = new();
+package org.apache.ignite.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test retry policy that records every context passed to {@link #shouldRetry} and then delegates to {@link RetryLimitPolicy}.
+ */
+public class TestRetryPolicy extends RetryLimitPolicy {
+ /** Contexts received by {@link #shouldRetry}, in call order. */
+ public final List<RetryPolicyContext> invocations = new ArrayList<>();
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean shouldRetry(RetryPolicyContext context) {
+ invocations.add(context);
+
+ return super.shouldRetry(context);
 }
 }
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index cbf84d178..7fa63f240 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -20,8 +20,12 @@ package org.apache.ignite.client;
import static
org.apache.ignite.configuration.annotation.ConfigurationType.LOCAL;
import static org.mockito.Mockito.mock;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.handler.ClientHandlerModule;
@@ -30,6 +34,7 @@ import
org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorCo
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
@@ -39,7 +44,7 @@ import org.apache.ignite.network.NettyBootstrapFactory;
public class TestServer implements AutoCloseable {
private final ConfigurationRegistry cfg;
- private final ClientHandlerModule module;
+ private final IgniteComponent module;
private final NettyBootstrapFactory bootstrapFactory;
@@ -56,6 +61,24 @@ public class TestServer implements AutoCloseable {
int portRange,
long idleTimeout,
Ignite ignite
+ ) {
+ this(port, portRange, idleTimeout, ignite, null);
+ }
+
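+ /**
+ * Constructor.
+ *
+ * @param port Port.
+ * @param portRange Port range.
+ * @param idleTimeout Idle timeout.
+ * @param ignite Ignite.
+ * @param shouldDropConnection Connection drop condition; when {@code null}, a regular {@code ClientHandlerModule} is started instead of {@code TestClientHandlerModule}.
+ */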
+ public TestServer(
+ int port,
+ int portRange,
+ long idleTimeout,
+ Ignite ignite,
+ Function<Integer, Boolean> shouldDropConnection
) {
cfg = new ConfigurationRegistry(
List.of(ClientConnectorConfiguration.KEY,
NetworkConfiguration.KEY),
@@ -75,31 +98,35 @@ public class TestServer implements AutoCloseable {
bootstrapFactory.start();
- module = new ClientHandlerModule(
- ((FakeIgnite) ignite).queryEngine(),
- ignite.tables(),
- ignite.transactions(),
- cfg,
- mock(IgniteCompute.class),
- mock(ClusterService.class),
- bootstrapFactory
- );
+ module = shouldDropConnection != null
+ ? new TestClientHandlerModule(ignite, cfg, bootstrapFactory,
shouldDropConnection)
+ : new ClientHandlerModule(
+ ((FakeIgnite) ignite).queryEngine(),
+ ignite.tables(),
+ ignite.transactions(),
+ cfg,
+ mock(IgniteCompute.class),
+ mock(ClusterService.class),
+ bootstrapFactory
+ );
module.start();
}
- public ConfigurationRegistry configurationRegistry() {
- return cfg;
- }
-
- public ClientHandlerModule module() {
- return module;
- }
+ /**
+ * Gets the port where this instance is listening.
+ *
+ * @return TCP port.
+ */
+ public int port() {
+ SocketAddress addr = module instanceof ClientHandlerModule
+ ? ((ClientHandlerModule) module).localAddress()
+ : ((TestClientHandlerModule) module).localAddress();
- public NettyBootstrapFactory bootstrapFactory() {
- return bootstrapFactory;
+ return ((InetSocketAddress) Objects.requireNonNull(addr)).getPort();
}
+ /** {@inheritDoc} */
@Override
public void close() throws Exception {
module.stop();
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 9183f5d04..ea565571e 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -27,6 +27,7 @@ import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
/**
* Fake Ignite.
@@ -67,7 +68,27 @@ public class FakeIgnite implements Ignite {
@Override
public CompletableFuture<Transaction> beginAsync() {
- throw new UnsupportedOperationException();
+ return CompletableFuture.completedFuture(new Transaction() {
+ @Override
+ public void commit() throws TransactionException {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> commitAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void rollback() throws TransactionException {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> rollbackAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
}
};
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index c2a7be4ce..a684ec22d 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -47,6 +47,10 @@ public class FakeIgniteTables implements IgniteTables,
IgniteTablesInternal {
public static final String TABLE_WITH_DEFAULT_VALUES = "default-columns";
+ public static final String BAD_TABLE = "bad-table";
+
+ public static final String BAD_TABLE_ERR = "Err!";
+
private final ConcurrentHashMap<String, TableImpl> tables = new
ConcurrentHashMap<>();
private final ConcurrentHashMap<UUID, TableImpl> tablesById = new
ConcurrentHashMap<>();
@@ -117,6 +121,10 @@ public class FakeIgniteTables implements IgniteTables,
IgniteTablesInternal {
/** {@inheritDoc} */
@Override
public Table table(String name) {
+ if (BAD_TABLE.equals(name)) {
+ throw new RuntimeException(BAD_TABLE_ERR);
+ }
+
return tableImpl(name);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs
index 22a5c64dc..4bde29a46 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs
@@ -59,8 +59,13 @@ namespace Apache.Ignite.Tests
[Test]
public async Task
TestFakeServerDropsConnectionOnSpecifiedRequestCount()
{
+ var cfg = new IgniteClientConfiguration
+ {
+ RetryPolicy = RetryNonePolicy.Instance
+ };
+
using var server = new FakeServer(reqId => reqId % 3 == 0);
- using var client = await server.ConnectClientAsync();
+ using var client = await server.ConnectClientAsync(cfg);
// 2 requests succeed, 3rd fails.
await client.Tables.GetTablesAsync();
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
index d8c35d32c..bebb31489 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
@@ -34,7 +34,7 @@ namespace Apache.Ignite.Tests
{
var cfg = new IgniteClientConfiguration
{
- RetryPolicy = new RetryAllPolicy { RetryLimit = 1 }
+ RetryPolicy = new RetryLimitPolicy { RetryLimit = 1 }
};
using var server = new FakeServer(reqId => reqId % 2 == 0);
@@ -49,7 +49,7 @@ namespace Apache.Ignite.Tests
[Test]
public async Task
TestFailoverWithRetryPolicyDoesNotRetryUnrelatedErrors()
{
- var cfg = new IgniteClientConfiguration { RetryPolicy =
RetryAllPolicy.Instance };
+ var cfg = new IgniteClientConfiguration { RetryPolicy = new
RetryLimitPolicy() };
using var server = new FakeServer(reqId => reqId % 2 == 0);
using var client = await server.ConnectClientAsync(cfg);
@@ -58,6 +58,21 @@ namespace Apache.Ignite.Tests
Assert.AreEqual(FakeServer.Err, ex!.Message);
}
+ [Test]
+ public async Task TestFailoverWithRetryPolicyDoesNotRetryTxCommit()
+ {
+     // The policy instance is kept so its recorded invocations can be inspected afterwards.
+     var recordingPolicy = new TestRetryPolicy();
+     var clientCfg = new IgniteClientConfiguration
+     {
+         RetryPolicy = recordingPolicy
+     };
+
+     using var server = new FakeServer(reqId => reqId % 2 == 0);
+     using var client = await server.ConnectClientAsync(clientCfg);
+
+     var tx = await client.Transactions.BeginAsync();
+
+     // Commit must fail, and the policy must not have been consulted at all.
+     Assert.ThrowsAsync<IgniteClientException>(async () => await tx.CommitAsync());
+     Assert.IsEmpty(recordingPolicy.Invocations);
+ }
+
[Test]
public async Task
TestFailoverWithRetryPolicyThrowsOnRetryLimitExceeded()
{
@@ -75,15 +90,30 @@ namespace Apache.Ignite.Tests
Assert.AreEqual("Operation failed after 5 retries, examine
InnerException for details.", ex!.Message);
}
+ [Test]
+ public async Task TestFailoverWithRetryPolicyThrowsOnDefaultRetryLimitExceeded()
+ {
+     // No configuration is passed to ConnectClientAsync here.
+     using var server = new FakeServer(reqId => reqId > 1);
+     using var client = await server.ConnectClientAsync();
+
+     // The first request is expected to succeed.
+     await client.Tables.GetTablesAsync();
+
+     const string expectedMessage = "Operation failed after 16 retries, examine InnerException for details.";
+     var ex = Assert.ThrowsAsync<IgniteClientException>(async () => await client.Tables.GetTablesAsync());
+
+     Assert.AreEqual(expectedMessage, ex!.Message);
+ }
+
[Test]
public async Task TestZeroRetryLimitDoesNotLimitRetryCount()
{
var cfg = new IgniteClientConfiguration
{
- RetryPolicy = new RetryAllPolicy { RetryLimit = 0 }
+ RetryPolicy = new RetryLimitPolicy
+ {
+ RetryLimit = 0
+ }
};
- using var server = new FakeServer(reqId => reqId % 10 != 0);
+ using var server = new FakeServer(reqId => reqId % 30 != 0);
using var client = await server.ConnectClientAsync(cfg);
for (var i = 0; i < IterCount; i++)
@@ -103,6 +133,22 @@ namespace Apache.Ignite.Tests
Assert.ThrowsAsync<IgniteClientException>(async () => await
client.Tables.GetTablesAsync());
}
+ [Test]
+ public async Task TestNullRetryPolicyIsSameAsNoRetry()
+ {
+     // Null is assigned on purpose (hence the null-forgiving operator).
+     var clientCfg = new IgniteClientConfiguration { RetryPolicy = null! };
+
+     using var server = new FakeServer(reqId => reqId > 1);
+     using var client = await server.ConnectClientAsync(clientCfg);
+
+     // The first request is expected to succeed.
+     await client.Tables.GetTablesAsync();
+
+     // The second request must surface the failure instead of being retried.
+     Assert.ThrowsAsync<IgniteClientException>(async () => await client.Tables.GetTablesAsync());
+ }
+
[Test]
public async Task TestCustomRetryPolicyIsInvokedWithCorrectContext()
{
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/RetryReadPolicyTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryReadPolicyTests.cs
index 19e165659..039ecb3e0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/RetryReadPolicyTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryReadPolicyTests.cs
@@ -35,7 +35,7 @@ namespace Apache.Ignite.Tests
{
var ctx = new RetryPolicyContext(new(), opType, 1, new());
- Assert.DoesNotThrow(() =>
RetryReadPolicy.Instance.ShouldRetry(ctx), opType.ToString());
+ Assert.DoesNotThrow(() => new
RetryReadPolicy().ShouldRetry(ctx), opType.ToString());
}
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
b/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
index 6f31ed86d..0fac07b94 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
@@ -109,12 +109,12 @@ namespace Apache.Ignite
/// Gets or sets the retry policy. When a request fails due to a
connection error,
/// Ignite will retry the request if the specified policy allows it.
/// <para />
- /// Default is <see cref="RetryNonePolicy"/> - does not retry anything.
+ /// Default is <see cref="RetryReadPolicy"/> - retry read operations
up to <see cref="RetryLimitPolicy.DefaultRetryLimit"/> times.
/// <para />
- /// See also <see cref="RetryAllPolicy"/>, <see
cref="RetryReadPolicy"/>, <see cref="RetryNonePolicy"/>,
+ /// See also <see cref="RetryLimitPolicy"/>, <see
cref="RetryReadPolicy"/>, <see cref="RetryNonePolicy"/>,
/// <see cref="RetryLimitPolicy.RetryLimit"/>.
/// </summary>
- public IRetryPolicy RetryPolicy { get; set; } =
RetryNonePolicy.Instance;
+ public IRetryPolicy RetryPolicy { get; set; } = new RetryReadPolicy();
/// <summary>
/// Gets or sets the heartbeat message interval.
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index e0329fe4b..fa5d2d16a 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -298,7 +298,7 @@ namespace Apache.Ignite.Internal
return false;
}
- if (Configuration.RetryPolicy is RetryNonePolicy)
+ if (Configuration.RetryPolicy is null or RetryNonePolicy)
{
return false;
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryLimitPolicy.cs
b/modules/platforms/dotnet/Apache.Ignite/RetryLimitPolicy.cs
index d987228e9..ddf17ed05 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryLimitPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryLimitPolicy.cs
@@ -21,14 +21,19 @@ namespace Apache.Ignite
/// <summary>
/// Retry policy that returns <c>true</c> when <see
cref="IRetryPolicyContext.Iteration"/> is less than
- /// the specified <see cref="RetryLimit"/>.
+ /// the specified <see cref="RetryLimit"/>, or the limit is zero or less.
/// </summary>
public class RetryLimitPolicy : IRetryPolicy
{
/// <summary>
- /// Gets or sets the retry limit. 0 or less for no limit.
+ /// Default retry limit.
/// </summary>
- public int RetryLimit { get; set; }
+ public const int DefaultRetryLimit = 16;
+
+ /// <summary>
+ /// Gets or sets the retry limit. 0 or less for no limit. Default is
<see cref="DefaultRetryLimit"/>.
+ /// </summary>
+ public int RetryLimit { get; set; } = DefaultRetryLimit;
/// <inheritdoc />
public virtual bool ShouldRetry(IRetryPolicyContext context)
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
index fcf3fa594..f1d98448f 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -25,11 +25,6 @@ namespace Apache.Ignite
/// </summary>
public sealed class RetryReadPolicy : RetryLimitPolicy
{
- /// <summary>
- /// Singleton instance.
- /// </summary>
- public static readonly RetryReadPolicy Instance = new();
-
/// <inheritdoc />
public override bool ShouldRetry(IRetryPolicyContext context)
{