GEODE-37 change package name from com.gemstone.gemfire (for ./geode-rebalancer/src/main/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-rebalancer/src/main/java/org/apache/geode)
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/401ef76c Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/401ef76c Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/401ef76c Branch: refs/heads/feature/GEODE-37_2 Commit: 401ef76cfb6087c12a80d7e89a18384e13d5fd8a Parents: 90d1187 Author: Hitesh Khamesra <[email protected]> Authored: Tue Sep 13 15:43:20 2016 -0700 Committer: Hitesh Khamesra <[email protected]> Committed: Tue Sep 13 15:43:20 2016 -0700 ---------------------------------------------------------------------- .../gemfire/cache/util/AutoBalancer.java | 553 ------------------- .../apache/geode/cache/util/AutoBalancer.java | 553 +++++++++++++++++++ 2 files changed, 553 insertions(+), 553 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/401ef76c/geode-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java ---------------------------------------------------------------------- diff --git a/geode-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java b/geode-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java deleted file mode 100644 index a6488f9..0000000 --- a/geode-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java +++ /dev/null @@ -1,553 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.util; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.logging.log4j.Logger; -import org.quartz.CronExpression; -import org.springframework.scheduling.support.CronSequenceGenerator; - -import com.gemstone.gemfire.GemFireConfigException; -import com.gemstone.gemfire.cache.CacheClosedException; -import com.gemstone.gemfire.cache.Declarable; -import com.gemstone.gemfire.cache.GemFireCache; -import com.gemstone.gemfire.cache.control.RebalanceOperation; -import com.gemstone.gemfire.cache.control.RebalanceResults; -import com.gemstone.gemfire.cache.partition.PartitionMemberInfo; -import com.gemstone.gemfire.distributed.DistributedLockService; -import com.gemstone.gemfire.distributed.internal.locks.DLockService; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.partitioned.InternalPRInfo; -import com.gemstone.gemfire.internal.cache.partitioned.LoadProbe; -import com.gemstone.gemfire.internal.logging.LogService; - -/** - * Re-balancing operation relocates data from heavily loaded members to lightly - * loaded members. In most cases, the decision to re-balance is based on the - * size of the member and a few other statistics. {@link AutoBalancer} monitors - * these statistics and if necessary, triggers a re-balancing request. - * Auto-Balancing is expected to prevent failures and data loss. - * - * <P> - * This implementation is based on {@code Initializer} implementation. By - * default auto-balancing is disabled. A user needs to configure - * {@link AutoBalancer} during cache initialization - * {@link GemFireCache#getInitializer()} - * - * <P> - * In a cluster only one member owns auto-balancing responsibility. This is - * achieved by grabbing a distributed lock. In case of a failure a new member - * will grab the lock and manage auto balancing. - * - * <P> - * {@link AutoBalancer} can be controlled using the following configurations - * <OL> - * <LI>{@link AutoBalancer#SCHEDULE} - * <LI>TBD THRESHOLDS - * - */ -public class AutoBalancer implements Declarable { - /** - * Use this configuration to manage out-of-balance audit frequency. If the - * auditor finds the system to be out-of-balance, it will trigger - * re-balancing. Any valid cron string is accepted. The sub-expressions - * represent the following: - * <OL> - * <LI>Seconds - * <LI>Minutes - * <LI>Hours - * <LI>Day-of-Month - * <LI>Month - * <LI>Day-of-Week - * <LI>Year (optional field) - * - * <P> - * For. e.g. {@code 0 0 * * * ?} for auditing the system every hour - */ - public static final String SCHEDULE = "schedule"; - - /** - * Use this configuration to manage re-balance invocation. Rebalance operation - * will be triggered if the total number of bytes rebalance operation may move - * is more than this threshold, in percentage of the total data size. - * <P> - * Default value {@link #DEFAULT_SIZE_THRESHOLD_PERCENT} - */ - public static final String SIZE_THRESHOLD_PERCENT = "size-threshold-percent"; - - /** - * Default value of {@link AutoBalancer#SIZE_THRESHOLD_PERCENT}. If 10% of - * data is misplaced, its a good time to redistribute buckets - */ - public static final int DEFAULT_SIZE_THRESHOLD_PERCENT = 10; - - /** - * In the initial data load phases, - * {@link AutoBalancer#SIZE_THRESHOLD_PERCENT} based rebalance invocation may - * be unnecessary. Rebalance should not be triggered if the total data size - * managed by cluster is too small. Rebalance operation will be triggered if - * the total number of bytes rebalance operation may move is more than this - * number of bytes. - * <P> - * Default value {@link #DEFAULT_MINIMUM_SIZE} - */ - public static final String MINIMUM_SIZE = "minimum-size"; - - /** - * Default value of {@link AutoBalancer#MINIMUM_SIZE}. In the initial data - * load phases, {@link AutoBalancer#SIZE_THRESHOLD_PERCENT} based rebalance - * invocation may be unnecessary. Do not rebalance if the data to be moved is - * less than 100MB - */ - public static final int DEFAULT_MINIMUM_SIZE = 100 * 1024 * 1024; - - /** - * Name of the DistributedLockService that {@link AutoBalancer} will use to - * guard against concurrent maintenance activity - */ - public static final String AUTO_BALANCER_LOCK_SERVICE_NAME = "__AUTO_B"; - - public static final Object AUTO_BALANCER_LOCK = "__AUTO_B_LOCK"; - - private final AuditScheduler scheduler; - private final OOBAuditor auditor; - private final TimeProvider clock; - private final CacheOperationFacade cacheFacade; - - private static final Logger logger = LogService.getLogger(); - - public AutoBalancer() { - this(null, null, null, null); - } - - public AutoBalancer(AuditScheduler scheduler, OOBAuditor auditor, TimeProvider clock, - CacheOperationFacade cacheFacade) { - this.cacheFacade = cacheFacade == null ? new GeodeCacheFacade() : cacheFacade; - this.scheduler = scheduler == null ? new CronScheduler() : scheduler; - this.auditor = auditor == null ? new SizeBasedOOBAuditor(this.cacheFacade) : auditor; - this.clock = clock == null ? new SystemClockTimeProvider() : clock; - } - - @Override - public void init(Properties props) { - if (logger.isDebugEnabled()) { - logger.debug("Initializing " + this.getClass().getSimpleName() + " with " + props); - } - - auditor.init(props); - - String schedule = null; - if (props != null) { - schedule = props.getProperty(SCHEDULE); - } - scheduler.init(schedule); - } - - /** - * Invokes audit triggers based on a cron schedule. - * <OL> - * <LI>computes delay = next slot - current time - * <LI>schedules a out-of-balance audit task to be started after delay - * computed earlier - * <LI>once the audit task completes, it repeats delay computation and task - * submission - */ - private class CronScheduler implements AuditScheduler { - final ScheduledExecutorService trigger; - CronSequenceGenerator generator; - - CronScheduler() { - trigger = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r, "AutoBalancer"); - thread.setDaemon(true); - return thread; - } - }); - } - - @Override - public void init(String schedule) { - if (logger.isDebugEnabled()) { - logger.debug("Initializing " + this.getClass().getSimpleName() + " with " + schedule); - } - - if (schedule == null || schedule.isEmpty()) { - throw new GemFireConfigException("Missing configuration: " + SCHEDULE); - } - if (!CronExpression.isValidExpression(schedule)) { - throw new GemFireConfigException("Invalid schedule: " + schedule); - } - generator = new CronSequenceGenerator(schedule); - - submitNext(); - } - - private void submitNext() { - long currentTime = clock.currentTimeMillis(); - Date nextSchedule = generator.next(new Date(currentTime)); - long delay = nextSchedule.getTime() - currentTime; - - if (logger.isDebugEnabled()) { - logger.debug("Now={}, next audit time={}, delay={} ms", new Date(currentTime), nextSchedule, delay); - } - - trigger.schedule(new Runnable() { - @Override - public void run() { - try { - auditor.execute(); - } catch (CacheClosedException e) { - logger.warn("Cache closed while attempting to rebalance the cluster. Abort future jobs", e); - return; - } catch (Exception e) { - logger.warn("Error while executing out-of-balance audit.", e); - } - submitNext(); - } - }, delay, TimeUnit.MILLISECONDS); - } - - @Override - public void destroy() { - trigger.shutdownNow(); - } - } - - /** - * Queries member statistics and health to determine if a re-balance operation - * is needed - * <OL> - * <LI>acquires distributed lock - * <LI>queries member health - * <LI>updates auto-balance stat - * <LI>release lock - */ - static class SizeBasedOOBAuditor implements OOBAuditor { - private int sizeThreshold = DEFAULT_SIZE_THRESHOLD_PERCENT; - private int sizeMinimum = DEFAULT_MINIMUM_SIZE; - - final CacheOperationFacade cache; - - public SizeBasedOOBAuditor(CacheOperationFacade cache) { - this.cache = cache; - } - - @Override - public void init(Properties props) { - if (logger.isDebugEnabled()) { - logger.debug("Initializing " + this.getClass().getSimpleName()); - } - - if (props != null) { - if (props.getProperty(SIZE_THRESHOLD_PERCENT) != null) { - sizeThreshold = Integer.valueOf(props.getProperty(SIZE_THRESHOLD_PERCENT)); - if (sizeThreshold <= 0 || sizeThreshold >= 100) { - throw new GemFireConfigException(SIZE_THRESHOLD_PERCENT + " should be integer, 1 to 99"); - } - } - if (props.getProperty(MINIMUM_SIZE) != null) { - sizeMinimum = Integer.valueOf(props.getProperty(MINIMUM_SIZE)); - if (sizeMinimum <= 0) { - throw new GemFireConfigException(MINIMUM_SIZE + " should be greater than 0"); - } - } - } - } - - @Override - public void execute() { - boolean result = cache.acquireAutoBalanceLock(); - if (!result) { - if (logger.isDebugEnabled()) { - logger.debug("Another member owns auto-balance lock. Skip this attempt to rebalance the cluster"); - } - return; - } - - cache.incrementAttemptCounter(); - result = needsRebalancing(); - if (!result) { - if (logger.isDebugEnabled()) { - logger.debug("Rebalancing is not needed"); - } - return; - } - - cache.rebalance(); - } - - /** - * By default auto-balancer will avoid rebalancing, because a user can - * always trigger rebalance manually. So in case of error or inconsistent - * data, return false. Return true if - * <OL> - * <LI>total transfer size is above threshold percent of total data size at - * cluster level - * <LI>If some smaller capacity nodes are heavily loaded while bigger - * capacity nodes are balanced. In such a scenario transfer size based - * trigger may not cause rebalance. - */ - boolean needsRebalancing() { - // test cluster level status - long transferSize = cache.getTotalTransferSize(); - if (transferSize <= sizeMinimum) { - return false; - } - - Map<PartitionedRegion, InternalPRInfo> details = cache.getRegionMemberDetails(); - long totalSize = cache.getTotalDataSize(details); - - if (totalSize > 0) { - int transferPercent = (int) ((100.0 * transferSize) / totalSize); - if (transferPercent >= sizeThreshold) { - return true; - } - } - - // TODO test member level skew - - return false; - } - - int getSizeThreshold() { - return sizeThreshold; - } - - public long getSizeMinimum() { - return sizeMinimum; - } - } - - /** - * Hides cache level details and exposes simple methods relevant for - * auto-balancing - */ - static class GeodeCacheFacade implements CacheOperationFacade { - private final AtomicBoolean isLockAcquired = new AtomicBoolean(false); - - private GemFireCacheImpl cache; - - public GeodeCacheFacade() { - this(null); - } - - public GeodeCacheFacade(GemFireCacheImpl cache) { - this.cache = cache; - } - - @Override - public Map<PartitionedRegion, InternalPRInfo> getRegionMemberDetails() { - GemFireCacheImpl cache = getCache(); - Map<PartitionedRegion, InternalPRInfo> detailsMap = new HashMap<>(); - for (PartitionedRegion region : cache.getPartitionedRegions()) { - LoadProbe probe = cache.getResourceManager().getLoadProbe(); - InternalPRInfo info = region.getRedundancyProvider().buildPartitionedRegionInfo(true, probe); - detailsMap.put(region, info); - } - return detailsMap; - } - - @Override - public long getTotalDataSize(Map<PartitionedRegion, InternalPRInfo> details) { - long totalSize = 0; - if (details != null) { - for (PartitionedRegion region : details.keySet()) { - InternalPRInfo info = details.get(region); - Set<PartitionMemberInfo> membersInfo = info.getPartitionMemberInfo(); - for (PartitionMemberInfo member : membersInfo) { - if (logger.isDebugEnabled()) { - logger.debug("Region:{}, Member: {}, Size: {}", region.getFullPath(), member, member.getSize()); - } - totalSize += member.getSize(); - } - } - } - return totalSize; - } - - @Override - public long getTotalTransferSize() { - try { - RebalanceOperation operation = getCache().getResourceManager().createRebalanceFactory().simulate(); - RebalanceResults result = operation.getResults(); - if (logger.isDebugEnabled()) { - logger.debug( - "Rebalance estimate: RebalanceResultsImpl [TotalBucketCreateBytes=" + result.getTotalBucketCreateBytes() - + ", TotalBucketCreatesCompleted=" + result.getTotalBucketCreatesCompleted() - + ", TotalBucketTransferBytes=" + result.getTotalBucketTransferBytes() - + ", TotalBucketTransfersCompleted=" + result.getTotalBucketTransfersCompleted() - + ", TotalPrimaryTransfersCompleted=" + result.getTotalPrimaryTransfersCompleted() + "]"); - } - return result.getTotalBucketTransferBytes(); - } catch (CancellationException e) { - logger.info("Error while trying to estimate rebalance cost ", e); - } catch (InterruptedException e) { - logger.info("Error while trying to estimate rebalance cost ", e); - } - return 0; - } - - @Override - public void incrementAttemptCounter() { - GemFireCacheImpl cache = getCache(); - try { - cache.getResourceManager().getStats().incAutoRebalanceAttempts(); - } catch (Exception e) { - logger.warn("Failed to increment AutoBalanceAttempts counter"); - } - } - - @Override - public void rebalance() { - try { - RebalanceOperation operation = getCache().getResourceManager().createRebalanceFactory().start(); - RebalanceResults result = operation.getResults(); - logger.info("Rebalance result: [TotalBucketCreateBytes=" + result.getTotalBucketCreateBytes() - + ", TotalBucketCreateTime=" + result.getTotalBucketCreateTime() + ", TotalBucketCreatesCompleted=" - + result.getTotalBucketCreatesCompleted() + ", TotalBucketTransferBytes=" - + result.getTotalBucketTransferBytes() + ", TotalBucketTransferTime=" + result.getTotalBucketTransferTime() - + ", TotalBucketTransfersCompleted=" + +result.getTotalBucketTransfersCompleted() - + ", TotalPrimaryTransferTime=" + result.getTotalPrimaryTransferTime() + ", TotalPrimaryTransfersCompleted=" - + result.getTotalPrimaryTransfersCompleted() + ", TotalTime=" + result.getTotalTime() + "]"); - } catch (CancellationException e) { - logger.info("Error rebalancing the cluster", e); - } catch (InterruptedException e) { - logger.info("Error rebalancing the cluster", e); - } - } - - GemFireCacheImpl getCache() { - if (cache == null) { - synchronized (this) { - if (cache == null) { - cache = GemFireCacheImpl.getInstance(); - if (cache == null) { - throw new IllegalStateException("Missing cache instance."); - } - } - } - } - if (cache.isClosed()) { - throw new CacheClosedException(); - } - return cache; - } - - @Override - public boolean acquireAutoBalanceLock() { - if (!isLockAcquired.get()) { - synchronized (isLockAcquired) { - if (!isLockAcquired.get()) { - DistributedLockService dls = getDLS(); - - boolean result = dls.lock(AUTO_BALANCER_LOCK, 0, -1); - if (result) { - isLockAcquired.set(true); - if (logger.isDebugEnabled()) { - logger.debug("Grabbed AutoBalancer lock"); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("Another member owns auto-balance lock. Skip this attempt to rebalance the cluster"); - } - } - } - } - } - return isLockAcquired.get(); - } - - @Override - public DistributedLockService getDLS() { - GemFireCacheImpl cache = getCache(); - DistributedLockService dls = DistributedLockService.getServiceNamed(AUTO_BALANCER_LOCK_SERVICE_NAME); - if (dls == null) { - if (logger.isDebugEnabled()) { - logger.debug("Creating DistributeLockService"); - } - dls = DLockService.create(AUTO_BALANCER_LOCK_SERVICE_NAME, cache.getDistributedSystem(), true, true, true); - } - - return dls; - } - } - - private class SystemClockTimeProvider implements TimeProvider { - @Override - public long currentTimeMillis() { - return System.currentTimeMillis(); - } - } - - interface AuditScheduler { - void init(String schedule); - - void destroy(); - } - - interface OOBAuditor { - void init(Properties props); - - void execute(); - } - - interface TimeProvider { - long currentTimeMillis(); - } - - interface CacheOperationFacade { - boolean acquireAutoBalanceLock(); - - DistributedLockService getDLS(); - - void rebalance(); - - void incrementAttemptCounter(); - - Map<PartitionedRegion, InternalPRInfo> getRegionMemberDetails(); - - long getTotalDataSize(Map<PartitionedRegion, InternalPRInfo> details); - - long getTotalTransferSize(); - } - - OOBAuditor getOOBAuditor() { - return auditor; - } - - public CacheOperationFacade getCacheOperationFacade() { - return this.cacheFacade; - } - - public void destroy() { - scheduler.destroy(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/401ef76c/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java ---------------------------------------------------------------------- diff --git a/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java b/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java new file mode 100644 index 0000000..a6488f9 --- /dev/null +++ b/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.util; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.logging.log4j.Logger; +import org.quartz.CronExpression; +import org.springframework.scheduling.support.CronSequenceGenerator; + +import com.gemstone.gemfire.GemFireConfigException; +import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.cache.Declarable; +import com.gemstone.gemfire.cache.GemFireCache; +import com.gemstone.gemfire.cache.control.RebalanceOperation; +import com.gemstone.gemfire.cache.control.RebalanceResults; +import com.gemstone.gemfire.cache.partition.PartitionMemberInfo; +import com.gemstone.gemfire.distributed.DistributedLockService; +import com.gemstone.gemfire.distributed.internal.locks.DLockService; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.partitioned.InternalPRInfo; +import com.gemstone.gemfire.internal.cache.partitioned.LoadProbe; +import com.gemstone.gemfire.internal.logging.LogService; + +/** + * Re-balancing operation relocates data from heavily loaded members to lightly + * loaded members. In most cases, the decision to re-balance is based on the + * size of the member and a few other statistics. {@link AutoBalancer} monitors + * these statistics and if necessary, triggers a re-balancing request. + * Auto-Balancing is expected to prevent failures and data loss. + * + * <P> + * This implementation is based on {@code Initializer} implementation. By + * default auto-balancing is disabled. A user needs to configure + * {@link AutoBalancer} during cache initialization + * {@link GemFireCache#getInitializer()} + * + * <P> + * In a cluster only one member owns auto-balancing responsibility. This is + * achieved by grabbing a distributed lock. In case of a failure a new member + * will grab the lock and manage auto balancing. + * + * <P> + * {@link AutoBalancer} can be controlled using the following configurations + * <OL> + * <LI>{@link AutoBalancer#SCHEDULE} + * <LI>TBD THRESHOLDS + * + */ +public class AutoBalancer implements Declarable { + /** + * Use this configuration to manage out-of-balance audit frequency. If the + * auditor finds the system to be out-of-balance, it will trigger + * re-balancing. Any valid cron string is accepted. The sub-expressions + * represent the following: + * <OL> + * <LI>Seconds + * <LI>Minutes + * <LI>Hours + * <LI>Day-of-Month + * <LI>Month + * <LI>Day-of-Week + * <LI>Year (optional field) + * + * <P> + * For. e.g. {@code 0 0 * * * ?} for auditing the system every hour + */ + public static final String SCHEDULE = "schedule"; + + /** + * Use this configuration to manage re-balance invocation. Rebalance operation + * will be triggered if the total number of bytes rebalance operation may move + * is more than this threshold, in percentage of the total data size. + * <P> + * Default value {@link #DEFAULT_SIZE_THRESHOLD_PERCENT} + */ + public static final String SIZE_THRESHOLD_PERCENT = "size-threshold-percent"; + + /** + * Default value of {@link AutoBalancer#SIZE_THRESHOLD_PERCENT}. If 10% of + * data is misplaced, its a good time to redistribute buckets + */ + public static final int DEFAULT_SIZE_THRESHOLD_PERCENT = 10; + + /** + * In the initial data load phases, + * {@link AutoBalancer#SIZE_THRESHOLD_PERCENT} based rebalance invocation may + * be unnecessary. Rebalance should not be triggered if the total data size + * managed by cluster is too small. Rebalance operation will be triggered if + * the total number of bytes rebalance operation may move is more than this + * number of bytes. + * <P> + * Default value {@link #DEFAULT_MINIMUM_SIZE} + */ + public static final String MINIMUM_SIZE = "minimum-size"; + + /** + * Default value of {@link AutoBalancer#MINIMUM_SIZE}. In the initial data + * load phases, {@link AutoBalancer#SIZE_THRESHOLD_PERCENT} based rebalance + * invocation may be unnecessary. Do not rebalance if the data to be moved is + * less than 100MB + */ + public static final int DEFAULT_MINIMUM_SIZE = 100 * 1024 * 1024; + + /** + * Name of the DistributedLockService that {@link AutoBalancer} will use to + * guard against concurrent maintenance activity + */ + public static final String AUTO_BALANCER_LOCK_SERVICE_NAME = "__AUTO_B"; + + public static final Object AUTO_BALANCER_LOCK = "__AUTO_B_LOCK"; + + private final AuditScheduler scheduler; + private final OOBAuditor auditor; + private final TimeProvider clock; + private final CacheOperationFacade cacheFacade; + + private static final Logger logger = LogService.getLogger(); + + public AutoBalancer() { + this(null, null, null, null); + } + + public AutoBalancer(AuditScheduler scheduler, OOBAuditor auditor, TimeProvider clock, + CacheOperationFacade cacheFacade) { + this.cacheFacade = cacheFacade == null ? new GeodeCacheFacade() : cacheFacade; + this.scheduler = scheduler == null ? new CronScheduler() : scheduler; + this.auditor = auditor == null ? new SizeBasedOOBAuditor(this.cacheFacade) : auditor; + this.clock = clock == null ? new SystemClockTimeProvider() : clock; + } + + @Override + public void init(Properties props) { + if (logger.isDebugEnabled()) { + logger.debug("Initializing " + this.getClass().getSimpleName() + " with " + props); + } + + auditor.init(props); + + String schedule = null; + if (props != null) { + schedule = props.getProperty(SCHEDULE); + } + scheduler.init(schedule); + } + + /** + * Invokes audit triggers based on a cron schedule. + * <OL> + * <LI>computes delay = next slot - current time + * <LI>schedules a out-of-balance audit task to be started after delay + * computed earlier + * <LI>once the audit task completes, it repeats delay computation and task + * submission + */ + private class CronScheduler implements AuditScheduler { + final ScheduledExecutorService trigger; + CronSequenceGenerator generator; + + CronScheduler() { + trigger = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, "AutoBalancer"); + thread.setDaemon(true); + return thread; + } + }); + } + + @Override + public void init(String schedule) { + if (logger.isDebugEnabled()) { + logger.debug("Initializing " + this.getClass().getSimpleName() + " with " + schedule); + } + + if (schedule == null || schedule.isEmpty()) { + throw new GemFireConfigException("Missing configuration: " + SCHEDULE); + } + if (!CronExpression.isValidExpression(schedule)) { + throw new GemFireConfigException("Invalid schedule: " + schedule); + } + generator = new CronSequenceGenerator(schedule); + + submitNext(); + } + + private void submitNext() { + long currentTime = clock.currentTimeMillis(); + Date nextSchedule = generator.next(new Date(currentTime)); + long delay = nextSchedule.getTime() - currentTime; + + if (logger.isDebugEnabled()) { + logger.debug("Now={}, next audit time={}, delay={} ms", new Date(currentTime), nextSchedule, delay); + } + + trigger.schedule(new Runnable() { + @Override + public void run() { + try { + auditor.execute(); + } catch (CacheClosedException e) { + logger.warn("Cache closed while attempting to rebalance the cluster. Abort future jobs", e); + return; + } catch (Exception e) { + logger.warn("Error while executing out-of-balance audit.", e); + } + submitNext(); + } + }, delay, TimeUnit.MILLISECONDS); + } + + @Override + public void destroy() { + trigger.shutdownNow(); + } + } + + /** + * Queries member statistics and health to determine if a re-balance operation + * is needed + * <OL> + * <LI>acquires distributed lock + * <LI>queries member health + * <LI>updates auto-balance stat + * <LI>release lock + */ + static class SizeBasedOOBAuditor implements OOBAuditor { + private int sizeThreshold = DEFAULT_SIZE_THRESHOLD_PERCENT; + private int sizeMinimum = DEFAULT_MINIMUM_SIZE; + + final CacheOperationFacade cache; + + public SizeBasedOOBAuditor(CacheOperationFacade cache) { + this.cache = cache; + } + + @Override + public void init(Properties props) { + if (logger.isDebugEnabled()) { + logger.debug("Initializing " + this.getClass().getSimpleName()); + } + + if (props != null) { + if (props.getProperty(SIZE_THRESHOLD_PERCENT) != null) { + sizeThreshold = Integer.valueOf(props.getProperty(SIZE_THRESHOLD_PERCENT)); + if (sizeThreshold <= 0 || sizeThreshold >= 100) { + throw new GemFireConfigException(SIZE_THRESHOLD_PERCENT + " should be integer, 1 to 99"); + } + } + if (props.getProperty(MINIMUM_SIZE) != null) { + sizeMinimum = Integer.valueOf(props.getProperty(MINIMUM_SIZE)); + if (sizeMinimum <= 0) { + throw new GemFireConfigException(MINIMUM_SIZE + " should be greater than 0"); + } + } + } + } + + @Override + public void execute() { + boolean result = cache.acquireAutoBalanceLock(); + if (!result) { + if (logger.isDebugEnabled()) { + logger.debug("Another member owns auto-balance lock. Skip this attempt to rebalance the cluster"); + } + return; + } + + cache.incrementAttemptCounter(); + result = needsRebalancing(); + if (!result) { + if (logger.isDebugEnabled()) { + logger.debug("Rebalancing is not needed"); + } + return; + } + + cache.rebalance(); + } + + /** + * By default auto-balancer will avoid rebalancing, because a user can + * always trigger rebalance manually. So in case of error or inconsistent + * data, return false. Return true if + * <OL> + * <LI>total transfer size is above threshold percent of total data size at + * cluster level + * <LI>If some smaller capacity nodes are heavily loaded while bigger + * capacity nodes are balanced. In such a scenario transfer size based + * trigger may not cause rebalance. + */ + boolean needsRebalancing() { + // test cluster level status + long transferSize = cache.getTotalTransferSize(); + if (transferSize <= sizeMinimum) { + return false; + } + + Map<PartitionedRegion, InternalPRInfo> details = cache.getRegionMemberDetails(); + long totalSize = cache.getTotalDataSize(details); + + if (totalSize > 0) { + int transferPercent = (int) ((100.0 * transferSize) / totalSize); + if (transferPercent >= sizeThreshold) { + return true; + } + } + + // TODO test member level skew + + return false; + } + + int getSizeThreshold() { + return sizeThreshold; + } + + public long getSizeMinimum() { + return sizeMinimum; + } + } + + /** + * Hides cache level details and exposes simple methods relevant for + * auto-balancing + */ + static class GeodeCacheFacade implements CacheOperationFacade { + private final AtomicBoolean isLockAcquired = new AtomicBoolean(false); + + private GemFireCacheImpl cache; + + public GeodeCacheFacade() { + this(null); + } + + public GeodeCacheFacade(GemFireCacheImpl cache) { + this.cache = cache; + } + + @Override + public Map<PartitionedRegion, InternalPRInfo> getRegionMemberDetails() { + GemFireCacheImpl cache = getCache(); + Map<PartitionedRegion, InternalPRInfo> detailsMap = new HashMap<>(); + for (PartitionedRegion region : cache.getPartitionedRegions()) { + LoadProbe probe = cache.getResourceManager().getLoadProbe(); + InternalPRInfo info = region.getRedundancyProvider().buildPartitionedRegionInfo(true, probe); + detailsMap.put(region, info); + } + return detailsMap; + } + + @Override + public long getTotalDataSize(Map<PartitionedRegion, InternalPRInfo> details) { + long totalSize = 0; + if (details != null) { + for (PartitionedRegion region : details.keySet()) { + InternalPRInfo info = details.get(region); + Set<PartitionMemberInfo> membersInfo = info.getPartitionMemberInfo(); + for (PartitionMemberInfo member : membersInfo) { + if (logger.isDebugEnabled()) { + logger.debug("Region:{}, Member: {}, Size: {}", region.getFullPath(), member, member.getSize()); + } + totalSize += member.getSize(); + } + } + } + return totalSize; + } + + @Override + public long getTotalTransferSize() { + try { + RebalanceOperation operation = getCache().getResourceManager().createRebalanceFactory().simulate(); + RebalanceResults result = operation.getResults(); + if (logger.isDebugEnabled()) { + logger.debug( + "Rebalance estimate: RebalanceResultsImpl [TotalBucketCreateBytes=" + result.getTotalBucketCreateBytes() + + ", TotalBucketCreatesCompleted=" + result.getTotalBucketCreatesCompleted() + + ", TotalBucketTransferBytes=" + result.getTotalBucketTransferBytes() + + ", TotalBucketTransfersCompleted=" + result.getTotalBucketTransfersCompleted() + + ", TotalPrimaryTransfersCompleted=" + result.getTotalPrimaryTransfersCompleted() + "]"); + } + return result.getTotalBucketTransferBytes(); + } catch (CancellationException e) { + logger.info("Error while trying to estimate rebalance cost ", e); + } catch (InterruptedException e) { + logger.info("Error while trying to estimate rebalance cost ", e); + } + return 0; + } + + @Override + public void incrementAttemptCounter() { + GemFireCacheImpl cache = getCache(); + try { + cache.getResourceManager().getStats().incAutoRebalanceAttempts(); + } catch (Exception e) { + logger.warn("Failed to increment AutoBalanceAttempts counter"); + } + } + + @Override + public void rebalance() { + try { + RebalanceOperation operation = getCache().getResourceManager().createRebalanceFactory().start(); + RebalanceResults result = operation.getResults(); + logger.info("Rebalance result: [TotalBucketCreateBytes=" + result.getTotalBucketCreateBytes() + + ", TotalBucketCreateTime=" + result.getTotalBucketCreateTime() + ", TotalBucketCreatesCompleted=" + + result.getTotalBucketCreatesCompleted() + ", TotalBucketTransferBytes=" + + result.getTotalBucketTransferBytes() + ", TotalBucketTransferTime=" + result.getTotalBucketTransferTime() + + ", TotalBucketTransfersCompleted=" + +result.getTotalBucketTransfersCompleted() + + ", TotalPrimaryTransferTime=" + result.getTotalPrimaryTransferTime() + ", TotalPrimaryTransfersCompleted=" + + result.getTotalPrimaryTransfersCompleted() + ", TotalTime=" + result.getTotalTime() + "]"); + } catch (CancellationException e) { + logger.info("Error rebalancing the cluster", e); + } catch (InterruptedException e) { + logger.info("Error rebalancing the cluster", e); + } + } + + GemFireCacheImpl getCache() { + if (cache == null) { + synchronized (this) { + if (cache == null) { + cache = GemFireCacheImpl.getInstance(); + if (cache == null) { + throw new IllegalStateException("Missing cache instance."); + } + } + } + } + if (cache.isClosed()) { + throw new CacheClosedException(); + } + return cache; + } + + @Override + public boolean acquireAutoBalanceLock() { + if (!isLockAcquired.get()) { + synchronized (isLockAcquired) { + if (!isLockAcquired.get()) { + DistributedLockService dls = getDLS(); + + boolean result = dls.lock(AUTO_BALANCER_LOCK, 0, -1); + if (result) { + isLockAcquired.set(true); + if (logger.isDebugEnabled()) { + logger.debug("Grabbed AutoBalancer lock"); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("Another member owns auto-balance lock. Skip this attempt to rebalance the cluster"); + } + } + } + } + } + return isLockAcquired.get(); + } + + @Override + public DistributedLockService getDLS() { + GemFireCacheImpl cache = getCache(); + DistributedLockService dls = DistributedLockService.getServiceNamed(AUTO_BALANCER_LOCK_SERVICE_NAME); + if (dls == null) { + if (logger.isDebugEnabled()) { + logger.debug("Creating DistributeLockService"); + } + dls = DLockService.create(AUTO_BALANCER_LOCK_SERVICE_NAME, cache.getDistributedSystem(), true, true, true); + } + + return dls; + } + } + + private class SystemClockTimeProvider implements TimeProvider { + @Override + public long currentTimeMillis() { + return System.currentTimeMillis(); + } + } + + interface AuditScheduler { + void init(String schedule); + + void destroy(); + } + + interface OOBAuditor { + void init(Properties props); + + void execute(); + } + + interface TimeProvider { + long currentTimeMillis(); + } + + interface CacheOperationFacade { + boolean acquireAutoBalanceLock(); + + DistributedLockService getDLS(); + + void rebalance(); + + void incrementAttemptCounter(); + + Map<PartitionedRegion, InternalPRInfo> getRegionMemberDetails(); + + long getTotalDataSize(Map<PartitionedRegion, InternalPRInfo> details); + + long getTotalTransferSize(); + } + + OOBAuditor getOOBAuditor() { + return auditor; + } + + public CacheOperationFacade getCacheOperationFacade() { + return this.cacheFacade; + } + + public void destroy() { + scheduler.destroy(); + } +}
