GEODE-1995: Removed ReliableMessageQueue, ReliableMessageQueueFactory, ReliableMessageQueueFactoryImpl and its usage in the code from GemFireCacheImpl and DistributedRegion.
GEODE-1995: Addressing Review Comments. Removed ReliableDistributionData. CacheOperationMessage is no longer required to implement ReliableDistributionData, as it has been removed. Removed all references to getOperations and getOperationCount. AbstractRegion#handleReliableDistribution does not use ReliableDistributionData, so that parameter was removed as well. Removed SendQueueOperation. Removed sendQueue from DistributedRegion. Cleaned up LocalizedStrings. Removed SEND_QUEUE_MESSAGE from DataSerializableFixedID and DSFIDFactory. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/5ec0d470 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/5ec0d470 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/5ec0d470 Branch: refs/heads/GEODE-4160-mockito Commit: 5ec0d470f25ec4cb68dd7c5c31791eecb90b2548 Parents: b5fd6b5 Author: adongre <[email protected]> Authored: Tue Dec 13 12:50:31 2016 +0530 Committer: adongre <[email protected]> Committed: Tue Feb 28 12:35:38 2017 +0530 ---------------------------------------------------------------------- .../org/apache/geode/internal/DSFIDFactory.java | 2 - .../geode/internal/DataSerializableFixedID.java | 1 - .../geode/internal/cache/AbstractRegion.java | 4 +- .../geode/internal/cache/DestroyOperation.java | 6 - .../cache/DistributedCacheOperation.java | 27 +- .../cache/DistributedPutAllOperation.java | 27 -- .../geode/internal/cache/DistributedRegion.java | 70 +----- .../cache/DistributedRemoveAllOperation.java | 15 -- .../geode/internal/cache/GemFireCacheImpl.java | 27 -- .../internal/cache/InvalidateOperation.java | 8 - .../cache/ReliableDistributionData.java | 41 ---- .../internal/cache/ReliableMessageQueue.java | 69 ------ .../cache/ReliableMessageQueueFactory.java | 41 ---- .../cache/ReliableMessageQueueFactoryImpl.java | 246 ------------------- .../internal/cache/SendQueueOperation.java | 190 -------------- .../geode/internal/cache/TXCommitMessage.java | 41 +--- 
.../cache/UpdateEntryVersionOperation.java | 6 - .../geode/internal/cache/UpdateOperation.java | 13 - .../cache/wan/serial/BatchDestroyOperation.java | 7 - .../geode/internal/i18n/LocalizedStrings.java | 6 - 20 files changed, 9 insertions(+), 838 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index 5c18639..c02dc47 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -251,7 +251,6 @@ import org.apache.geode.internal.cache.RemoteRegionOperation.RemoteRegionOperati import org.apache.geode.internal.cache.RemoteRemoveAllMessage; import org.apache.geode.internal.cache.RoleEventImpl; import org.apache.geode.internal.cache.SearchLoadAndWriteProcessor; -import org.apache.geode.internal.cache.SendQueueOperation.SendQueueMessage; import org.apache.geode.internal.cache.ServerPingMessage; import org.apache.geode.internal.cache.StateFlushOperation.StateMarkerMessage; import org.apache.geode.internal.cache.StateFlushOperation.StateStabilizationMessage; @@ -667,7 +666,6 @@ public final class DSFIDFactory implements DataSerializableFixedID { registerDSFID(CLEAR_REGION_MESSAGE, ClearRegionMessage.class); registerDSFID(TOMBSTONE_MESSAGE, TombstoneMessage.class); registerDSFID(INVALIDATE_REGION_MESSAGE, InvalidateRegionMessage.class); - registerDSFID(SEND_QUEUE_MESSAGE, SendQueueMessage.class); registerDSFID(STATE_MARKER_MESSAGE, StateMarkerMessage.class); registerDSFID(STATE_STABILIZATION_MESSAGE, StateStabilizationMessage.class); registerDSFID(STATE_STABILIZED_MESSAGE, 
StateStabilizedMessage.class); http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java index 4e45646..457af2f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java @@ -160,7 +160,6 @@ public interface DataSerializableFixedID extends SerializationVersions { public static final byte PUT_ALL_MESSAGE = -84; public static final byte CLEAR_REGION_MESSAGE = -83; public static final byte INVALIDATE_REGION_MESSAGE = -82; - public static final byte SEND_QUEUE_MESSAGE = -81; public static final byte STATE_MARKER_MESSAGE = -80; public static final byte STATE_STABILIZATION_MESSAGE = -79; public static final byte STATE_STABILIZED_MESSAGE = -78; http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java index fe77578..7dffee2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java @@ -1719,15 +1719,13 @@ public abstract class AbstractRegion implements Region, RegionAttributes, Attrib * Makes sure that the data was distributed to every required role. If it was not it either queues * the data for later delivery or it throws an exception. 
* - * @param data the data that needs to be reliably distributed * @param successfulRecipients the successful recipients * @throws RoleException if a required role was not sent the message and the LossAction is either * NO_ACCESS or LIMITED_ACCESS. * @since GemFire 5.0 * */ - protected void handleReliableDistribution(ReliableDistributionData data, - Set successfulRecipients) { + protected void handleReliableDistribution(Set successfulRecipients) { // do nothing by default } http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java index a3d9376..5132ec0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java @@ -199,12 +199,6 @@ public class DestroyOperation extends DistributedCacheOperation { } @Override - public List getOperations() { - return Collections.singletonList(new QueuedOperation(getOperation(), this.key, null, null, - DistributedCacheOperation.DESERIALIZATION_POLICY_NONE, this.callbackArg)); - } - - @Override public ConflationKey getConflationKey() { if (!super.regionAllowsConflation || getProcessorId() != 0) { // if the publisher's region attributes do not support conflation http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index e4658b4..bded899 100644 --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -365,13 +365,7 @@ public abstract class DistributedCacheOperation { if (!reliableOp || region.isNoDistributionOk()) { // nothing needs be done in this case } else { - // create the message so it can be passed to - // handleReliableDistribution - // for queuing - CacheOperationMessage msg = createMessage(); - initMessage(msg, null); - msg.setRecipients(recipients); // it is going to no one - region.handleReliableDistribution(msg, Collections.EMPTY_SET); + region.handleReliableDistribution(Collections.EMPTY_SET); } /** compute local client routing before waiting for an ack only for a bucket */ @@ -625,7 +619,7 @@ public abstract class DistributedCacheOperation { if (departedMembers != null) { successfulRecips.removeAll(departedMembers); } - region.handleReliableDistribution(msg, successfulRecips); + region.handleReliableDistribution(successfulRecips); } } @@ -864,7 +858,7 @@ public abstract class DistributedCacheOperation { } public static abstract class CacheOperationMessage extends SerialDistributionMessage - implements MessageWithReply, DirectReplyMessage, ReliableDistributionData, OldValueImporter { + implements MessageWithReply, DirectReplyMessage, OldValueImporter { protected final static short POSSIBLE_DUPLICATE_MASK = POS_DUP; protected final static short OLD_VALUE_MASK = DistributionMessage.UNRESERVED_FLAGS_START; @@ -1482,21 +1476,6 @@ public abstract class DistributedCacheOperation { return this.directAck; } - // //////////////////////////////////////////////////////////////////// - // ReliableDistributionData methods - // //////////////////////////////////////////////////////////////////// - - public int getOperationCount() { - return 1; - } - - public List getOperations() { - byte noDeserialize = DistributedCacheOperation.DESERIALIZATION_POLICY_NONE; - QueuedOperation 
qOp = - new QueuedOperation(getOperation(), null, null, null, noDeserialize, this.callbackArg); - return Collections.singletonList(qOp); - } - public void setSendDelta(boolean sendDelta) { this.sendDelta = sendDelta; } http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java index 4082a29..61542c4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java @@ -1261,11 +1261,6 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation { return s; } - @Override - public int getOperationCount() { - return this.putAllDataSize; - } - public ClientProxyMembershipID getContext() { return this.context; } @@ -1274,27 +1269,5 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation { return this.putAllData; } - @Override - public List getOperations() { - QueuedOperation[] ops = new QueuedOperation[getOperationCount()]; - for (int i = 0; i < ops.length; i++) { - PutAllEntryData entry = this.putAllData[i]; - byte[] valueBytes = null; - Object valueObj = null; - Object v = entry.getValue(); - byte deserializationPolicy; - if (v instanceof CachedDeserializable) { - deserializationPolicy = DESERIALIZATION_POLICY_LAZY; - valueBytes = ((CachedDeserializable) v).getSerializedValue(); - } else { - deserializationPolicy = DESERIALIZATION_POLICY_NONE; - valueBytes = (byte[]) v; - } - - ops[i] = new QueuedOperation(entry.getOp(), entry.getKey(), valueBytes, valueObj, - deserializationPolicy, this.callbackArg); - } - return Arrays.asList(ops); - } } } 
http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 1aaf6c3..cc6ccf7 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -110,13 +110,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA private final boolean requiresReliabilityCheck; /** - * Provides a queue for reliable message delivery - * - * @since GemFire 5.0 - */ - protected final ReliableMessageQueue rmq; - - /** * Latch that is opened after initialization waits for required roles up to the * <a href="DistributedSystem#member-timeout">member-timeout </a>. 
*/ @@ -183,18 +176,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA this.requiresReliabilityCheck = setRequiresReliabilityCheck; - { - ReliableMessageQueue tmp = null; - if (this.requiresReliabilityCheck) { - // if - // (attrs.getMembershipAttributes().getLossAction().isAllAccessWithQueuing()) - // { - // tmp = cache.getReliableMessageQueueFactory().create(this); - // } - } - this.rmq = tmp; - } - if (internalRegionArgs.isUsedForPartitionedRegionBucket()) { this.persistenceAdvisor = internalRegionArgs.getPersistenceAdvisor(); } else if (this.allowsPersistence()) { @@ -567,14 +548,12 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } @Override - protected void handleReliableDistribution(ReliableDistributionData data, - Set successfulRecipients) { - handleReliableDistribution(data, successfulRecipients, Collections.EMPTY_SET, - Collections.EMPTY_SET); + protected void handleReliableDistribution(Set successfulRecipients) { + handleReliableDistribution(successfulRecipients, Collections.EMPTY_SET, Collections.EMPTY_SET); } - protected void handleReliableDistribution(ReliableDistributionData data, Set successfulRecipients, - Set otherRecipients1, Set otherRecipients2) { + protected void handleReliableDistribution(Set successfulRecipients, Set otherRecipients1, + Set otherRecipients2) { if (this.requiresReliabilityCheck) { MembershipAttributes ra = getMembershipAttributes(); Set recipients = successfulRecipients; @@ -2140,19 +2119,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA return getSystem().getDistributionManager().getConfig(); } - /** - * Sends a list of queued messages to members playing a specified role - * - * @param list List of QueuedOperation instances to send. 
Any messages sent will be removed from - * this list - * @param role the role that a recipient must be playing - * @return true if at least one message made it to at least one guy playing the role - */ - boolean sendQueue(List list, Role role) { - SendQueueOperation op = new SendQueueOperation(getDistributionManager(), this, list, role); - return op.distribute(); - } - /* * @see SearchLoadAndWriteProcessor#initialize(LocalRegion, Object, Object) */ @@ -2521,10 +2487,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA this.getFullPath()), ex); } } - if (this.rmq != null) { - this.rmq.close(); - } - // Fix for #48066 - make sure that region operations are completely // distributed to peers before destroying the region. long timeout = 1000L * getCache().getDistributedSystem().getConfig().getAckWaitThreshold(); @@ -2628,9 +2590,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA logger.warn("postDestroyRegion: encountered cancellation", e); } - if (this.rmq != null && destroyDiskRegion) { - this.rmq.destroy(); - } } @Override @@ -3601,27 +3560,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA newlyAcquiredRoles = new HashSet(missingRequiredRoles); newlyAcquiredRoles.retainAll(roles); // find the intersection if (!newlyAcquiredRoles.isEmpty()) { - if (DistributedRegion.this.rmq != null) { - Iterator it = newlyAcquiredRoles.iterator(); - final DM dm = getDistributionManager(); - while (it.hasNext()) { - getCache().getCancelCriterion().checkCancelInProgress(null); - final Role role = (Role) it.next(); - try { - // do this in the waiting pool to make it async - // @todo darrel/klund: add a single serial executor for - // queue flush - dm.getWaitingThreadPool().execute(new Runnable() { - public void run() { - DistributedRegion.this.rmq.roleReady(role); - } - }); - break; - } catch (RejectedExecutionException ex) { - throw ex; - } - } // while - } 
missingRequiredRoles.removeAll(newlyAcquiredRoles); if (this.members == null && missingRequiredRoles.isEmpty()) { isMissingRequiredRoles = false; http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java index 4a86167..0c13b59 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java @@ -1042,11 +1042,6 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO return s; } - @Override - public int getOperationCount() { - return this.removeAllDataSize; - } - public ClientProxyMembershipID getContext() { return this.context; } @@ -1055,15 +1050,5 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO return this.removeAllData; } - @Override - public List getOperations() { - QueuedOperation[] ops = new QueuedOperation[getOperationCount()]; - for (int i = 0; i < ops.length; i++) { - RemoveAllEntryData entry = this.removeAllData[i]; - ops[i] = new QueuedOperation(entry.getOp(), entry.getKey(), null, null, (byte) 0, - this.callbackArg); - } - return Arrays.asList(ops); - } } } http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index 6e374ec..66f1a4a 100755 --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -878,8 +878,6 @@ public class GemFireCacheImpl this.cqService = CqServiceProvider.create(this); - this.rmqFactory = new ReliableMessageQueueFactoryImpl(); - // Create the CacheStatistics this.cachePerfStats = new CachePerfStats(system); CachePerfStats.enableClockStats = this.system.getConfig().getEnableTimeStatistics(); @@ -2327,17 +2325,6 @@ public class GemFireCacheImpl PoolManager.close(keepalive); if (isDebugEnabled) { - logger.debug("{}: closing reliable message queue...", this); - } - try { - getReliableMessageQueueFactory().close(true); - } catch (CancelException e) { - if (isDebugEnabled) { - logger.debug("Ignored cancellation while closing reliable message queue", e); - } - } - - if (isDebugEnabled) { logger.debug("{}: notifying admins of close...", this); } try { @@ -4497,22 +4484,8 @@ public class GemFireCacheImpl PoolManagerImpl.readyForEvents(this.system, false); } - /** - * This cache's reliable message queue factory. Should always have an instance of it. - */ - private final ReliableMessageQueueFactory rmqFactory; - private List<File> backupFiles = Collections.emptyList(); - /** - * Returns this cache's ReliableMessageQueueFactory. 
- * - * @since GemFire 5.0 - */ - public ReliableMessageQueueFactory getReliableMessageQueueFactory() { - return this.rmqFactory; - } - public InternalResourceManager getResourceManager() { return getResourceManager(true); } http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidateOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidateOperation.java index d6d38ef..eceb194 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidateOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidateOperation.java @@ -146,14 +146,6 @@ public class InvalidateOperation extends DistributedCacheOperation { } @Override - public List getOperations() { - byte deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_NONE; - QueuedOperation qOp = new QueuedOperation(getOperation(), this.key, null, null, - deserializationPolicy, this.callbackArg); - return Collections.singletonList(qOp); - } - - @Override public ConflationKey getConflationKey() { if (!super.regionAllowsConflation || getProcessorId() != 0) { // if the publisher's region attributes do not support conflation http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableDistributionData.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableDistributionData.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableDistributionData.java deleted file mode 100644 index 5c635ee..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableDistributionData.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache; - -import java.util.*; - -/** - * Represents one or more distributed operations that can be reliably distributed. This interface - * allows the data to be queued and checked for reliable distribution. - * - * @since GemFire 5.0 - */ -public interface ReliableDistributionData { - // /** - // * Returns a set of the recipients that this data was sent to successfully. - // * @param processor the reply processor used for responses to this data. - // */ - // public Set getSuccessfulRecipients(ReliableReplyProcessor21 processor); - /** - * Returns the number of logical operations this data contains. - */ - public int getOperationCount(); - - /** - * Returns a list of QueuedOperation instances one for each logical operation done by this data - * instance. 
- */ - public List getOperations(); -} http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueue.java deleted file mode 100644 index 55c1039..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueue.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache; - -import org.apache.geode.distributed.Role; - -import java.util.Set; - -/** - * A reliable message queue is used by a DistributedRegion to queue up distributed operations for - * required roles that are not present at the time the operation is done. Instances of this - * interface can be obtained from {@link ReliableMessageQueueFactory} which can be obtained from - * {@link GemFireCacheImpl#getReliableMessageQueueFactory}. - * - * @since GemFire 5.0 - */ -public interface ReliableMessageQueue { - /** - * Returns the region this queue belongs to. 
- */ - public DistributedRegion getRegion(); - - /** - * Adds a message to the queue to be sent to the list of roles. - * - * @param data the actual data that describes the operation to enqueue - * @param roles the roles that need to receive this message. - */ - public void add(ReliableDistributionData data, Set roles); - - /** - * Gets the roles that this queue currently has messages for. - * - * @return a set of {link Role}s that currently have queued messages. <code>null</code> is - * returned if no messages are queued. - */ - public Set getQueuingRoles(); - - /** - * Attempts to send any messages that have been added for the given role to all members that are - * currently playing that role. - * - * @param role the role whose queued messages should be sent - * @return true if send was successful; false if it was not and the messages are still queued. - */ - public boolean roleReady(Role role); - - /** - * Removes all the data in this queue causing it to never be sent. - */ - public void destroy(); - - /** - * Closes this queue. This frees up any memory used by the queue but its persistent data remains. - */ - public void close(); -} http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueueFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueueFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueueFactory.java deleted file mode 100644 index 39da937..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueueFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache; - -/** - * Represents a factory for instances of {@link ReliableMessageQueue}. The Cache will have an - * instance of the factory that can be obtained from - * {@link GemFireCacheImpl#getReliableMessageQueueFactory}. - * - * @since GemFire 5.0 - */ -public interface ReliableMessageQueueFactory { - /** - * Creates an instance of {@link ReliableMessageQueue} given the region that the queue will be on. - * - * @param region the distributed region that the created queue will service. - * @return the created queue - */ - public ReliableMessageQueue create(DistributedRegion region); - - /** - * Cleanly shutdown this factory flushing any persistent data to disk. - * - * @param force true if close should always work - * @throws IllegalStateException if <code>force</code> is false and the factory is still in use. - * The factory is in use as long as a queue it produced remains unclosed. 
- */ - public void close(boolean force); -} http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueueFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueueFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueueFactoryImpl.java deleted file mode 100644 index 282a0e1..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ReliableMessageQueueFactoryImpl.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache; - -import org.apache.geode.distributed.Role; -import org.apache.geode.internal.i18n.LocalizedStrings; - -import java.util.*; - -/** - * Implementation of {@link ReliableMessageQueueFactory} - * - * @since GemFire 5.0 - */ -public class ReliableMessageQueueFactoryImpl implements ReliableMessageQueueFactory { - private boolean closed; - - /** - * Create a factory given its persistence attributes. 
- */ - public ReliableMessageQueueFactoryImpl() { - this.closed = false; - } - - /** - * Contains all the unclosed queues that have been created by this factory. - */ - private final ArrayList queues = new ArrayList(); - - public ReliableMessageQueue create(DistributedRegion region) { - if (this.closed) { - throw new IllegalStateException( - LocalizedStrings.ReliableMessageQueueFactoryImpl_RELIABLE_MESSAGE_QUEUE_IS_CLOSED - .toLocalizedString()); - } - synchronized (this.queues) { - Queue q = new Queue(region); - this.queues.add(q); - return q; - } - } - - public void close(boolean force) { - // @todo darrel: nyi - if (!force) { - synchronized (this.queues) { - if (!this.queues.isEmpty()) { - throw new IllegalStateException( - LocalizedStrings.ReliableMessageQueueFactoryImpl_REGIONS_WITH_MESSAGE_QUEUING_ALREADY_EXIST - .toLocalizedString()); - } - } - } - this.closed = true; - } - - /** - * Maps DistributedRegion keys to QueuedRegionData values - */ - private final IdentityHashMap regionMap = new IdentityHashMap(128); - - /** - * Adds data in the specified region to be sent to the specified roles - */ - protected void add(DistributedRegion r, ReliableDistributionData data, Set roles) { - QueuedRegionData qrd = null; - synchronized (this.regionMap) { - qrd = (QueuedRegionData) this.regionMap.get(r); - } - qrd.add(r, data, roles); - r.getCachePerfStats().incReliableQueuedOps(data.getOperationCount() * roles.size()); - } - - public Set getQueuingRoles(DistributedRegion r) { - QueuedRegionData qrd = null; - synchronized (this.regionMap) { - qrd = (QueuedRegionData) this.regionMap.get(r); - } - return qrd.getQueuingRoles(r); - } - - protected boolean roleReady(DistributedRegion r, Role role) { - QueuedRegionData qrd = null; - synchronized (this.regionMap) { - qrd = (QueuedRegionData) this.regionMap.get(r); - } - return qrd.roleReady(r, role); - } - - /** - * Initializes data queuing for the given region - */ - protected void init(DistributedRegion r) { - 
QueuedRegionData qrd = new QueuedRegionData(); - synchronized (this.regionMap) { - Object old = this.regionMap.put(r, qrd); - if (old != null) { - throw new IllegalStateException( - LocalizedStrings.ReliableMessageQueueFactoryImpl_UNEXPECTED_QUEUEDREGIONDATA_0_FOR_REGION_1 - .toLocalizedString(new Object[] {old, r})); - } - } - } - - /** - * Removes any data queued for the given region - */ - protected void destroy(DistributedRegion r) { - QueuedRegionData qrd = null; - synchronized (this.regionMap) { - qrd = (QueuedRegionData) this.regionMap.remove(r); - } - if (qrd != null) { - qrd.destroy(r); - } - } - - /** - * Removes a previously created queue from this factory. - */ - protected void removeQueue(Queue q) { - synchronized (this.queues) { - this.queues.remove(q); - } - } - - /** - * Implements ReliableMessageQueue. - * - * @since GemFire 5.0 - */ - public class Queue implements ReliableMessageQueue { - private final DistributedRegion r; - - Queue(DistributedRegion r) { - this.r = r; - init(this.r); - } - - public DistributedRegion getRegion() { - return this.r; - } - - public void add(ReliableDistributionData data, Set roles) { - ReliableMessageQueueFactoryImpl.this.add(this.r, data, roles); - } - - public Set getQueuingRoles() { - return ReliableMessageQueueFactoryImpl.this.getQueuingRoles(this.r); - } - - public boolean roleReady(Role role) { - return ReliableMessageQueueFactoryImpl.this.roleReady(this.r, role); - } - - public void destroy() { - ReliableMessageQueueFactoryImpl.this.destroy(this.r); - } - - public void close() { - removeQueue(this); - } - } - /** - * Used to organize all the queued data for a region. 
- */ - static protected class QueuedRegionData { - /** - * Maps Role keys to lists of ReliableDistributionData - */ - private final HashMap roleMap = new HashMap(); - - /** - * Adds data in the specified region to be sent to the specified roles - */ - protected void add(DistributedRegion r, ReliableDistributionData data, Set roles) { - synchronized (this) { - Iterator it = roles.iterator(); - while (it.hasNext()) { - Role role = (Role) it.next(); - List l = (List) this.roleMap.get(role); - if (l == null) { - l = new ArrayList(); - this.roleMap.put(role, l); - } - l.addAll(data.getOperations()); - } - } - } - - protected Set getQueuingRoles(DistributedRegion r) { - Set result = null; - synchronized (this) { - Iterator it = this.roleMap.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry me = (Map.Entry) it.next(); - List l = (List) me.getValue(); - if (l != null && !l.isEmpty()) { - // found a role with a non-empty list of operations so add to result - if (result == null) { - result = new HashSet(); - } - result.add(me.getKey()); - } - } - } - return result; - } - - protected boolean roleReady(DistributedRegion r, Role role) { - List l = null; - synchronized (this) { - l = (List) this.roleMap.get(role); - } - if (l != null) { - // @todo darrel: do this in a background thread - while (!l.isEmpty()) { - if (!r.sendQueue(l, role)) { - // Couldn't send the last message so stop and return false - return false; - } - } - } - return true; - } - - /** - * Blows away all the data in this object. 
- */ - public void destroy(DistributedRegion r) { - // @todo darrel: nothing needs doing until we use disk - } - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/SendQueueOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SendQueueOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SendQueueOperation.java deleted file mode 100644 index a72dee9..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/SendQueueOperation.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.internal.cache; - -import java.util.*; -import java.io.*; -import org.apache.geode.*; -import org.apache.geode.cache.*; -import org.apache.geode.distributed.*; -import org.apache.geode.distributed.internal.*; - - -/** - * Sends a chunk of queued messages to everyone currently playing a role. 
- * - * @since GemFire 5.0 - * - */ -public class SendQueueOperation { - // private ReplyProcessor21 processor = null; - private DM dm; - private DistributedRegion r; - private List l; - private Role role; - - SendQueueOperation(DM dm, DistributedRegion r, List l, Role role) { - this.dm = dm; - this.r = r; - this.l = l; - this.role = role; - } - - /** - * Returns true if distribution successful. Also modifies message list by removing messages sent - * to the required role. - */ - boolean distribute() { - CacheDistributionAdvisor advisor = this.r.getCacheDistributionAdvisor(); - Set recipients = advisor.adviseCacheOpRole(this.role); - if (recipients.isEmpty()) { - return false; - } - ReplyProcessor21 processor = new ReplyProcessor21(this.dm, recipients); - // @todo darrel: make this a reliable one - SendQueueMessage msg = new SendQueueMessage(); - msg.setRecipients(recipients); - msg.setRegionPath(this.r.getFullPath()); - msg.setProcessorId(processor.getProcessorId()); - msg.setOperations(this.l); - dm.putOutgoing(msg); - try { - processor.waitForReplies(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - // It's OK to keep going, no significant work below. - } catch (ReplyException ex) { - ex.handleAsUnexpected(); - } - if (msg.getSuccessfulRecipients().isEmpty()) { - return false; - } - // @todo darrel: now remove sent items from the list - this.r.getCachePerfStats().incReliableQueuedOps(-l.size()); - this.l.clear(); - return true; - } - - /** - * A batch of queued messages. Once they are processed on the other side an ack is sent. 
- */ - public static final class SendQueueMessage extends SerialDistributionMessage - implements MessageWithReply { - private int processorId; - private String regionPath; - /** - * List of QueuedOperation instances - */ - private List ops; - - @Override - public int getProcessorId() { - return this.processorId; - } - - public void setProcessorId(int id) { - this.processorId = id; - } - - public String getRegionPath() { - return this.regionPath; - } - - public void setRegionPath(String rp) { - this.regionPath = rp; - } - - public void setOperations(List l) { - this.ops = l; - } - - @Override - protected void process(DistributionManager dm) { - ReplyException rex = null; - boolean ignored = false; - try { - GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem()); - final LocalRegion lclRgn = gfc.getRegionByPathForProcessing(this.regionPath); - if (lclRgn != null) { - lclRgn.waitOnInitialization(); - final long lastMod = gfc.cacheTimeMillis(); - Iterator it = this.ops.iterator(); - while (it.hasNext()) { - QueuedOperation op = (QueuedOperation) it.next(); - op.process(lclRgn, getSender(), lastMod); - } - } else { - ignored = true; - } - } catch (RegionDestroyedException e) { - ignored = true; - } catch (CancelException e) { - ignored = true; - } finally { - ReplyMessage.send(getSender(), this.processorId, rex, dm, ignored, false, false); - } - } - - public int getDSFID() { - return SEND_QUEUE_MESSAGE; - } - - @Override - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - super.fromData(in); - this.regionPath = DataSerializer.readString(in); - this.processorId = in.readInt(); - { - int opCount = in.readInt(); - QueuedOperation[] ops = new QueuedOperation[opCount]; - for (int i = 0; i < opCount; i++) { - ops[i] = QueuedOperation.createFromData(in); - } - this.ops = Arrays.asList(ops); - } - } - - @Override - public void toData(DataOutput out) throws IOException { - super.toData(out); - 
DataSerializer.writeString(this.regionPath, out); - out.writeInt(this.processorId); - { - int opCount = this.ops.size(); - out.writeInt(opCount); - for (int i = 0; i < opCount; i++) { - QueuedOperation op = (QueuedOperation) this.ops.get(i); - op.toData(out); - } - } - } - - @Override - public String toString() { - StringBuffer buff = new StringBuffer(); - buff.append(getClass().getName()); - buff.append("(region path='"); // make sure this is the first one - buff.append(this.regionPath); - buff.append("'"); - buff.append("; processorId="); - buff.append(this.processorId); - buff.append("; queuedOps="); - buff.append(this.ops.size()); - buff.append(")"); - return buff.toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java index e325bf1..7c2a3e3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java @@ -526,19 +526,7 @@ public class TXCommitMessage extends PooledDistributionMessage successfulRecipients.removeAll(regionDestroyedMembers); try { - ReliableDistributionData rdd = new ReliableDistributionData() { - // public Set getSuccessfulRecipients(ReliableReplyProcessor21 processor) { - // return successfulRecipients; - // } - public int getOperationCount() { - return rc.getOperationCount(); - } - - public List getOperations() { - return rc.getOperations(); - } - }; - rc.r.handleReliableDistribution(rdd, successfulRecipients); + rc.r.handleReliableDistribution(successfulRecipients); } catch (RegionDistributionException e) { if (regionDistributionExceptions == Collections.EMPTY_SET) { 
regionDistributionExceptions = new HashSet(); @@ -1408,19 +1396,6 @@ public class TXCommitMessage extends PooledDistributionMessage return this.opKeys == null; } - /** - * Returns the number of operations this region commit will do - * - * @since GemFire 5.0 - */ - int getOperationCount() { - int result = 0; - if (!isEmpty()) { - result = this.opKeys.size(); - } - return result; - } - boolean needsAck() { return this.r.getScope().isDistributedAck(); } @@ -1481,20 +1456,6 @@ public class TXCommitMessage extends PooledDistributionMessage return result.toString(); } - /** - * Returns a list of QueuedOperation instances for reliable distribution - * - * @since GemFire 5.0 - */ - List getOperations() { - QueuedOperation[] ops = new QueuedOperation[getOperationCount()]; - for (int i = 0; i < ops.length; i++) { - TXEntryState es = (TXEntryState) this.opEntries.get(i); - ops[i] = es.toFarSideQueuedOp(this.opKeys.get(i)); - } - return Arrays.asList(ops); - } - private void basicToData(DataOutput out) throws IOException { if (this.r != null) { DataSerializer.writeString(this.r.getFullPath(), out); http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateEntryVersionOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateEntryVersionOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateEntryVersionOperation.java index f534a6e..f82f0ce 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateEntryVersionOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateEntryVersionOperation.java @@ -104,12 +104,6 @@ public class UpdateEntryVersionOperation extends DistributedCacheOperation { } @Override - public List getOperations() { - return Collections.singletonList(new QueuedOperation(getOperation(), this.key, null, null, - 
DistributedCacheOperation.DESERIALIZATION_POLICY_NONE, this.callbackArg)); - } - - @Override protected void appendFields(StringBuilder buff) { super.appendFields(buff); buff.append("; key="); http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateOperation.java index 09ce587..1afae86 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateOperation.java @@ -445,19 +445,6 @@ public class UpdateOperation extends AbstractUpdateOperation { } } - @Override - public List getOperations() { - byte[] valueBytes = null; - Object valueObj = null; - if (this.newValueObj != null) { - valueBytes = EntryEventImpl.serialize(this.newValueObj); - } else { - valueBytes = this.newValue; - } - return Collections.singletonList(new QueuedOperation(getOperation(), this.key, valueBytes, - valueObj, this.deserializationPolicy, this.callbackArg)); - } - public boolean hasBridgeContext() { if (this.event != null) { return this.event.getContext() != null; http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java index 5da18a8..a368b60 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java @@ -37,7 +37,6 @@ import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.InternalCacheEvent; -import org.apache.geode.internal.cache.QueuedOperation; import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; @@ -210,12 +209,6 @@ public class BatchDestroyOperation extends DistributedCacheOperation { } @Override - public List getOperations() { - return Collections.singletonList(new QueuedOperation(getOperation(), this.key, null, null, - DistributedCacheOperation.DESERIALIZATION_POLICY_NONE, this.callbackArg)); - } - - @Override public ConflationKey getConflationKey() { if (!super.regionAllowsConflation || getProcessorId() != 0) { // if the publisher's region attributes do not support conflation http://git-wip-us.apache.org/repos/asf/geode/blob/5ec0d470/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java index fa63437..47ae0c5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java +++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java @@ -3604,12 +3604,6 @@ public class LocalizedStrings { public static final StringId RegisterInterest_CACHECLIENTPROXY_FOR_THIS_CLIENT_IS_NO_LONGER_ON_THE_SERVER_SO_REGISTERINTEREST_OPERATION_IS_UNSUCCESSFUL = new StringId(3176, "CacheClientProxy for this client is no longer on the server , so registerInterest operation is unsuccessful"); - public static final StringId 
ReliableMessageQueueFactoryImpl_REGIONS_WITH_MESSAGE_QUEUING_ALREADY_EXIST = - new StringId(3177, "Regions with message queuing already exist"); - public static final StringId ReliableMessageQueueFactoryImpl_RELIABLE_MESSAGE_QUEUE_IS_CLOSED = - new StringId(3178, "reliable message queue is closed"); - public static final StringId ReliableMessageQueueFactoryImpl_UNEXPECTED_QUEUEDREGIONDATA_0_FOR_REGION_1 = - new StringId(3179, "unexpected QueuedRegionData {0} for region {1}"); public static final StringId RemoteBridgeServer_A_REMOTE_BRIDGESERVER_CANNOT_BE_STARTED = new StringId(3180, "A remote BridgeServer cannot be started."); public static final StringId RemoteBridgeServer_A_REMOTE_BRIDGESERVER_CANNOT_BE_STOPPED =
