This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit fe190e6b99fedbeadd92c73937aa57aa547a9ce0 Author: Jacob Barrett <jbarr...@pivotal.io> AuthorDate: Thu May 20 15:09:56 2021 -0700 GEODE-6588: Cleanup GatewaySenderAdvisor --- .../distributed/internal/locks/DLockService.java | 2 + .../internal/cache/wan/GatewaySenderAdvisor.java | 198 +++++++++------------ 2 files changed, 87 insertions(+), 113 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java index 3813c16..652245c 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; @@ -2720,6 +2721,7 @@ public class DLockService extends DistributedLockService { * * @see org.apache.geode.distributed.DistributedLockService#create(String, DistributedSystem) */ + @NotNull public static DistributedLockService create(String serviceName, InternalDistributedSystem ds, boolean distributed, boolean destroyOnDisconnect, boolean automateFreeResources) throws IllegalArgumentException, IllegalStateException { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java index 401296c..bf50c63 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java @@ -41,7 +41,6 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; import org.apache.geode.distributed.internal.locks.DLockService; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.Assert; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.UpdateAttributesProcessor; @@ -65,12 +64,12 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { private Thread lockObtainingThread; - private AbstractGatewaySender sender; + private final AbstractGatewaySender sender; private GatewaySenderAdvisor(DistributionAdvisee sender) { super(sender); this.sender = (AbstractGatewaySender) sender; - this.lockToken = getDLockServiceName() + "-token"; + lockToken = getDLockServiceName() + "-token"; } public static GatewaySenderAdvisor createGatewaySenderAdvisor(DistributionAdvisee sender) { @@ -80,11 +79,11 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } public String getDLockServiceName() { - return getClass().getName() + "_" + this.sender.getId(); + return getClass().getName() + "_" + sender.getId(); } public Thread getLockObtainingThread() { - return this.lockObtainingThread; + return lockObtainingThread; } /** Instantiate new Sender profile for this member */ @@ -163,7 +162,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } } - List<String> senderEventFilterClassNames = new ArrayList<String>(); + List<String> senderEventFilterClassNames = new ArrayList<>(); for (org.apache.geode.cache.wan.GatewayEventFilter filter : sender.getGatewayEventFilters()) { senderEventFilterClassNames.add(filter.getClass().getName()); } @@ -183,7 +182,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } } - Set<String> senderTransportFilterClassNames = new LinkedHashSet<String>(); + Set<String> senderTransportFilterClassNames = new LinkedHashSet<>(); for (GatewayTransportFilter filter : sender.getGatewayTransportFilters()) { senderTransportFilterClassNames.add(filter.getClass().getName()); } @@ -205,7 +204,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } } } - List<String> senderEventListenerClassNames = new ArrayList<String>(); + List<String> senderEventListenerClassNames = new ArrayList<>(); for (AsyncEventListener listener : sender.getAsyncEventListeners()) { senderEventListenerClassNames.add(listener.getClass().getName()); } @@ -259,13 +258,13 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { return; } // IF this sender is not primary - if (!this.sender.isPrimary()) { + if (!sender.isPrimary()) { if (!adviseEldestGatewaySender()) {// AND this is not the eldest // sender if (logger.isDebugEnabled()) { logger.debug( "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...", - this.sender); + sender); } return; } @@ -273,7 +272,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } } else { if (sp.serverLocation != null) { - this.sender.setServerLocation(sp.serverLocation); + sender.setServerLocation(sp.serverLocation); } } } @@ -293,12 +292,12 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { if (advisePrimaryGatewaySender() != null) { return; } - if (!this.sender.isPrimary()) {// IF this sender is not primary + if (!sender.isPrimary()) {// IF this sender is not primary if (!adviseEldestGatewaySender()) {// AND this is not the eldest sender if (logger.isDebugEnabled()) { logger.debug( "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...", - this.sender); + sender); } return; } @@ -309,25 +308,24 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } public boolean isPrimary() { - return sender.isParallel() || this.isPrimary; + return sender.isParallel() || isPrimary; } public void initDLockService() { - InternalDistributedSystem ds = this.sender.getCache().getInternalDistributedSystem(); + InternalDistributedSystem ds = sender.getCache().getInternalDistributedSystem(); String dlsName = getDLockServiceName(); - this.lockService = DistributedLockService.getServiceNamed(dlsName); - if (this.lockService == null) { - this.lockService = DLockService.create(dlsName, ds, true, true, true); + lockService = DistributedLockService.getServiceNamed(dlsName); + if (lockService == null) { + lockService = DLockService.create(dlsName, ds, true, true, true); } - Assert.assertTrue(this.lockService != null); if (logger.isDebugEnabled()) { - logger.debug("{}: Obtained DistributedLockService: {}", this, this.lockService); + logger.debug("{}: Obtained DistributedLockService: {}", this, lockService); } } public boolean volunteerForPrimary() { if (logger.isDebugEnabled()) { - logger.debug("Sender : {} is volunteering for Primary ", this.sender.getId()); + logger.debug("Sender : {} is volunteering for Primary ", sender.getId()); } if (advisePrimaryGatewaySender() == null) { @@ -335,16 +333,16 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { if (logger.isDebugEnabled()) { logger.debug( "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...", - this.sender); + sender); } return false; } if (logger.isDebugEnabled()) { logger.debug("Sender : {} no Primary available. So going to acquire distributed lock", - this.sender); + sender); } - this.lockService.lock(this.lockToken, 10000, -1); - return this.lockService.isHeldByCurrentThread(this.lockToken); + lockService.lock(lockToken, 10000, -1); + return lockService.isHeldByCurrentThread(lockToken); } return false; } @@ -357,11 +355,11 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { * @return boolean true if this eldest sender; false otherwise */ private boolean adviseEldestGatewaySender() { - Profile[] snapshot = this.profiles; + Profile[] snapshot = profiles; // sender with minimum startTime is eldest. Find out the minimum start time // of remote senders. - TreeSet<Long> senderStartTimes = new TreeSet<Long>(); + TreeSet<Long> senderStartTimes = new TreeSet<>(); for (Profile profile : snapshot) { GatewaySenderProfile sp = (GatewaySenderProfile) profile; if (!sp.isParallel && sp.isRunning) { @@ -375,96 +373,70 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { // it should give up // and allow existing running senders to volunteer return (senderStartTimes.isEmpty()) - || (this.sender.isRunning() && (this.sender.startTime <= senderStartTimes.first())); - } - - private InternalDistributedMember adviseEldestGatewaySenderNode() { - Profile[] snapshot = this.profiles; - - // sender with minimum startTime is eldest. Find out the minimum start time - // of remote senders. - InternalDistributedMember node = null; - GatewaySenderProfile eldestProfile = null; - for (Profile profile : snapshot) { - GatewaySenderProfile sp = (GatewaySenderProfile) profile; - if (!sp.isParallel && sp.isRunning) { - if (eldestProfile == null) { - eldestProfile = sp; - } - if (sp.startTime < eldestProfile.startTime) { - eldestProfile = sp; - } - } - } - - if (eldestProfile != null) { - node = eldestProfile.getDistributedMember(); - } - return node; + || (sender.isRunning() && (sender.startTime <= senderStartTimes.first())); } public void makePrimary() { - logger.info("{} : Starting as primary", this.sender); - AbstractGatewaySenderEventProcessor eventProcessor = this.sender.getEventProcessor(); + logger.info("{} : Starting as primary", sender); + AbstractGatewaySenderEventProcessor eventProcessor = sender.getEventProcessor(); if (eventProcessor != null) { eventProcessor.removeCacheListener(); } - logger.info("{} : Becoming primary gateway sender", this.sender); + logger.info("{} : Becoming primary gateway sender", sender); notifyAndBecomePrimary(); - new UpdateAttributesProcessor(this.sender).distribute(false); + new UpdateAttributesProcessor(sender).distribute(false); } public void notifyAndBecomePrimary() { - synchronized (this.primaryLock) { + synchronized (primaryLock) { setIsPrimary(true); notifyPrimaryLock(); } } public void notifyPrimaryLock() { - synchronized (this.primaryLock) { - this.primaryLock.notifyAll(); + synchronized (primaryLock) { + primaryLock.notifyAll(); } } public void makeSecondary() { if (logger.isDebugEnabled()) { logger.debug("{}: Did not obtain the lock on {}. Starting as secondary gateway sender.", - this.sender, this.lockToken); + sender, lockToken); } // Set primary flag to false logger.info( "{} starting as secondary because primary gateway sender is available on member :{}", - new Object[] {this.sender.getId(), advisePrimaryGatewaySender()}); - this.isPrimary = false; - new UpdateAttributesProcessor(this.sender).distribute(false); + new Object[] {sender.getId(), advisePrimaryGatewaySender()}); + isPrimary = false; + new UpdateAttributesProcessor(sender).distribute(false); } public void launchLockObtainingVolunteerThread() { String threadName = "Gateway Sender Primary Lock Acquisition Thread Volunteer"; - this.lockObtainingThread = new LoggingThread(threadName, () -> { - GatewaySenderAdvisor.this.sender.getLifeCycleLock().readLock().lock(); + lockObtainingThread = new LoggingThread(threadName, () -> { + sender.getLifeCycleLock().readLock().lock(); try { // Attempt to obtain the lock - if (!(GatewaySenderAdvisor.this.sender.isRunning())) { + if (!(sender.isRunning())) { return; } if (logger.isDebugEnabled()) { - logger.debug("{}: Obtaining the lock on {}", this, GatewaySenderAdvisor.this.lockToken); + logger.debug("{}: Obtaining the lock on {}", this, lockToken); } if (volunteerForPrimary()) { if (logger.isDebugEnabled()) { logger.debug("{}: Obtained the lock on {}", this, - GatewaySenderAdvisor.this.lockToken); + lockToken); } - logger.info("{} is becoming primary gateway Sender.", - GatewaySenderAdvisor.this); + logger.info("{} is becoming primary gateway Sender.", this); // As soon as the lock is obtained, set primary - GatewaySenderAdvisor.this.makePrimary(); + makePrimary(); } } catch (CancelException e) { // no action necessary @@ -472,14 +444,14 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { if (!sender.getStopper().isCancelInProgress()) { logger.fatal(String.format( "%s: The thread to obtain the failover lock was interrupted. This gateway sender will never become the primary.", - GatewaySenderAdvisor.this), + this), e); } } finally { - GatewaySenderAdvisor.this.sender.getLifeCycleLock().readLock().unlock(); + sender.getLifeCycleLock().readLock().unlock(); } }); - this.lockObtainingThread.start(); + lockObtainingThread.start(); } public void waitToBecomePrimary(AbstractGatewaySenderEventProcessor callingProcessor) @@ -487,10 +459,10 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { if (isPrimary()) { return; } - synchronized (this.primaryLock) { - logger.info("{} : Waiting to become primary gateway", this.sender.getId()); + synchronized (primaryLock) { + logger.info("{} : Waiting to become primary gateway", sender.getId()); while (!isPrimary()) { - this.primaryLock.wait(1000); + primaryLock.wait(1000); if (sender.getEventProcessor() != null && callingProcessor.isStopped()) { logger.info("The event processor is stopped, not to wait for being primary any more."); return; @@ -527,11 +499,11 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { public boolean manualStart; - public ArrayList<String> eventFiltersClassNames = new ArrayList<String>(); + public ArrayList<String> eventFiltersClassNames = new ArrayList<>(); - public ArrayList<String> transFiltersClassNames = new ArrayList<String>(); + public ArrayList<String> transFiltersClassNames = new ArrayList<>(); - public ArrayList<String> senderEventListenerClassNames = new ArrayList<String>(); + public ArrayList<String> senderEventListenerClassNames = new ArrayList<>(); public boolean isDiskSynchronous; @@ -553,47 +525,47 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { public void fromData(DataInput in, DeserializationContext context) throws IOException, ClassNotFoundException { fromDataPre_GEODE_1_14_0_0(in, context); - this.enforceThreadsConnectSameReceiver = in.readBoolean(); + enforceThreadsConnectSameReceiver = in.readBoolean(); } public void fromDataPre_GEODE_1_14_0_0(DataInput in, DeserializationContext context) throws IOException, ClassNotFoundException { super.fromData(in, context); - this.Id = DataSerializer.readString(in); - this.startTime = in.readLong(); - this.remoteDSId = in.readInt(); - this.isRunning = in.readBoolean(); - this.isPrimary = in.readBoolean(); - this.isParallel = in.readBoolean(); - this.isBatchConflationEnabled = in.readBoolean(); - this.isPersistenceEnabled = in.readBoolean(); - this.alertThreshold = in.readInt(); - this.manualStart = in.readBoolean(); - this.eventFiltersClassNames = DataSerializer.readArrayList(in); - this.transFiltersClassNames = DataSerializer.readArrayList(in); - this.senderEventListenerClassNames = DataSerializer.readArrayList(in); - this.isDiskSynchronous = in.readBoolean(); - this.dispatcherThreads = in.readInt(); + Id = DataSerializer.readString(in); + startTime = in.readLong(); + remoteDSId = in.readInt(); + isRunning = in.readBoolean(); + isPrimary = in.readBoolean(); + isParallel = in.readBoolean(); + isBatchConflationEnabled = in.readBoolean(); + isPersistenceEnabled = in.readBoolean(); + alertThreshold = in.readInt(); + manualStart = in.readBoolean(); + eventFiltersClassNames = DataSerializer.readArrayList(in); + transFiltersClassNames = DataSerializer.readArrayList(in); + senderEventListenerClassNames = DataSerializer.readArrayList(in); + isDiskSynchronous = in.readBoolean(); + dispatcherThreads = in.readInt(); if (StaticSerialization.getVersionForDataStream(in).isOlderThan(KnownVersion.GFE_90)) { Gateway.OrderPolicy oldOrderPolicy = DataSerializer.readObject(in); if (oldOrderPolicy != null) { if (oldOrderPolicy.name().equals(OrderPolicy.KEY.name())) { - this.orderPolicy = OrderPolicy.KEY; + orderPolicy = OrderPolicy.KEY; } else if (oldOrderPolicy.name().equals(OrderPolicy.THREAD.name())) { - this.orderPolicy = OrderPolicy.THREAD; + orderPolicy = OrderPolicy.THREAD; } else { - this.orderPolicy = OrderPolicy.PARTITION; + orderPolicy = OrderPolicy.PARTITION; } } else { - this.orderPolicy = null; + orderPolicy = null; } } else { - this.orderPolicy = DataSerializer.readObject(in); + orderPolicy = DataSerializer.readObject(in); } boolean serverLocationFound = DataSerializer.readPrimitiveBoolean(in); if (serverLocationFound) { - this.serverLocation = new ServerLocation(); - InternalDataSerializer.invokeFromData(this.serverLocation, in); + serverLocation = new ServerLocation(); + InternalDataSerializer.invokeFromData(serverLocation, in); } } @@ -623,8 +595,8 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { out.writeBoolean(isDiskSynchronous); out.writeInt(dispatcherThreads); if (StaticSerialization.getVersionForDataStream(out).isOlderThan(KnownVersion.GFE_90) - && this.orderPolicy != null) { - String orderPolicyName = this.orderPolicy.name(); + && orderPolicy != null) { + String orderPolicyName = orderPolicy.name(); if (orderPolicyName.equals(Gateway.OrderPolicy.KEY.name())) { DataSerializer.writeObject(Gateway.OrderPolicy.KEY, out); } else if (orderPolicyName.equals(Gateway.OrderPolicy.THREAD.name())) { @@ -635,7 +607,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } else { DataSerializer.writeObject(orderPolicy, out); } - boolean serverLocationFound = (this.serverLocation != null); + boolean serverLocationFound = (serverLocation != null); DataSerializer.writePrimitiveBoolean(serverLocationFound, out); if (serverLocationFound) { InternalDataSerializer.invokeToData(serverLocation, out); @@ -669,15 +641,15 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { @Override public void fillInToString(StringBuilder sb) { super.fillInToString(sb); - sb.append("; id=" + this.Id); - sb.append("; remoteDSName=" + this.remoteDSId); - sb.append("; isRunning=" + this.isRunning); - sb.append("; isPrimary=" + this.isPrimary); + sb.append("; id=").append(Id); + sb.append("; remoteDSName=").append(remoteDSId); + sb.append("; isRunning=").append(isRunning); + sb.append("; isPrimary=").append(isPrimary); } } public InternalDistributedMember advisePrimaryGatewaySender() { - Profile[] snapshot = this.profiles; + Profile[] snapshot = profiles; for (Profile profile : snapshot) { GatewaySenderProfile sp = (GatewaySenderProfile) profile; if (!sp.isParallel && sp.isPrimary) { @@ -693,7 +665,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { @Override public void close() { - new UpdateAttributesProcessor(this.getAdvisee(), true).distribute(false); + new UpdateAttributesProcessor(getAdvisee(), true).distribute(false); super.close(); } }