This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 40e140db4f IGNITE-20921 Add stopGuard and busyLock to client connector classes (#2911)
40e140db4f is described below

commit 40e140db4ff608800383f092ba59ea66c2fe796d
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Dec 1 16:44:07 2023 +0200

    IGNITE-20921 Add stopGuard and busyLock to client connector classes (#2911)
    
    Update `ClientHandlerModule`, `ClientResourceRegistry`, `ClientPrimaryReplicaTracker`.
---
 .../ignite/client/handler/ClientHandlerModule.java | 67 ++++++++++++++--------
 .../handler/ClientPrimaryReplicaTracker.java       | 48 +++++++++++++++-
 .../client/handler/ClientResourceRegistry.java     | 54 ++++++++---------
 3 files changed, 115 insertions(+), 54 deletions(-)

diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index b90e257f51..274afd20d7 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -31,6 +31,7 @@ import java.net.InetSocketAddress;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import 
org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
@@ -52,6 +53,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.ErrorGroups;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterService;
@@ -117,6 +119,10 @@ public class ClientHandlerModule implements 
IgniteComponent {
 
     private final ClientPrimaryReplicaTracker primaryReplicaTracker;
 
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
     /**
      * Constructor.
      *
@@ -211,6 +217,12 @@ public class ClientHandlerModule implements 
IgniteComponent {
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
         metricManager.unregisterSource(metrics);
         primaryReplicaTracker.stop();
 
@@ -255,36 +267,46 @@ public class ClientHandlerModule implements 
IgniteComponent {
         bootstrap.childHandler(new ChannelInitializer<>() {
                     @Override
                     protected void initChannel(Channel ch) {
-                        long connectionId = 
CONNECTION_ID_GEN.incrementAndGet();
-
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("New client connection [connectionId=" + 
connectionId
-                                    + ", remoteAddress=" + ch.remoteAddress() 
+ ']');
+                        if (!busyLock.enterBusy()) {
+                            ch.close();
+                            return;
                         }
 
-                        if (configuration.idleTimeout() > 0) {
-                            IdleStateHandler idleStateHandler = new 
IdleStateHandler(
-                                    configuration.idleTimeout(), 0, 0, 
TimeUnit.MILLISECONDS);
+                        try {
+                            long connectionId = 
CONNECTION_ID_GEN.incrementAndGet();
 
-                            ch.pipeline().addLast(idleStateHandler);
-                            ch.pipeline().addLast(new 
IdleChannelHandler(configuration.idleTimeout(), metrics, connectionId));
-                        }
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("New client connection 
[connectionId=" + connectionId
+                                        + ", remoteAddress=" + 
ch.remoteAddress() + ']');
+                            }
 
-                        if (sslContext != null) {
-                            ch.pipeline().addFirst("ssl", 
sslContext.newHandler(ch.alloc()));
-                        }
+                            if (configuration.idleTimeout() > 0) {
+                                IdleStateHandler idleStateHandler = new 
IdleStateHandler(
+                                        configuration.idleTimeout(), 0, 0, 
TimeUnit.MILLISECONDS);
+
+                                ch.pipeline().addLast(idleStateHandler);
+                                ch.pipeline().addLast(new 
IdleChannelHandler(configuration.idleTimeout(), metrics, connectionId));
+                            }
 
-                        ClientInboundMessageHandler messageHandler = 
createInboundMessageHandler(configuration, clusterId, connectionId);
-                        authenticationManager.listen(messageHandler);
+                            if (sslContext != null) {
+                                ch.pipeline().addFirst("ssl", 
sslContext.newHandler(ch.alloc()));
+                            }
 
-                        ch.pipeline().addLast(
-                                new ClientMessageDecoder(),
-                                messageHandler
-                        );
+                            ClientInboundMessageHandler messageHandler = 
createInboundMessageHandler(
+                                    configuration, clusterId, connectionId);
+                            authenticationManager.listen(messageHandler);
 
-                        ch.closeFuture().addListener(future -> 
authenticationManager.stopListen(messageHandler));
+                            ch.pipeline().addLast(
+                                    new ClientMessageDecoder(),
+                                    messageHandler
+                            );
 
-                        metrics.connectionsInitiatedIncrement();
+                            ch.closeFuture().addListener(future -> 
authenticationManager.stopListen(messageHandler));
+
+                            metrics.connectionsInitiatedIncrement();
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
                     }
                 })
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
configuration.connectTimeout());
@@ -340,5 +362,4 @@ public class ClientHandlerModule implements IgniteComponent 
{
                 primaryReplicaTracker
         );
     }
-
 }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
index a87cc9bc75..c037538201 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
@@ -35,12 +36,14 @@ import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.event.EventParameters;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.jetbrains.annotations.Nullable;
 
@@ -77,6 +80,10 @@ public class ClientPrimaryReplicaTracker implements 
EventListener<EventParameter
 
     private final SchemaSyncService schemaSyncService;
 
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
     /**
      * Constructor.
      *
@@ -104,6 +111,25 @@ public class ClientPrimaryReplicaTracker implements 
EventListener<EventParameter
      * @return Primary replicas for the table, or null when not yet known.
      */
     public CompletableFuture<PrimaryReplicasResult> primaryReplicasAsync(int 
tableId, @Nullable Long maxStartTime) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return primaryReplicasAsyncInternal(tableId, maxStartTime);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Gets primary replicas by partition for the table.
+     *
+     * @param tableId Table ID.
+     * @param maxStartTime Timestamp.
+     * @return Primary replicas for the table, or null when not yet known.
+     */
+    private CompletableFuture<PrimaryReplicasResult> 
primaryReplicasAsyncInternal(int tableId, @Nullable Long maxStartTime) {
         HybridTimestamp timestamp = clock.now();
 
         if (maxStartTime == null) {
@@ -227,6 +253,12 @@ public class ClientPrimaryReplicaTracker implements 
EventListener<EventParameter
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     void stop() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
         catalogService.removeListener(CatalogEvent.TABLE_DROP, (EventListener) 
this);
         
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
(EventListener) this);
         primaryReplicas.clear();
@@ -234,10 +266,22 @@ public class ClientPrimaryReplicaTracker implements 
EventListener<EventParameter
 
     @Override
     public CompletableFuture<Boolean> notify(EventParameters parameters, 
@Nullable Throwable exception) {
-        if (exception != null) {
-            return CompletableFuture.completedFuture(false);
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new NodeStoppingException());
         }
 
+        try {
+            if (exception != null) {
+                return CompletableFuture.completedFuture(false);
+            }
+
+            return notifyInternal(parameters);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Boolean> notifyInternal(EventParameters 
parameters) {
         if (parameters instanceof DropTableEventParameters) {
             removeTable((DropTableEventParameters) parameters);
 
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
index 02e9441307..ed418d70a2 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
@@ -19,11 +19,11 @@ package org.apache.ignite.client.handler;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 
 /**
  * Per-connection resource registry.
@@ -42,12 +42,9 @@ public class ClientResourceRegistry {
     /**
      * RW lock.
      */
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    /**
-     * Closed flag.
-     */
-    private volatile boolean closed;
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
 
     /**
      * Stores the resource and returns the generated id.
@@ -116,31 +113,30 @@ public class ClientResourceRegistry {
      * Closes the registry and releases all resources.
      */
     public void close() {
-        rwLock.writeLock().lock();
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
 
-        try {
-            closed = true;
-            IgniteInternalException ex = null;
-
-            for (ClientResource r : res.values()) {
-                try {
-                    r.release();
-                } catch (Exception e) {
-                    if (ex == null) {
-                        ex = new IgniteInternalException(e);
-                    } else {
-                        ex.addSuppressed(e);
-                    }
+        busyLock.block();
+
+        IgniteInternalException ex = null;
+
+        for (ClientResource r : res.values()) {
+            try {
+                r.release();
+            } catch (Exception e) {
+                if (ex == null) {
+                    ex = new IgniteInternalException(e);
+                } else {
+                    ex.addSuppressed(e);
                 }
             }
+        }
 
-            res.clear();
+        res.clear();
 
-            if (ex != null) {
-                throw ex;
-            }
-        } finally {
-            rwLock.writeLock().unlock();
+        if (ex != null) {
+            throw ex;
         }
     }
 
@@ -148,7 +144,7 @@ public class ClientResourceRegistry {
      * Enters the lock.
      */
     private void enter() throws IgniteInternalCheckedException {
-        if (!rwLock.readLock().tryLock() || closed) {
+        if (!busyLock.enterBusy()) {
             throw new IgniteInternalCheckedException("Resource registry is 
closed.");
         }
     }
@@ -157,6 +153,6 @@ public class ClientResourceRegistry {
      * Leaves the lock.
      */
     private void leave() {
-        rwLock.readLock().unlock();
+        busyLock.leaveBusy();
     }
 }

Reply via email to