This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 128d3f39f7c IGNITE-27831 Java thin: Refactor ReliableChannel - Fixes #12733.
128d3f39f7c is described below
commit 128d3f39f7cc3048965a81674833725cf4b3fb51
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Feb 13 17:10:17 2026 +0300
IGNITE-27831 Java thin: Refactor ReliableChannel - Fixes #12733.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../internal/client/thin/ClientAtomicLongImpl.java | 4 +-
.../thin/ClientCacheEntryListenerHandler.java | 4 +-
.../client/thin/ClientClusterGroupImpl.java | 6 +-
.../internal/client/thin/ClientClusterImpl.java | 2 +-
.../internal/client/thin/ClientComputeImpl.java | 4 +-
.../client/thin/ClientFieldsQueryPager.java | 4 +-
.../internal/client/thin/ClientIgniteSetImpl.java | 4 +-
.../internal/client/thin/ClientQueryPager.java | 4 +-
.../internal/client/thin/ClientServicesImpl.java | 4 +-
.../internal/client/thin/GenericQueryPager.java | 6 +-
.../internal/client/thin/ReliableChannelEx.java | 145 +++++++++++++++++++++
...liableChannel.java => ReliableChannelImpl.java} | 93 +++----------
.../internal/client/thin/TcpClientCache.java | 6 +-
.../client/thin/TcpClientTransactions.java | 4 +-
.../internal/client/thin/TcpIgniteClient.java | 6 +-
.../thin/ReliableChannelDuplicationTest.java | 4 +-
.../internal/client/thin/ReliableChannelTest.java | 36 ++---
.../ThinClientAbstractPartitionAwarenessTest.java | 4 +-
18 files changed, 214 insertions(+), 126 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientAtomicLongImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientAtomicLongImpl.java
index 50b76a9b4ac..f7f6f521b26 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientAtomicLongImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientAtomicLongImpl.java
@@ -35,7 +35,7 @@ public class ClientAtomicLongImpl implements ClientAtomicLong
{
private final String groupName;
/** */
- private final ReliableChannel ch;
+ private final ReliableChannelEx ch;
/** Cache id. */
private final int cacheId;
@@ -47,7 +47,7 @@ public class ClientAtomicLongImpl implements ClientAtomicLong
{
* @param groupName Cache group name.
* @param ch Channel.
*/
- public ClientAtomicLongImpl(String name, @Nullable String groupName,
ReliableChannel ch) {
+ public ClientAtomicLongImpl(String name, @Nullable String groupName,
ReliableChannelEx ch) {
// name and groupName uniquely identify the data structure.
this.name = name;
this.groupName = groupName;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
index b04ca6f8bba..72142323a25 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
@@ -53,7 +53,7 @@ public class ClientCacheEntryListenerHandler<K, V> implements
NotificationListen
private final Cache<K, V> jCacheAdapter;
/** */
- private final ReliableChannel ch;
+ private final ReliableChannelEx ch;
/** */
private final boolean keepBinary;
@@ -76,7 +76,7 @@ public class ClientCacheEntryListenerHandler<K, V> implements
NotificationListen
/** */
ClientCacheEntryListenerHandler(
Cache<K, V> jCacheAdapter,
- ReliableChannel ch,
+ ReliableChannelEx ch,
ClientBinaryMarshaller marsh,
boolean keepBinary
) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
index dd8c1e0b4d4..838aecc92a3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterGroupImpl.java
@@ -56,7 +56,7 @@ import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
*/
class ClientClusterGroupImpl implements ClientClusterGroup {
/** Channel. */
- protected final ReliableChannel ch;
+ protected final ReliableChannelEx ch;
/** Marshaller utils. */
protected final ClientUtils utils;
@@ -79,7 +79,7 @@ class ClientClusterGroupImpl implements ClientClusterGroup {
/**
*
*/
- ClientClusterGroupImpl(ReliableChannel ch, ClientBinaryMarshaller marsh) {
+ ClientClusterGroupImpl(ReliableChannelEx ch, ClientBinaryMarshaller marsh)
{
this.ch = ch;
utils = new ClientUtils(marsh);
@@ -90,7 +90,7 @@ class ClientClusterGroupImpl implements ClientClusterGroup {
/**
*
*/
- private ClientClusterGroupImpl(ReliableChannel ch, ClientUtils utils,
+ private ClientClusterGroupImpl(ReliableChannelEx ch, ClientUtils utils,
ProjectionFilters projectionFilters) {
this.ch = ch;
this.utils = utils;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
index 54a701b982f..2320329311d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientClusterImpl.java
@@ -33,7 +33,7 @@ public class ClientClusterImpl extends ClientClusterGroupImpl
implements ClientC
/**
* Constructor.
*/
- ClientClusterImpl(ReliableChannel ch, ClientBinaryMarshaller marsh) {
+ ClientClusterImpl(ReliableChannelEx ch, ClientBinaryMarshaller marsh) {
super(ch, marsh);
dfltClusterGrp = (ClientClusterGroupImpl)forServers();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
index d5f9907a0f5..7234473597c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
@@ -57,7 +57,7 @@ class ClientComputeImpl implements ClientCompute {
private static final byte NO_RESULT_CACHE_FLAG_MASK = 0x02;
/** Channel. */
- private final ReliableChannel ch;
+ private final ReliableChannelEx ch;
/** Utils for serialization/deserialization. */
private final ClientUtils utils;
@@ -69,7 +69,7 @@ class ClientComputeImpl implements ClientCompute {
private final AtomicInteger tasksCnt = new AtomicInteger();
/** Constructor. */
- ClientComputeImpl(ReliableChannel ch, ClientBinaryMarshaller marsh,
ClientClusterGroupImpl dfltGrp) {
+ ClientComputeImpl(ReliableChannelEx ch, ClientBinaryMarshaller marsh,
ClientClusterGroupImpl dfltGrp) {
this.ch = ch;
this.dfltGrp = dfltGrp;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
index 4116c851ab5..01f7923fed7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
@@ -40,7 +40,7 @@ class ClientFieldsQueryPager extends
GenericQueryPager<List<?>> implements Field
/** Constructor. */
ClientFieldsQueryPager(
- ReliableChannel ch,
+ ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
@@ -59,7 +59,7 @@ class ClientFieldsQueryPager extends
GenericQueryPager<List<?>> implements Field
/** Constructor. */
ClientFieldsQueryPager(
- ReliableChannel ch,
+ ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientIgniteSetImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientIgniteSetImpl.java
index 5f842d2f45c..289a08343f4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientIgniteSetImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientIgniteSetImpl.java
@@ -43,7 +43,7 @@ class ClientIgniteSetImpl<T> implements ClientIgniteSet<T> {
private final String name;
/** */
- private final ReliableChannel ch;
+ private final ReliableChannelEx ch;
/** */
private final ClientUtils serDes;
@@ -69,7 +69,7 @@ class ClientIgniteSetImpl<T> implements ClientIgniteSet<T> {
* @param cacheId Cache id.
*/
public ClientIgniteSetImpl(
- ReliableChannel ch,
+ ReliableChannelEx ch,
ClientUtils serDes,
String name,
boolean colocated,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java
index ffbe7184288..aa184eb9753 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java
@@ -36,7 +36,7 @@ class ClientQueryPager<K, V> extends
GenericQueryPager<Cache.Entry<K, V>> {
/** Constructor. */
ClientQueryPager(
- ReliableChannel ch,
+ ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
@@ -53,7 +53,7 @@ class ClientQueryPager<K, V> extends
GenericQueryPager<Cache.Entry<K, V>> {
/** Constructor. */
ClientQueryPager(
- ReliableChannel ch,
+ ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
index 7c65cd7ee71..6cda195ddf7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServicesImpl.java
@@ -57,7 +57,7 @@ class ClientServicesImpl implements ClientServices {
static final int SRV_TOP_UPDATE_PERIOD = 60_000;
/** Channel. */
- private final ReliableChannel ch;
+ private final ReliableChannelImpl ch;
/** Binary marshaller. */
private final ClientBinaryMarshaller marsh;
@@ -75,7 +75,7 @@ class ClientServicesImpl implements ClientServices {
private final Map<String, ServiceTopology> servicesTopologies;
/** Constructor. */
- ClientServicesImpl(ReliableChannel ch, ClientBinaryMarshaller marsh,
ClientClusterGroupImpl grp, IgniteLogger log) {
+ ClientServicesImpl(ReliableChannelImpl ch, ClientBinaryMarshaller marsh,
ClientClusterGroupImpl grp, IgniteLogger log) {
this.ch = ch;
this.marsh = marsh;
this.grp = grp;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
index a53e48b1b03..464a4c8c2be 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
@@ -39,7 +39,7 @@ abstract class GenericQueryPager<T> implements QueryPager<T> {
private final Consumer<PayloadOutputChannel> qryWriter;
/** Channel. */
- private final ReliableChannel ch;
+ private final ReliableChannelEx ch;
/** Client Transaction. */
private final @Nullable TcpClientTransaction tx;
@@ -64,7 +64,7 @@ abstract class GenericQueryPager<T> implements QueryPager<T> {
/** Constructor. */
GenericQueryPager(
- ReliableChannel ch,
+ ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
@@ -83,7 +83,7 @@ abstract class GenericQueryPager<T> implements QueryPager<T> {
/** Constructor. */
GenericQueryPager(
- ReliableChannel ch,
+ ReliableChannelEx ch,
@Nullable TcpClientTransaction tx,
ClientOperation qryOp,
ClientOperation pageQryOp,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannelEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannelEx.java
new file mode 100644
index 00000000000..75ec987df3d
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannelEx.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.client.ClientAuthenticationException;
+import org.apache.ignite.client.ClientAuthorizationException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.IgniteClientFuture;
+
+/**
+ * Interface of a communication channel with failover and partition awareness, implemented by {@link ReliableChannelImpl}.
+ */
+interface ReliableChannelEx extends AutoCloseable {
+ /**
+ * Send request for operation {@code op} and handle response; the value produced by {@code payloadReader} is returned.
+ *
+ * @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
+ * @throws ClientAuthenticationException When user name or password is invalid.
+ * @throws ClientAuthorizationException When user has no permission to perform operation.
+ * @throws ClientProtocolError When failed to handshake with server.
+ * @throws ClientServerError When failed to process request on server.
+ */
+ public <T> T service(
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
+ ) throws ClientException, ClientError;
+
+ /**
+ * Send request to one of the passed nodes ({@code targetNodes}, given as node IDs) and handle response.
+ *
+ * @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
+ * @throws ClientAuthenticationException When user name or password is invalid.
+ * @throws ClientAuthorizationException When user has no permission to perform operation.
+ * @throws ClientProtocolError When failed to handshake with server.
+ * @throws ClientServerError When failed to process request on server.
+ */
+ public <T> T service(
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader,
+ List<UUID> targetNodes
+ ) throws ClientException, ClientError;
+
+ /**
+ * Send request and handle response asynchronously; the result is exposed through the returned future.
+ */
+ public <T> IgniteClientFuture<T> serviceAsync(
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
+ ) throws ClientException, ClientError;
+
+ /**
+ * Send request to affinity node (selected by {@code cacheId} and {@code key}) and handle response.
+ */
+ public <T> T affinityService(
+ int cacheId,
+ Object key,
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
+ ) throws ClientException, ClientError;
+
+ /**
+ * Send request to affinity node (selected by {@code cacheId} and partition {@code part}) and handle response.
+ */
+ public <T> T affinityService(
+ int cacheId,
+ int part,
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
+ ) throws ClientException, ClientError;
+
+ /**
+ * Send request to affinity node (selected by {@code cacheId} and {@code key}) and handle response asynchronously.
+ */
+ public <T> IgniteClientFuture<T> affinityServiceAsync(
+ int cacheId,
+ Object key,
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
+ ) throws ClientException, ClientError;
+
+ /**
+ * Send request without payload ({@code null} is passed as the payload writer) and handle response.
+ */
+ public default <T> T service(
+ ClientOperation op,
+ Function<PayloadInputChannel, T> payloadReader
+ ) throws ClientException, ClientError {
+ return service(op, null, payloadReader);
+ }
+
+ /**
+ * Send request without payload ({@code null} is passed as the payload writer) and handle response asynchronously.
+ */
+ public default <T> IgniteClientFuture<T> serviceAsync(
+ ClientOperation op,
+ Function<PayloadInputChannel, T> payloadReader
+ ) throws ClientException, ClientError {
+ return serviceAsync(op, null, payloadReader);
+ }
+
+ /**
+ * Send request and handle response without payload ({@code null} is passed as the payload reader, nothing is returned).
+ */
+ public default void request(
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter
+ ) throws ClientException, ClientError {
+ service(op, payloadWriter, null);
+ }
+
+ /**
+ * Send request and handle response without payload ({@code null} is passed as the payload reader) asynchronously.
+ */
+ public default IgniteClientFuture<Void> requestAsync(
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter
+ ) throws ClientException, ClientError {
+ return serviceAsync(op, payloadWriter, null);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannelImpl.java
similarity index 93%
rename from
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannelImpl.java
index cacc7aa5ed0..c3170cfee67 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannelImpl.java
@@ -42,7 +42,6 @@ import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.client.ClientAuthenticationException;
-import org.apache.ignite.client.ClientAuthorizationException;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientOperationType;
@@ -61,7 +60,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Communication channel with failover and partition awareness.
*/
-final class ReliableChannel implements AutoCloseable {
+final class ReliableChannelImpl implements ReliableChannelEx {
/** Channel factory. */
private final BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, ClientChannel> chFactory;
@@ -125,7 +124,7 @@ final class ReliableChannel implements AutoCloseable {
/**
* Constructor.
*/
- ReliableChannel(
+ ReliableChannelImpl(
BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, ClientChannel> chFactory,
ClientConfiguration clientCfg,
IgniteBinary binary
@@ -183,16 +182,8 @@ final class ReliableChannel implements AutoCloseable {
log.debug("ReliableChannel stopped");
}
- /**
- * Send request and handle response.
- *
- * @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
- * @throws ClientAuthenticationException When user name or password is invalid.
- * @throws ClientAuthorizationException When user has no permission to perform operation.
- * @throws ClientProtocolError When failed to handshake with server.
- * @throws ClientServerError When failed to process request on server.
- */
- public <T> T service(
+ /** {@inheritDoc} */
+ @Override public <T> T service(
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
@@ -200,16 +191,8 @@ final class ReliableChannel implements AutoCloseable {
return service(op, payloadWriter, payloadReader,
affinityCtx.dataCenterNodes());
}
- /**
- * Send request to one of the passed nodes and handle response.
- *
- * @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
- * @throws ClientAuthenticationException When user name or password is invalid.
- * @throws ClientAuthorizationException When user has no permission to perform operation.
- * @throws ClientProtocolError When failed to handshake with server.
- * @throws ClientServerError When failed to process request on server.
- */
- public <T> T service(
+ /** {@inheritDoc} */
+ @Override public <T> T service(
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader,
@@ -225,13 +208,11 @@ final class ReliableChannel implements AutoCloseable {
);
}
- /**
- * Send request and handle response asynchronously.
- */
- public <T> IgniteClientFuture<T> serviceAsync(
- ClientOperation op,
- Consumer<PayloadOutputChannel> payloadWriter,
- Function<PayloadInputChannel, T> payloadReader
+ /** {@inheritDoc} */
+ @Override public <T> IgniteClientFuture<T> serviceAsync(
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError {
CompletableFuture<T> fut = new CompletableFuture<>();
@@ -371,42 +352,8 @@ final class ReliableChannel implements AutoCloseable {
fut.completeExceptionally(composeException(failures));
}
- /**
- * Send request without payload and handle response.
- */
- public <T> T service(ClientOperation op, Function<PayloadInputChannel, T>
payloadReader)
- throws ClientException, ClientError {
- return service(op, null, payloadReader);
- }
-
- /**
- * Send request without payload and handle response asynchronously.
- */
- public <T> IgniteClientFuture<T> serviceAsync(ClientOperation op,
Function<PayloadInputChannel, T> payloadReader)
- throws ClientException, ClientError {
- return serviceAsync(op, null, payloadReader);
- }
-
- /**
- * Send request and handle response without payload.
- */
- public void request(ClientOperation op, Consumer<PayloadOutputChannel>
payloadWriter)
- throws ClientException, ClientError {
- service(op, payloadWriter, null);
- }
-
- /**
- * Send request and handle response without payload.
- */
- public IgniteClientFuture<Void> requestAsync(ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter)
- throws ClientException, ClientError {
- return serviceAsync(op, payloadWriter, null);
- }
-
- /**
- * Send request to affinity node and handle response.
- */
- public <T> T affinityService(
+ /** {@inheritDoc} */
+ @Override public <T> T affinityService(
int cacheId,
Object key,
ClientOperation op,
@@ -425,10 +372,8 @@ final class ReliableChannel implements AutoCloseable {
return service(op, payloadWriter, payloadReader);
}
- /**
- * Send request to affinity node and handle response.
- */
- public <T> T affinityService(
+ /** {@inheritDoc} */
+ @Override public <T> T affinityService(
int cacheId,
int part,
ClientOperation op,
@@ -447,10 +392,8 @@ final class ReliableChannel implements AutoCloseable {
return service(op, payloadWriter, payloadReader);
}
- /**
- * Send request to affinity node and handle response.
- */
- public <T> IgniteClientFuture<T> affinityServiceAsync(
+ /** {@inheritDoc} */
+ @Override public <T> IgniteClientFuture<T> affinityServiceAsync(
int cacheId,
Object key,
ClientOperation op,
@@ -1160,7 +1103,7 @@ final class ReliableChannel implements AutoCloseable {
ClientChannel channel = chFactory.apply(chCfg, connMgr);
if (channel.serverNodeId() != null) {
-
channel.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
+
channel.addTopologyChangeListener(ReliableChannelImpl.this::onTopologyChanged);
UUID prevId = serverNodeId;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index dffc38043b7..0cf8a9e46a4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -108,7 +108,7 @@ public class TcpClientCache<K, V> implements ClientCache<K,
V> {
private final int cacheId;
/** Channel. */
- private final ReliableChannel ch;
+ private final ReliableChannelImpl ch;
/** Cache name. */
private final String name;
@@ -142,13 +142,13 @@ public class TcpClientCache<K, V> implements
ClientCache<K, V> {
"non-transactional ClientCache %s operation within a transaction.";
/** Constructor. */
- TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller
marsh, TcpClientTransactions transactions,
+ TcpClientCache(String name, ReliableChannelImpl ch, ClientBinaryMarshaller
marsh, TcpClientTransactions transactions,
ClientCacheEntryListenersRegistry lsnrsRegistry, IgniteLogger log) {
this(name, ch, marsh, transactions, lsnrsRegistry, false, null, log);
}
/** Constructor. */
- TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller
marsh,
+ TcpClientCache(String name, ReliableChannelImpl ch, ClientBinaryMarshaller
marsh,
TcpClientTransactions transactions, ClientCacheEntryListenersRegistry
lsnrsRegistry, boolean keepBinary,
ExpiryPolicy expiryPlc, IgniteLogger log) {
this.name = name;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
index 24446be7ca2..02bc1408da3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
@@ -38,7 +38,7 @@ import static
org.apache.ignite.internal.client.thin.ProtocolVersionFeature.TRAN
*/
class TcpClientTransactions implements ClientTransactions {
/** Channel. */
- private final ReliableChannel ch;
+ private final ReliableChannelEx ch;
/** Marshaller. */
private final ClientBinaryMarshaller marsh;
@@ -56,7 +56,7 @@ class TcpClientTransactions implements ClientTransactions {
private final ClientTransactionConfiguration txCfg;
/** Constructor. */
- TcpClientTransactions(ReliableChannel ch, ClientBinaryMarshaller marsh,
ClientTransactionConfiguration txCfg) {
+ TcpClientTransactions(ReliableChannelEx ch, ClientBinaryMarshaller marsh,
ClientTransactionConfiguration txCfg) {
this.ch = ch;
this.marsh = marsh;
this.txCfg = txCfg;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 504f838aed6..b21eca85781 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -88,7 +88,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class TcpIgniteClient implements IgniteClient {
/** Channel. */
- private final ReliableChannel ch;
+ private final ReliableChannelImpl ch;
/** Ignite Binary. */
private final IgniteBinary binary;
@@ -148,7 +148,7 @@ public class TcpIgniteClient implements IgniteClient {
binary = new ClientBinary(marsh);
- ch = new ReliableChannel(chFactory, cfg, binary);
+ ch = new ReliableChannelImpl(chFactory, cfg, binary);
evtLsnrs = cfg.getEventListeners() == null ? null :
cfg.getEventListeners().clone();
@@ -536,7 +536,7 @@ public class TcpIgniteClient implements IgniteClient {
/**
* @return Channel.
*/
- ReliableChannel reliableChannel() {
+ ReliableChannelImpl reliableChannel() {
return ch;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelDuplicationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelDuplicationTest.java
index 819846e00aa..75abce6c588 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelDuplicationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelDuplicationTest.java
@@ -108,10 +108,10 @@ public class ReliableChannelDuplicationTest extends
ThinClientAbstractPartitionA
*
* @param holders List of channel holders.
*/
- private void assertNoDuplicates(List<ReliableChannel.ClientChannelHolder>
holders) {
+ private void
assertNoDuplicates(List<ReliableChannelImpl.ClientChannelHolder> holders) {
Set<InetSocketAddress> addrs = new HashSet<>();
- for (ReliableChannel.ClientChannelHolder holder : holders) {
+ for (ReliableChannelImpl.ClientChannelHolder holder : holders) {
holder.getAddresses().forEach(addr -> {
if (!addrs.add(addr))
throw new AssertionError("Duplicate remote address found:
" + addr);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index ce1380d7525..a029f263045 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -71,7 +71,7 @@ public class ReliableChannelTest {
ClientConfiguration ccfg = new ClientConfiguration().setAddresses(
"127.0.0.1:10800", "127.0.0.1:10800", "127.0.0.1:10801");
- ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+ ReliableChannelImpl rc = new ReliableChannelImpl(chFactory, ccfg,
null);
rc.channelsInit();
@@ -85,7 +85,7 @@ public class ReliableChannelTest {
public void testAddressWithoutPort() {
ClientConfiguration ccfg = new
ClientConfiguration().setAddresses("127.0.0.1");
- ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+ ReliableChannelImpl rc = new ReliableChannelImpl(chFactory, ccfg,
null);
rc.channelsInit();
@@ -116,7 +116,7 @@ public class ReliableChannelTest {
Set<String> usedChannels = new HashSet<>();
for (int i = 0; i < 100; i++) {
- ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+ ReliableChannelImpl rc = new ReliableChannelImpl(chFactory, ccfg,
null);
rc.channelsInit();
@@ -147,7 +147,7 @@ public class ReliableChannelTest {
.nextAddresesResponse("127.0.0.1:10811", "127.0.0.1:10811",
"127.0.0.1:10812", "127.0.0.1:10813");
ClientConfiguration ccfg = new
ClientConfiguration().setAddressesFinder(finder);
- ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+ ReliableChannelImpl rc = new ReliableChannelImpl(chFactory, ccfg,
null);
Supplier<List<String>> holderAddrs = () ->
rc.getChannelHolders().stream()
.map(h ->
F.first(h.getAddresses()).toString().replace("/<unresolved>", "")) // Replace
unnecessary part on JDK 17.
@@ -209,17 +209,17 @@ public class ReliableChannelTest {
/** */
private void checkDoesNotReinit(ClientConfiguration ccfg) {
- ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+ ReliableChannelImpl rc = new ReliableChannelImpl(chFactory, ccfg,
null);
rc.channelsInit();
- List<ReliableChannel.ClientChannelHolder> originalChannels =
rc.getChannelHolders();
- List<ReliableChannel.ClientChannelHolder> copyOriginalChannels = new
ArrayList<>(originalChannels);
+ List<ReliableChannelImpl.ClientChannelHolder> originalChannels =
rc.getChannelHolders();
+ List<ReliableChannelImpl.ClientChannelHolder> copyOriginalChannels =
new ArrayList<>(originalChannels);
// Imitate topology change.
rc.initChannelHolders();
- List<ReliableChannel.ClientChannelHolder> newChannels =
rc.getChannelHolders();
+ List<ReliableChannelImpl.ClientChannelHolder> newChannels =
rc.getChannelHolders();
assertSame(originalChannels, newChannels);
@@ -240,7 +240,7 @@ public class ReliableChannelTest {
.setAddresses(dfltAddrs)
.setPartitionAwarenessEnabled(false);
- ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+ ReliableChannelImpl rc = new ReliableChannelImpl(chFactory, ccfg,
null);
rc.channelsInit();
@@ -266,24 +266,24 @@ public class ReliableChannelTest {
ClientConfiguration ccfg = new
ClientConfiguration().setAddressesFinder(finder);
- ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+ ReliableChannelImpl rc = new ReliableChannelImpl(chFactory, ccfg,
null);
rc.channelsInit();
- List<ReliableChannel.ClientChannelHolder> originChannels =
Collections.unmodifiableList(rc.getChannelHolders());
+ List<ReliableChannelImpl.ClientChannelHolder> originChannels =
Collections.unmodifiableList(rc.getChannelHolders());
// Imitate topology change.
rc.initChannelHolders();
- assertEquals(2, F.size(originChannels,
ReliableChannel.ClientChannelHolder::isClosed));
+ assertEquals(2, F.size(originChannels,
ReliableChannelImpl.ClientChannelHolder::isClosed));
- List<ReliableChannel.ClientChannelHolder> reuseChannel =
originChannels.stream()
+ List<ReliableChannelImpl.ClientChannelHolder> reuseChannel =
originChannels.stream()
.filter(c -> !c.isClosed())
.collect(Collectors.toList());
assertEquals(1, reuseChannel.size());
- List<ReliableChannel.ClientChannelHolder> newChannels =
rc.getChannelHolders();
+ List<ReliableChannelImpl.ClientChannelHolder> newChannels =
rc.getChannelHolders();
assertEquals(2, newChannels.size());
@@ -305,7 +305,7 @@ public class ReliableChannelTest {
.setAddressesFinder(finder)
.setPartitionAwarenessEnabled(false);
- ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+ ReliableChannelImpl rc = new ReliableChannelImpl(chFactory, ccfg,
null);
rc.channelsInit();
@@ -329,7 +329,7 @@ public class ReliableChannelTest {
.setAddresses(dfltAddrs)
.setPartitionAwarenessEnabled(true);
- ReliableChannel rc = new ReliableChannel((cfg, hnd) -> new
TestFailureClientChannel(), ccfg, null);
+ ReliableChannelImpl rc = new ReliableChannelImpl((cfg, hnd) -> new
TestFailureClientChannel(), ccfg, null);
rc.channelsInit();
}
@@ -361,7 +361,7 @@ public class ReliableChannelTest {
.nextAddresesResponse(addrs);
ClientConfiguration ccfg = new
ClientConfiguration().setAddressesFinder(finder);
- ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+ ReliableChannelImpl rc = new ReliableChannelImpl(chFactory, ccfg,
null);
rc.channelsInit();
int initCnt = rc.getChannelHolders().size();
@@ -395,7 +395,7 @@ public class ReliableChannelTest {
// Emulate cluster is down after TcpClientChannel#send operation.
AtomicInteger step = new AtomicInteger();
- ReliableChannel rc = new ReliableChannel((cfg, hnd) -> {
+ ReliableChannelImpl rc = new ReliableChannelImpl((cfg, hnd) -> {
if (step.getAndIncrement() == 0)
return new TestAsyncServiceFailureClientChannel();
else
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index 6bc4c3dec45..a7096fc646e 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -289,10 +289,10 @@ public abstract class
ThinClientAbstractPartitionAwarenessTest extends GridCommo
* @return {@code true} if the channel is connected, {@code false}
otherwise.
*/
protected boolean isConnected(int chIdx) {
- List<ReliableChannel.ClientChannelHolder> channelHolders =
((TcpIgniteClient)client).reliableChannel().getChannelHolders();
+ List<ReliableChannelImpl.ClientChannelHolder> channelHolders =
((TcpIgniteClient)client).reliableChannel().getChannelHolders();
int chPort = DFLT_PORT + chIdx;
- for (ReliableChannel.ClientChannelHolder holder : channelHolders) {
+ for (ReliableChannelImpl.ClientChannelHolder holder : channelHolders) {
if (holder == null || holder.isClosed()) {
continue;
}