This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 40e140db4f IGNITE-20921 Add stopGuard and busyLock to client connector
classes (#2911)
40e140db4f is described below
commit 40e140db4ff608800383f092ba59ea66c2fe796d
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Dec 1 16:44:07 2023 +0200
IGNITE-20921 Add stopGuard and busyLock to client connector classes (#2911)
Update `ClientHandlerModule`, `ClientResourceRegistry`,
`ClientPrimaryReplicaTracker`.
---
.../ignite/client/handler/ClientHandlerModule.java | 67 ++++++++++++++--------
.../handler/ClientPrimaryReplicaTracker.java | 48 +++++++++++++++-
.../client/handler/ClientResourceRegistry.java | 54 ++++++++---------
3 files changed, 115 insertions(+), 54 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index b90e257f51..274afd20d7 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -31,6 +31,7 @@ import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import
org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
@@ -52,6 +53,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterService;
@@ -117,6 +119,10 @@ public class ClientHandlerModule implements
IgniteComponent {
private final ClientPrimaryReplicaTracker primaryReplicaTracker;
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
/**
* Constructor.
*
@@ -211,6 +217,12 @@ public class ClientHandlerModule implements
IgniteComponent {
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
metricManager.unregisterSource(metrics);
primaryReplicaTracker.stop();
@@ -255,36 +267,46 @@ public class ClientHandlerModule implements
IgniteComponent {
bootstrap.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
- long connectionId =
CONNECTION_ID_GEN.incrementAndGet();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("New client connection [connectionId=" +
connectionId
- + ", remoteAddress=" + ch.remoteAddress()
+ ']');
+ if (!busyLock.enterBusy()) {
+ ch.close();
+ return;
}
- if (configuration.idleTimeout() > 0) {
- IdleStateHandler idleStateHandler = new
IdleStateHandler(
- configuration.idleTimeout(), 0, 0,
TimeUnit.MILLISECONDS);
+ try {
+ long connectionId =
CONNECTION_ID_GEN.incrementAndGet();
- ch.pipeline().addLast(idleStateHandler);
- ch.pipeline().addLast(new
IdleChannelHandler(configuration.idleTimeout(), metrics, connectionId));
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("New client connection
[connectionId=" + connectionId
+ + ", remoteAddress=" +
ch.remoteAddress() + ']');
+ }
- if (sslContext != null) {
- ch.pipeline().addFirst("ssl",
sslContext.newHandler(ch.alloc()));
- }
+ if (configuration.idleTimeout() > 0) {
+ IdleStateHandler idleStateHandler = new
IdleStateHandler(
+ configuration.idleTimeout(), 0, 0,
TimeUnit.MILLISECONDS);
+
+ ch.pipeline().addLast(idleStateHandler);
+ ch.pipeline().addLast(new
IdleChannelHandler(configuration.idleTimeout(), metrics, connectionId));
+ }
- ClientInboundMessageHandler messageHandler =
createInboundMessageHandler(configuration, clusterId, connectionId);
- authenticationManager.listen(messageHandler);
+ if (sslContext != null) {
+ ch.pipeline().addFirst("ssl",
sslContext.newHandler(ch.alloc()));
+ }
- ch.pipeline().addLast(
- new ClientMessageDecoder(),
- messageHandler
- );
+ ClientInboundMessageHandler messageHandler =
createInboundMessageHandler(
+ configuration, clusterId, connectionId);
+ authenticationManager.listen(messageHandler);
- ch.closeFuture().addListener(future ->
authenticationManager.stopListen(messageHandler));
+ ch.pipeline().addLast(
+ new ClientMessageDecoder(),
+ messageHandler
+ );
- metrics.connectionsInitiatedIncrement();
+ ch.closeFuture().addListener(future ->
authenticationManager.stopListen(messageHandler));
+
+ metrics.connectionsInitiatedIncrement();
+ } finally {
+ busyLock.leaveBusy();
+ }
}
})
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
configuration.connectTimeout());
@@ -340,5 +362,4 @@ public class ClientHandlerModule implements IgniteComponent
{
primaryReplicaTracker
);
}
-
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
index a87cc9bc75..c037538201 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
@@ -35,12 +36,14 @@ import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.event.EventParameters;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.TableNotFoundException;
import org.jetbrains.annotations.Nullable;
@@ -77,6 +80,10 @@ public class ClientPrimaryReplicaTracker implements
EventListener<EventParameter
private final SchemaSyncService schemaSyncService;
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
/**
* Constructor.
*
@@ -104,6 +111,25 @@ public class ClientPrimaryReplicaTracker implements
EventListener<EventParameter
* @return Primary replicas for the table, or null when not yet known.
*/
public CompletableFuture<PrimaryReplicasResult> primaryReplicasAsync(int
tableId, @Nullable Long maxStartTime) {
+ if (!busyLock.enterBusy()) {
+ return CompletableFuture.failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ return primaryReplicasAsyncInternal(tableId, maxStartTime);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Gets primary replicas by partition for the table.
+ *
+ * @param tableId Table ID.
+ * @param maxStartTime Timestamp.
+ * @return Primary replicas for the table, or null when not yet known.
+ */
+ private CompletableFuture<PrimaryReplicasResult>
primaryReplicasAsyncInternal(int tableId, @Nullable Long maxStartTime) {
HybridTimestamp timestamp = clock.now();
if (maxStartTime == null) {
@@ -227,6 +253,12 @@ public class ClientPrimaryReplicaTracker implements
EventListener<EventParameter
@SuppressWarnings({"rawtypes", "unchecked"})
void stop() {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
catalogService.removeListener(CatalogEvent.TABLE_DROP, (EventListener)
this);
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
(EventListener) this);
primaryReplicas.clear();
@@ -234,10 +266,22 @@ public class ClientPrimaryReplicaTracker implements
EventListener<EventParameter
@Override
public CompletableFuture<Boolean> notify(EventParameters parameters,
@Nullable Throwable exception) {
- if (exception != null) {
- return CompletableFuture.completedFuture(false);
+ if (!busyLock.enterBusy()) {
+ return CompletableFuture.failedFuture(new NodeStoppingException());
}
+ try {
+ if (exception != null) {
+ return CompletableFuture.completedFuture(false);
+ }
+
+ return notifyInternal(parameters);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private CompletableFuture<Boolean> notifyInternal(EventParameters
parameters) {
if (parameters instanceof DropTableEventParameters) {
removeTable((DropTableEventParameters) parameters);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
index 02e9441307..ed418d70a2 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
@@ -19,11 +19,11 @@ package org.apache.ignite.client.handler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
/**
* Per-connection resource registry.
@@ -42,12 +42,9 @@ public class ClientResourceRegistry {
/**
* RW lock.
*/
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
- /**
- * Closed flag.
- */
- private volatile boolean closed;
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
/**
* Stores the resource and returns the generated id.
@@ -116,31 +113,30 @@ public class ClientResourceRegistry {
* Closes the registry and releases all resources.
*/
public void close() {
- rwLock.writeLock().lock();
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
- try {
- closed = true;
- IgniteInternalException ex = null;
-
- for (ClientResource r : res.values()) {
- try {
- r.release();
- } catch (Exception e) {
- if (ex == null) {
- ex = new IgniteInternalException(e);
- } else {
- ex.addSuppressed(e);
- }
+ busyLock.block();
+
+ IgniteInternalException ex = null;
+
+ for (ClientResource r : res.values()) {
+ try {
+ r.release();
+ } catch (Exception e) {
+ if (ex == null) {
+ ex = new IgniteInternalException(e);
+ } else {
+ ex.addSuppressed(e);
}
}
+ }
- res.clear();
+ res.clear();
- if (ex != null) {
- throw ex;
- }
- } finally {
- rwLock.writeLock().unlock();
+ if (ex != null) {
+ throw ex;
}
}
@@ -148,7 +144,7 @@ public class ClientResourceRegistry {
* Enters the lock.
*/
private void enter() throws IgniteInternalCheckedException {
- if (!rwLock.readLock().tryLock() || closed) {
+ if (!busyLock.enterBusy()) {
throw new IgniteInternalCheckedException("Resource registry is
closed.");
}
}
@@ -157,6 +153,6 @@ public class ClientResourceRegistry {
* Leaves the lock.
*/
private void leave() {
- rwLock.readLock().unlock();
+ busyLock.leaveBusy();
}
}