This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch feature/GEODE-5925_unlock in repository https://gitbox.apache.org/repos/asf/geode.git
commit e70f676fc65f8372564698d7b9c120109d5b0e5b Author: Bruce Schuchardt <[email protected]> AuthorDate: Thu Oct 25 15:16:40 2018 -0700 GEODE-5925 locks are not released in shutdown hook If the cache is closed because the JVM is exiting we aren't releasing locks until the DistributedSystem disconnects. This alters the behavior of shutdown and, in particular, causes long delays in designating new primary bucket owners and reestablishing redundancy. This PR enables unlocking during shutdown. --- .../internal/InternalDistributedSystem.java | 41 ++++++++------- .../distributed/internal/locks/DLockService.java | 10 +--- .../apache/geode/internal/cache/BucketAdvisor.java | 26 ++++------ .../cache/DestroyPartitionedRegionMessage.java | 2 +- .../cache/entries/AbstractRegionEntry.java | 2 +- .../internal/locks/DLockServiceJUnitTest.java | 60 ++++++++++++++++++++++ 6 files changed, 95 insertions(+), 46 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java index 28a6b8d..f940d32 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java @@ -158,6 +158,18 @@ public class InternalDistributedSystem extends DistributedSystem new AtomicReference<CreationStackGenerator>(DEFAULT_CREATION_STACK_GENERATOR); /** + * A value of Boolean.TRUE will identify a thread being used to execute + * disconnectListeners. {@link #addDisconnectListener} will not throw ShutdownException if the + * value is Boolean.TRUE. + */ + final ThreadLocal<Boolean> isDisconnectThread = new ThreadLocal() { + @Override + public Boolean initialValue() { + return Boolean.FALSE; + } + }; + + /** * The distribution manager that is used to communicate with the distributed system. */ protected DistributionManager dm; @@ -1054,7 +1066,7 @@ public class InternalDistributedSystem extends DistributedSystem Runnable r = new Runnable() { public void run() { try { - disconnectListenerThread.set(Boolean.TRUE); + isDisconnectThread.set(Boolean.TRUE); dc.onDisconnect(InternalDistributedSystem.this); } catch (CancelException e) { if (logger.isDebugEnabled()) { @@ -1095,11 +1107,12 @@ public class InternalDistributedSystem extends DistributedSystem } - public boolean isDisconnectListenerThread() { - Boolean disconnectListenerThreadBoolean = (Boolean) this.disconnectListenerThread.get(); + public boolean isDisconnectThread() { + return this.isDisconnectThread.get(); + } - return disconnectListenerThreadBoolean != null - && disconnectListenerThreadBoolean.booleanValue(); + public void setIsDisconnectThread() { + this.isDisconnectThread.set(Boolean.TRUE); } /** @@ -1343,8 +1356,8 @@ public class InternalDistributedSystem extends DistributedSystem // the distributed system close. InternalCache currentCache = getCache(); if (currentCache != null && !currentCache.isClosed()) { - disconnectListenerThread.set(Boolean.TRUE); // bug #42663 - this must be set while - // closing the cache + isDisconnectThread.set(Boolean.TRUE); // bug #42663 - this must be set while + // closing the cache try { currentCache.close(reason, dm.getRootCause(), keepAlive, true); // fix for 42150 } catch (VirtualMachineError e) { @@ -1358,7 +1371,7 @@ public class InternalDistributedSystem extends DistributedSystem "Exception trying to close cache", e); } finally { - disconnectListenerThread.set(Boolean.FALSE); + isDisconnectThread.set(Boolean.FALSE); } } @@ -2198,10 +2211,9 @@ public class InternalDistributedSystem extends DistributedSystem synchronized (this.listeners) { this.listeners.add(listener); - Boolean disconnectListenerThreadBoolean = (Boolean) disconnectListenerThread.get(); + boolean disconnectThreadBoolean = isDisconnectThread.get(); - if (disconnectListenerThreadBoolean == null - || !disconnectListenerThreadBoolean.booleanValue()) { + if (!disconnectThreadBoolean) { // Don't add disconnect listener after messaging has been disabled. // Do this test _after_ adding the listener to narrow the window. // It's possible to miss it still and never invoke the listener, but @@ -2219,13 +2231,6 @@ public class InternalDistributedSystem extends DistributedSystem } /** - * A non-null value of Boolean.TRUE will identify a thread being used to execute - * disconnectListeners. {@link #addDisconnectListener} will not throw ShutdownException if the - * value is Boolean.TRUE. - */ - final ThreadLocal disconnectListenerThread = new ThreadLocal(); - - /** * Removes a <code>DisconnectListener</code> from the list of listeners that will be notified when * this connection to the distributed system is disconnected. * diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java index ccd88b7..3f721a6 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java @@ -1727,14 +1727,6 @@ public class DLockService extends DistributedLockService { public void unlock(Object name) throws LockNotHeldException, LeaseExpiredException { final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); - if (this.ds.isDisconnectListenerThread()) { - if (isDebugEnabled_DLS) { - logger.trace(LogMarker.DLS_VERBOSE, - "{}, name: {} - disconnect listener thread is exiting unlock()", this, name); - } - return; - } - if (isDebugEnabled_DLS) { logger.trace(LogMarker.DLS_VERBOSE, "{}, name: {} - entering unlock()", this, name); } @@ -2292,7 +2284,7 @@ public class DLockService extends DistributedLockService { } // if hasActiveLocks, tell grantor we're destroying... - if (!isCurrentlyLockGrantor && maybeHasActiveLocks && !this.ds.isDisconnectListenerThread()) { + if (!isCurrentlyLockGrantor && maybeHasActiveLocks && !this.ds.isDisconnectThread()) { boolean retry; int nonGrantorDestroyLoopCount = 0; do { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java index 197da6d..62b27a9 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java @@ -200,7 +200,11 @@ public class BucketAdvisor extends CacheDistributionAdvisor { } } - private void assignStartingBucketAdvisor() { + private void assignStartingBucketAdvisorIfFixedPartitioned() { + if (startingBucketAdvisor != null) { + // already assigned + return; + } if (this.pRegion.isFixedPartitionedRegion()) { List<FixedPartitionAttributesImpl> fpas = this.pRegion.getFixedPartitionAttributesImpl(); if (fpas != null) { @@ -1852,24 +1856,14 @@ public class BucketAdvisor extends CacheDistributionAdvisor { if (parentAdvisor != null) { return; } - if (startingBucketAdvisor == null) { - assignStartingBucketAdvisor(); - if (startingBucketAdvisor != null) { - return; - } - } else { + assignStartingBucketAdvisorIfFixedPartitioned(); + if (startingBucketAdvisor != null) { return; } - // TODO fix this method to not release any locks if the - // redundancy is zero, since no locks are grabbed. try { DistributedMemberLock thePrimaryLock = getPrimaryLock(false); if (thePrimaryLock != null) { thePrimaryLock.unlock(); - } else { - // InternalDistributedSystem.isDisconnecting probably prevented us from - // creating the DLS... hope there's a thread closing this advisor but - // it's probably not safe to assert that it already happened } } catch (LockNotHeldException e) { Assert.assertTrue(!isHosting(), "Got LockNotHeldException for Bucket = " + this); @@ -2537,7 +2531,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor { // accordingly if (parentBA != null) { // Fix for 44350 - we don't want to get a primary move lock on - // the advisor, becuase that might deadlock with a user thread. + // the advisor, because that might deadlock with a user thread. // However, since all depose/elect operations on the parent bucket // cascade to the child bucket and get the child bucket move lock, // if should be safe to check this without the lock here. @@ -2548,9 +2542,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor { } } else { // we're not colocated, need to get the dlock - if (startingBucketAdvisor == null) { - assignStartingBucketAdvisor(); - } + assignStartingBucketAdvisorIfFixedPartitioned(); if (startingBucketAdvisor != null) { Assert.assertHoldsLock(this, false); synchronized (startingBucketAdvisor) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java index 7c63349..8ef9568 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java @@ -152,7 +152,7 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage { if (DistributionAdvisor.isNewerSerialNumber(oldSerial, this.prSerial)) { ok = false; if (logger.isDebugEnabled()) { - logger.debug("Not removing region {}l serial requested = {}; actual is {}", r.getName(), + logger.debug("Not removing region {} serial requested = {}; actual is {}", r.getName(), this.prSerial, r.getSerialNumber()); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java index 8d8e201..3858f89 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java @@ -158,7 +158,7 @@ public abstract class AbstractRegionEntry implements HashRegionEntry<Object, Obj event.setCallbacksInvokedByCurrentThread(); - if (logger.isDebugEnabled()) { + if (logger.isDebugEnabled() && !rgn.isInternalRegion()) { logger.debug("{} dispatching event {}", this, event); } // All the following code that sets "thr" is to workaround diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockServiceJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockServiceJUnitTest.java new file mode 100644 index 0000000..cedfbbd --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockServiceJUnitTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed.internal.locks; + +import static org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID.system; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.distributed.ConfigurationProperties; +import org.apache.geode.distributed.DistributedLockService; +import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.internal.InternalDistributedSystem; + +public class DLockServiceJUnitTest { + + DistributedSystem system; + DistributedLockService lockService; + + @Before + public void setup() { + Properties properties = new Properties(); + properties.put(ConfigurationProperties.LOCATORS, ""); + properties.put(ConfigurationProperties.MCAST_PORT, "0"); + system = DistributedSystem.connect(properties); + lockService = DistributedLockService.create("Test Lock Service", system); + } + + @After + public void teardown() { + if (system != null) { + system.disconnect(); + } + } + + @Test + public void locksAreReleasedDuringDisconnect() { + assertThat(lockService.lock("MyLock", 0, -1)).isTrue(); + assertThat(lockService.isHeldByCurrentThread("MyLock")).isTrue(); + ((InternalDistributedSystem) system).setIsDisconnectThread(); + lockService.unlock("MyLock"); + assertThat(lockService.isHeldByCurrentThread("MyLock")).isFalse(); + } +}
