http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java index ec2dec1,82f5c88..18143b3 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java @@@ -24,8 -24,8 +24,9 @@@ import com.gemstone.gemfire.internal.In import com.gemstone.gemfire.internal.InternalInstantiator.InstantiatorAttributesHolder; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.net.SocketCreator; + import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.Logger; import java.io.DataInput;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/SystemAdmin.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/SystemAdmin.java index bc0ff57,e52950f..81d9ec8 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SystemAdmin.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/SystemAdmin.java @@@ -26,9 -26,9 +26,12 @@@ import com.gemstone.gemfire.cache.persi import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.internal.*; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.distributed.internal.tcpserver.*; +import com.gemstone.gemfire.internal.StatArchiveReader.ResourceInst; +import com.gemstone.gemfire.internal.StatArchiveReader.StatValue; + import com.gemstone.gemfire.internal.statistics.StatArchiveReader; + import com.gemstone.gemfire.internal.statistics.StatArchiveReader.ResourceInst; + import com.gemstone.gemfire.internal.statistics.StatArchiveReader.StatValue; import com.gemstone.gemfire.internal.admin.remote.TailLogResponse; import com.gemstone.gemfire.internal.cache.DiskStoreImpl; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/DistributionLocatorId.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerLauncher.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java index 67c5dec,7422e16..5540a4a --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java @@@ -1272,18 -1282,7 +1270,7 @@@ public class AcceptorImpl extends Accep // java.lang.NullPointerException // at // com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:107) - - if (this.crHelper.emulateSlowServer() > 0) { - try { - Thread.sleep(this.crHelper.emulateSlowServer()); - } - catch (InterruptedException ugh) { - // This had better be due to shutdown; don't reenable the bit, - // it would just cause a hot-loop. - // Thread.currentThread().interrupt(); - }; - } -- ++ synchronized (this.syncLock) { if (!isRunning()) { closeSocket(s); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java index 5a26d72,6fd4560..d4a3be1 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java @@@ -81,10 -81,10 +81,10 @@@ import com.gemstone.gemfire.distributed import com.gemstone.gemfire.distributed.internal.ReplyMessage; import com.gemstone.gemfire.distributed.internal.ReplyProcessor21; import com.gemstone.gemfire.internal.ClassLoadUtil; - import com.gemstone.gemfire.internal.DummyStatisticsFactory; + import com.gemstone.gemfire.internal.statistics.DummyStatisticsFactory; import com.gemstone.gemfire.internal.InternalDataSerializer; import com.gemstone.gemfire.internal.InternalInstantiator; -import com.gemstone.gemfire.internal.SocketCloser; +import com.gemstone.gemfire.internal.net.SocketCloser; import com.gemstone.gemfire.internal.SystemTimer; import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.VersionedDataInputStream; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java index 8f81196,26efd4d..a371447 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java @@@ -43,9 -43,9 +43,10 @@@ import com.gemstone.gemfire.internal.lo import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; import com.gemstone.gemfire.internal.logging.log4j.LogMarker; +import com.gemstone.gemfire.internal.net.SocketCreator; import com.gemstone.gemfire.internal.offheap.annotations.Released; import com.gemstone.gemfire.internal.sequencelog.EntryLogger; + import com.gemstone.gemfire.internal.statistics.StatisticsTypeFactoryImpl; import com.gemstone.gemfire.security.AuthenticationFailedException; import com.gemstone.gemfire.security.AuthenticationRequiredException; import com.gemstone.gemfire.security.GemFireSecurityException; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheServerStats.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheServerStats.java index 445790e,658b35e..4b0b231 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheServerStats.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheServerStats.java @@@ -27,8 -23,11 +23,11 @@@ import com.gemstone.gemfire.StatisticsT import com.gemstone.gemfire.cache.server.ServerLoad; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; import com.gemstone.gemfire.distributed.internal.PoolStatHelper; -import com.gemstone.gemfire.internal.SocketCreator; +import com.gemstone.gemfire.internal.net.SocketCreator; + /** + * Cache Server statistic definitions + */ public class CacheServerStats implements MessageStats { private static final String typeName = "CacheServerStats"; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/HostStatHelper.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/HostStatHelper.java index 0000000,b52eaae..d965d92 mode 000000,100644..100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/HostStatHelper.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/HostStatHelper.java @@@ -1,0 -1,303 +1,305 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.gemstone.gemfire.internal.statistics; + + import com.gemstone.gemfire.*; + //import com.gemstone.gemfire.util.*; + import com.gemstone.gemfire.internal.PureJavaMode; + import com.gemstone.gemfire.internal.SocketCreator; + import com.gemstone.gemfire.internal.i18n.LocalizedStrings; ++import com.gemstone.gemfire.internal.net.SocketCreator; ++ + import com.gemstone.gemfire.internal.statistics.platform.LinuxProcFsStatistics; + import com.gemstone.gemfire.internal.statistics.platform.LinuxProcessStats; + import com.gemstone.gemfire.internal.statistics.platform.LinuxSystemStats; + import com.gemstone.gemfire.internal.statistics.platform.OSXProcessStats; + import com.gemstone.gemfire.internal.statistics.platform.OSXSystemStats; + import com.gemstone.gemfire.internal.statistics.platform.OsStatisticsFactory; + import com.gemstone.gemfire.internal.statistics.platform.ProcessStats; + import com.gemstone.gemfire.internal.statistics.platform.SolarisProcessStats; + import com.gemstone.gemfire.internal.statistics.platform.SolarisSystemStats; + import com.gemstone.gemfire.internal.statistics.platform.WindowsProcessStats; + import com.gemstone.gemfire.internal.statistics.platform.WindowsSystemStats; + + import java.net.InetAddress; + import java.net.UnknownHostException; + + /** + * Provides native methods which fetch operating system statistics. + */ + public class HostStatHelper { + static final int SOLARIS_CODE = 1; // Sparc Solaris + static final int WINDOWS_CODE = 2; + static final int LINUX_CODE = 3; // x86 Linux + static final int OSX_CODE = 4; // Mac OS X + + static final int PROCESS_STAT_FLAG = 1; + static final int SYSTEM_STAT_FLAG = 2; + + static final int osCode; + + static { + String osName = System.getProperty("os.name", "unknown"); + if (! PureJavaMode.osStatsAreAvailable()) { + throw new RuntimeException(LocalizedStrings.HostStatHelper_HOSTSTATHELPER_NOT_ALLOWED_IN_PURE_JAVA_MODE.toLocalizedString()); + } else if (osName.equals("SunOS")) { + osCode = SOLARIS_CODE; + } else if (osName.startsWith("Windows")) { + osCode = WINDOWS_CODE; + } else if (osName.startsWith("Linux")) { + osCode = LINUX_CODE; + } else if (osName.equals("Mac OS X")) { + osCode = OSX_CODE; + } else { + throw new InternalGemFireException(LocalizedStrings.HostStatHelper_UNSUPPORTED_OS_0_SUPPORTED_OSS_ARE_SUNOSSPARC_SOLARIS_LINUXX86_AND_WINDOWS.toLocalizedString(osName)); + } + } + public static boolean isWindows() { + return osCode == WINDOWS_CODE; + } + public static boolean isUnix() { + return osCode != WINDOWS_CODE; + } + public static boolean isSolaris() { + return osCode == SOLARIS_CODE; + } + public static boolean isLinux() { + return osCode == LINUX_CODE; + } + public static boolean isOSX() { + return osCode == OSX_CODE; + } + + private HostStatHelper() { + // instances are not allowed + } + + static int initOSStats() { + if(isLinux()) { + return LinuxProcFsStatistics.init(); + } else { + return HostStatHelper.init(); + } + } + + static void closeOSStats() { + if(isLinux()) { + LinuxProcFsStatistics.close(); + } else { + HostStatHelper.close(); + } + } + + static void readyRefreshOSStats() { + if(isLinux()) { + LinuxProcFsStatistics.readyRefresh(); + } else { + HostStatHelper.readyRefresh(); + } + } + + /** + * Allocates and initializes any resources required to sample + * operating system statistics. + * returns 0 if initialization succeeded + */ + private static native int init(); + /** + * Frees up resources used by this class. Once close is called this + * class can no longer be used. + */ + private static native void close(); + /** + * Should be called before any calls to the refresh methods. + * On some platforms if this is not called then the refesh methods + * will just keep returning the same old data. + */ + private static native void readyRefresh(); + /** + * Refreshes the specified process stats instance by fetching + * the current OS values for the given stats and storing them in the instance. + */ + private static void refreshProcess(LocalStatisticsImpl s) { + int pid = (int)s.getNumericId(); + if(isLinux()) { + LinuxProcFsStatistics.refreshProcess(pid, s._getIntStorage(), s._getLongStorage(), s._getDoubleStorage()); + } else { + refreshProcess(pid, s._getIntStorage(), s._getLongStorage(), s._getDoubleStorage()); + } + } + private static native void refreshProcess(int pid, int[] ints, long[] longs, double[] doubles); + /** + * Refreshes the specified system stats instance by fetching + * the current OS values for the local machine and storing them in + * the instance. + */ + private static void refreshSystem(LocalStatisticsImpl s) { + if(isLinux()) { + LinuxProcFsStatistics.refreshSystem(s._getIntStorage(), s._getLongStorage(), s._getDoubleStorage()); + } else { + refreshSystem(s._getIntStorage(), s._getLongStorage(), s._getDoubleStorage()); + } + } + private static native void refreshSystem(int[] ints, long[] longs, double[] doubles); + + /** + * The call should have already checked to make sure + * usesSystemCalls returns true. + */ + public static void refresh(LocalStatisticsImpl stats) { + int flags = stats.getOsStatFlags(); + if ((flags & PROCESS_STAT_FLAG) != 0) { + HostStatHelper.refreshProcess(stats); + } else if ((flags & SYSTEM_STAT_FLAG) != 0) { + HostStatHelper.refreshSystem(stats); + } else { + throw new RuntimeException(LocalizedStrings.HostStatHelper_UNEXPECTED_OS_STATS_FLAGS_0.toLocalizedString(Integer.valueOf(flags))); + } + } + + /** + * Creates and returns a {@link Statistics} with + * the given pid and name. The resource's stats will contain a snapshot + * of the current statistic values for the specified process. + */ + public static Statistics newProcess(OsStatisticsFactory f, long pid, String name) { + Statistics stats; + switch (osCode) { + case SOLARIS_CODE: + stats = f.createOsStatistics(SolarisProcessStats.getType(), + name, pid, PROCESS_STAT_FLAG); + break; + case LINUX_CODE: + stats = f.createOsStatistics(LinuxProcessStats.getType(), + name, pid, PROCESS_STAT_FLAG); + break; + case OSX_CODE: + stats = f.createOsStatistics(OSXProcessStats.getType(), + name, pid, PROCESS_STAT_FLAG); + break; + case WINDOWS_CODE: + stats = f.createOsStatistics(WindowsProcessStats.getType(), + name, pid, PROCESS_STAT_FLAG); + break; + default: + throw new InternalGemFireException(LocalizedStrings.HostStatHelper_UNHANDLED_OSCODE_0_HOSTSTATHELPERNEWPROCESS.toLocalizedString(Integer.valueOf(osCode))); + } + // Note we don't call refreshProcess since we only want the manager to do that + return stats; + } + + /** + * Creates a new <code>ProcessStats</code> instance that wraps the + * given <code>Statistics</code>. + * + * @see #newProcess + * @since GemFire 3.5 + */ + static ProcessStats newProcessStats(Statistics stats) { + switch (osCode) { + case SOLARIS_CODE: + return SolarisProcessStats.createProcessStats(stats); + + case LINUX_CODE: + return LinuxProcessStats.createProcessStats(stats); + + case WINDOWS_CODE: + return WindowsProcessStats.createProcessStats(stats); + + case OSX_CODE: + return OSXProcessStats.createProcessStats(stats); + + default: + throw new InternalGemFireException(LocalizedStrings.HostStatHelper_UNHANDLED_OSCODE_0_HOSTSTATHELPERNEWPROCESSSTATS.toLocalizedString(Integer.valueOf(osCode))); + } + } + + /** + * Creates and returns a {@link Statistics} with the current + * machine's stats. The resource's stats will contain a snapshot + * of the current statistic values for the local machine. + */ + static void newSystem(OsStatisticsFactory f) { + Statistics stats; + switch (osCode) { + case SOLARIS_CODE: + stats = f.createOsStatistics(SolarisSystemStats.getType(), + getHostSystemName(), + getHostSystemId(), + SYSTEM_STAT_FLAG); + break; + case LINUX_CODE: + stats = f.createOsStatistics(LinuxSystemStats.getType(), + getHostSystemName(), + getHostSystemId(), + SYSTEM_STAT_FLAG); + break; + case WINDOWS_CODE: + stats = f.createOsStatistics(WindowsSystemStats.getType(), + getHostSystemName(), + getHostSystemId(), + SYSTEM_STAT_FLAG); + break; + case OSX_CODE: + stats = f.createOsStatistics(OSXSystemStats.getType(), + getHostSystemName(), + getHostSystemId(), + SYSTEM_STAT_FLAG); + break; + default: + throw new InternalGemFireException(LocalizedStrings.HostStatHelper_UNHANDLED_OSCODE_0_HOSTSTATHELPERNEWSYSTEM.toLocalizedString(Integer.valueOf(osCode))); + } + if (stats instanceof LocalStatisticsImpl) { + refreshSystem((LocalStatisticsImpl)stats); + } // otherwise its a Dummy implementation so do nothing + } + + /** + * @return this machine's fully qualified hostname + * or "unknownHostName" if one cannot be found. + */ + private static String getHostSystemName() { + String hostname = "unknownHostName"; + try { + InetAddress addr = SocketCreator.getLocalHost(); + hostname = addr.getCanonicalHostName(); + } catch (UnknownHostException uhe) { + } + return hostname; + } + + /** + * Generate a systemid based off of the ip address of the host. + * This duplicates the common implementation of + * <code>long gethostid(void) </code>. + * Punt on the ipv6 case and just use the same algorithm. + * @return a psuedo unique id based on the ip address + */ + private static long getHostSystemId() { + long id = 0L; + try { + InetAddress host = SocketCreator.getLocalHost(); + byte[] addr = host.getAddress(); + id = (addr[1] & 0xFFL) << 24 | + (addr[0] & 0xFFL) << 16 | + (addr[3] & 0xFFL) << 8 | + (addr[2] & 0xFFL) << 0; + } catch (UnknownHostException uhe) { + } + return id; + } + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/HostStatSampler.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/HostStatSampler.java index 0000000,d414d2c..d7c3419 mode 000000,100644..100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/HostStatSampler.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/HostStatSampler.java @@@ -1,0 -1,549 +1,554 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.gemstone.gemfire.internal.statistics; + + import com.gemstone.gemfire.CancelCriterion; + import com.gemstone.gemfire.CancelException; + import com.gemstone.gemfire.Statistics; + import com.gemstone.gemfire.SystemFailure; + import com.gemstone.gemfire.distributed.internal.DistributionConfig; + import com.gemstone.gemfire.internal.NanoTimer; + import com.gemstone.gemfire.internal.SocketCreator; + import com.gemstone.gemfire.internal.i18n.LocalizedStrings; + import com.gemstone.gemfire.internal.logging.LogService; + import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; + import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; + import com.gemstone.gemfire.internal.logging.log4j.LogMarker; ++import com.gemstone.gemfire.internal.net.SocketCreator; ++import com.gemstone.gemfire.internal.statistics.CallbackSampler; ++import com.gemstone.gemfire.internal.statistics.SampleCollector; ++import com.gemstone.gemfire.internal.statistics.StatArchiveHandlerConfig; ++import com.gemstone.gemfire.internal.statistics.StatisticsSampler; + import com.gemstone.gemfire.internal.statistics.platform.OsStatisticsFactory; + import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch; + import org.apache.logging.log4j.Logger; + + import java.io.File; + import java.net.UnknownHostException; + import java.util.List; + import java.util.concurrent.TimeUnit; + + /** + * HostStatSampler implements a thread which will monitor, sample, and archive + * statistics. It only has the common functionality that any sampler needs. + + */ + public abstract class HostStatSampler + implements Runnable, StatisticsSampler, StatArchiveHandlerConfig { + + private static final Logger logger = LogService.getLogger(); + + public static final String TEST_FILE_SIZE_LIMIT_IN_KB_PROPERTY = DistributionConfig.GEMFIRE_PREFIX + "stats.test.fileSizeLimitInKB"; + public static final String OS_STATS_DISABLED_PROPERTY = "osStatsDisabled"; + + protected static final String INITIALIZATION_TIMEOUT_PROPERTY = DistributionConfig.GEMFIRE_PREFIX + "statSamplerInitializationTimeout"; + protected static final int INITIALIZATION_TIMEOUT_DEFAULT = 30000; + protected static final long INITIALIZATION_TIMEOUT_MILLIS = + Long.getLong(INITIALIZATION_TIMEOUT_PROPERTY, INITIALIZATION_TIMEOUT_DEFAULT); + + /** + * Used to check if the sampler thread wake-up is delayed, and log a warning if it is delayed by longer than + * the amount of milliseconds specified by this property. The value of 0 disables the check. + */ + private static final long STAT_SAMPLER_DELAY_THRESHOLD = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "statSamplerDelayThreshold", 3000); + private static final long STAT_SAMPLER_DELAY_THRESHOLD_NANOS = NanoTimer.millisToNanos(STAT_SAMPLER_DELAY_THRESHOLD); + + private static final int MIN_MS_SLEEP = 1; + + private static final int WAIT_FOR_SLEEP_INTERVAL = 10; + + private static Thread statThread = null; + + private volatile boolean stopRequested = false; + + private final boolean osStatsDisabled = Boolean.getBoolean(OS_STATS_DISABLED_PROPERTY); + private final boolean fileSizeLimitInKB; + private final StatSamplerStats samplerStats; + + private VMStatsContract vmStats; + private SampleCollector sampleCollector; + + /** + * Used to signal thread that are waiting for the stat sampler to be initialized. + */ + private final StoppableCountDownLatch statSamplerInitializedLatch; + + private final CancelCriterion stopper; + + private final CallbackSampler callbackSampler; + + protected HostStatSampler(CancelCriterion stopper, + StatSamplerStats samplerStats) { + this.stopper = stopper; + this.statSamplerInitializedLatch = new StoppableCountDownLatch(this.stopper, 1); + this.samplerStats = samplerStats; + this.fileSizeLimitInKB = Boolean.getBoolean(TEST_FILE_SIZE_LIMIT_IN_KB_PROPERTY); + this.callbackSampler = new CallbackSampler(stopper, samplerStats); + } + + public final StatSamplerStats getStatSamplerStats() { + return this.samplerStats; + } + + /** + * Returns the number of times a statistics resource has been add or deleted. + */ + @Override + public final int getStatisticsModCount() { + return getStatisticsManager().getStatListModCount(); + } + + /** + * Returns an array of all the current statistic resource instances. + */ + @Override + public final Statistics[] getStatistics() { + return getStatisticsManager().getStatistics(); + } + + /** + * Returns a unique id for the sampler's system. + */ + @Override + public final long getSystemId() { + return getStatisticsManager().getId(); + } + + /** + * Returns the time this sampler's system was started. + */ + @Override + public final long getSystemStartTime() { + return getStatisticsManager().getStartTime(); + } + + /** + * Returns the path to this sampler's system directory; if it has one. + */ + @Override + public final String getSystemDirectoryPath() { + try { + return SocketCreator.getHostName(SocketCreator.getLocalHost()); + } catch (UnknownHostException ignore) { + return ""; + } + } + + @Override + public boolean waitForSample(long timeout) throws InterruptedException { + final long endTime = System.currentTimeMillis() + timeout; + final int startSampleCount = this.samplerStats.getSampleCount(); + while (System.currentTimeMillis() < endTime && + this.samplerStats.getSampleCount() <= startSampleCount) { + Thread.sleep(WAIT_FOR_SLEEP_INTERVAL); + } + return this.samplerStats.getSampleCount() > startSampleCount; + } + + @Override + public SampleCollector waitForSampleCollector(long timeout) throws InterruptedException { + final long endTime = System.currentTimeMillis() + timeout; + while (System.currentTimeMillis() < endTime && + this.sampleCollector == null || !this.sampleCollector.isInitialized()) { + Thread.sleep(WAIT_FOR_SLEEP_INTERVAL); + } + return this.sampleCollector; + } + + /** + * This service's main loop + */ + @Override + public final void run() { + NanoTimer timer = new NanoTimer(); + + final boolean isDebugEnabled_STATISTICS = logger.isTraceEnabled(LogMarker.STATISTICS); + if (isDebugEnabled_STATISTICS) { + logger.trace(LogMarker.STATISTICS, "HostStatSampler started"); + } + boolean latchCountedDown = false; + try { + initSpecialStats(); + + this.sampleCollector = new SampleCollector(this); + this.sampleCollector.initialize(this, NanoTimer.getTime()); + + this.statSamplerInitializedLatch.countDown(); + latchCountedDown = true; + + timer.reset(); + // subtract getNanoRate from lastTS to force a quick initial sample + long nanosLastTimeStamp = timer.getLastResetTime() - getNanoRate(); + while (!stopRequested()) { + SystemFailure.checkFailure(); + if (Thread.currentThread().isInterrupted()) { + break; + } + final long nanosBeforeSleep = timer.getLastResetTime(); + final long nanosToDelay = nanosLastTimeStamp + getNanoRate(); + delay(timer, nanosToDelay); + nanosLastTimeStamp = timer.getLastResetTime(); + if (!stopRequested() && isSamplingEnabled()) { + final long nanosTimeStamp = timer.getLastResetTime(); + final long nanosElapsedSleeping = nanosTimeStamp - nanosBeforeSleep; + checkElapsedSleepTime(nanosElapsedSleeping); + if (stopRequested()) break; + sampleSpecialStats(false); + if (stopRequested()) break; + checkListeners(); + if (stopRequested()) break; + + this.sampleCollector.sample(nanosTimeStamp); + + final long nanosSpentWorking = timer.reset(); + accountForTimeSpentWorking(nanosSpentWorking, nanosElapsedSleeping); + } else if (!stopRequested() && !isSamplingEnabled()) { + sampleSpecialStats(true); // fixes bug 42527 + } + } + } + catch (InterruptedException ex) { + // Silently exit + } + catch (CancelException ex) { + // Silently exit + } + catch (RuntimeException ex) { + logger.fatal(LogMarker.STATISTICS, ex.getMessage(), ex); + throw ex; + } + catch (VirtualMachineError err) { + SystemFailure.initiateFailure(err); + // If this ever returns, rethrow the error. We're poisoned + // now, so don't let this thread continue. + throw err; + } + catch (Error ex) { + // Whenever you catch Error or Throwable, you must also + // catch VirtualMachineError (see above). However, there is + // _still_ a possibility that you are dealing with a cascading + // error condition, so you also need to check to see if the JVM + // is still usable: + SystemFailure.checkFailure(); + logger.fatal(LogMarker.STATISTICS, ex.getMessage(), ex); + throw ex; + } finally { + try { + closeSpecialStats(); + if (this.sampleCollector != null) { + this.sampleCollector.close(); + } + } finally { + if (!latchCountedDown) { + // Make sure the latch gets counted down since + // other threads wait for this to indicate that + // the sampler is initialized. + this.statSamplerInitializedLatch.countDown(); + } + } + if (isDebugEnabled_STATISTICS) { + logger.trace(LogMarker.STATISTICS, "HostStatSampler stopped"); + } + } + } + + /** + * Starts the main thread for this service. + * @throws IllegalStateException if an instance of the {@link #statThread} is still running from a previous DistributedSystem. + */ + public final void start() { + synchronized(HostStatSampler.class) { + if (statThread != null) { + try { + int msToWait = getSampleRate() + 100; + statThread.join(msToWait); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + if (statThread.isAlive()) { + throw new IllegalStateException(LocalizedStrings.HostStatSampler_STATISTICS_SAMPLING_THREAD_IS_ALREADY_RUNNING_INDICATING_AN_INCOMPLETE_SHUTDOWN_OF_A_PREVIOUS_CACHE.toLocalizedString()); + } + } + ThreadGroup group = + LoggingThreadGroup.createThreadGroup("StatSampler Threads"); + + this.callbackSampler.start(getStatisticsManager(), group, getSampleRate(), TimeUnit.MILLISECONDS); + statThread = new Thread(group, this); + statThread.setName(statThread.getName() + " StatSampler"); + statThread.setPriority(Thread.MAX_PRIORITY); + statThread.setDaemon(true); + statThread.start(); + // fix #46310 (race between management and sampler init) by waiting for init here + try { + waitForInitialization(INITIALIZATION_TIMEOUT_MILLIS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Tell this service's main thread to terminate. + */ + public final void stop() { + stop(true); + } + private final void stop(boolean interruptIfAlive) { + synchronized (HostStatSampler.class) { + this.callbackSampler.stop(); + if ( statThread == null) { + return; + } + + this.stopRequested = true; + synchronized (this) { + this.notifyAll(); + } + try { + statThread.join(5000); + } catch (InterruptedException ignore) { + //It is important that we shutdown so we'll continue trying for another 2 seconds + try { + statThread.join(2000); + } catch (InterruptedException ex) { + } finally { + Thread.currentThread().interrupt(); + } + } finally { + if ( statThread.isAlive() ) { + if (interruptIfAlive) { + // It is still alive so interrupt the thread + statThread.interrupt(); + stop(false); + } else { + logger.warn(LogMarker.STATISTICS, LocalizedMessage.create(LocalizedStrings.HostStatSampler_HOSTSTATSAMPLER_THREAD_COULD_NOT_BE_STOPPED)); + } + } else { + this.stopRequested = false; + statThread = null; + } + } + } + } + + public final boolean isAlive() { + synchronized (HostStatSampler.class) { + return statThread != null && statThread.isAlive(); + } + } + + /** + * Waits for the special statistics to be initialized. For tests, please + * use {@link #waitForInitialization(long)} instead. + * + * @see #initSpecialStats + * @since GemFire 3.5 + */ + public final void waitForInitialization() throws InterruptedException { + this.statSamplerInitializedLatch.await(); + } + + /** + * Waits for the special statistics to be initialized. This overridden + * version of {@link #waitForInitialization()} should always be used + * within tests. + * + * @see #initSpecialStats + * @since GemFire 7.0 + */ + public final boolean waitForInitialization(long ms) throws InterruptedException { + return this.statSamplerInitializedLatch.await(ms); + } + + public final void changeArchive(File newFile) { + this.sampleCollector.changeArchive(newFile, NanoTimer.getTime()); + } + + /** + * Returns the <code>VMStatsContract</code> for this VM. + * + * @since GemFire 3.5 + */ + public final VMStatsContract getVMStats() { + return this.vmStats; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(getClass().getName()); + sb.append("@").append(System.identityHashCode(this)); + return sb.toString(); + } + + protected abstract void checkListeners(); + + /** + * Gets the sample rate in milliseconds + */ + protected abstract int getSampleRate(); + + /** + * Returns true if sampling is enabled. + */ + public abstract boolean isSamplingEnabled(); + + /** + * Returns the statistics manager using this sampler. + */ + protected abstract StatisticsManager getStatisticsManager(); + + protected OsStatisticsFactory getOsStatisticsFactory() { + return null; + } + + protected void initProcessStats(long id) { + // do nothing by default + } + + protected void sampleProcessStats(boolean prepareOnly) { + // do nothing by default + } + + protected void closeProcessStats() { + // do nothing by default + } + + protected long getSpecialStatsId() { + return getStatisticsManager().getId(); + } + + protected final boolean fileSizeLimitInKB() { + return this.fileSizeLimitInKB; + } + + protected final boolean osStatsDisabled() { + return this.osStatsDisabled; + } + + protected final boolean stopRequested() { + return stopper.isCancelInProgress() || this.stopRequested; + } + + public final SampleCollector getSampleCollector() { + return this.sampleCollector; + } + + /** + * Initialize any special sampler stats + */ + private synchronized void initSpecialStats() { + // add a vm resource + long id = getSpecialStatsId(); + this.vmStats = VMStatsContractFactory.create(getStatisticsManager(), id); + initProcessStats(id); + } + + /** + * Closes down anything initialied by initSpecialStats. + */ + private synchronized void closeSpecialStats() { + if (this.vmStats != null) { + this.vmStats.close(); + } + closeProcessStats(); + } + + /** + * Called when this sampler has spent some time working and wants + * it to be accounted for. + */ + private void accountForTimeSpentWorking(long nanosSpentWorking, + long nanosSpentSleeping) { + this.samplerStats.tookSample( + nanosSpentWorking, getStatisticsManager().getStatisticsCount(), nanosSpentSleeping); + } + + /** + * @param timer a NanoTimer used to compute the elapsed delay + * @param nanosToDelay the timestamp to delay until it is the current time + */ + private void delay(NanoTimer timer, final long nanosToDelay) throws InterruptedException { + timer.reset(); + long now = timer.getLastResetTime(); + long remainingNanos = nanosToDelay - now; + if (remainingNanos <= 0) { + remainingNanos = NanoTimer.millisToNanos(MIN_MS_SLEEP); + } + while (remainingNanos > 0 && !stopRequested()) { + long ms = NanoTimer.nanosToMillis(remainingNanos); + if (ms <= 0) { + Thread.yield(); + } else { + if (ms > MIN_MS_SLEEP) { + ms -= MIN_MS_SLEEP; + } + synchronized (this) { + if (stopRequested()) { + // check stopRequested inside the sync to prevent a race in which the wait misses the stopper's notify. + return; + } + this.wait(ms); // spurious wakeup ok + } + } + timer.reset(); + now = timer.getLastResetTime(); + remainingNanos = nanosToDelay - now; + } + } + + private long getNanoRate() { + return NanoTimer.millisToNanos(getSampleRate()); + } + + /** + * Collect samples of any operating system statistics + * + * @param prepareOnly + * set to true if you only want to call prepareForSample + */ + private void sampleSpecialStats(boolean prepareOnly) { + List<Statistics> statsList = getStatisticsManager().getStatsList(); + for (Statistics s : statsList) { + if (stopRequested()) return; + if (s instanceof StatisticsImpl) { + ((StatisticsImpl)s).prepareForSample(); + } + } + + if (!prepareOnly && this.vmStats != null) { + if (stopRequested()) return; + this.vmStats.refresh(); + } + sampleProcessStats(prepareOnly); + } + + /** + * Check the elapsed sleep time upon wakeup, and log a warning if it is longer than the delay + * threshold. + * + * @param elapsedSleepTime duration of sleep in nanoseconds + */ + private void checkElapsedSleepTime(long elapsedSleepTime) { + if (STAT_SAMPLER_DELAY_THRESHOLD > 0) { + final long wakeupDelay = elapsedSleepTime - getNanoRate(); + if (wakeupDelay > STAT_SAMPLER_DELAY_THRESHOLD_NANOS) { + this.samplerStats.incJvmPauses(); + logger.warn(LogMarker.STATISTICS, LocalizedMessage.create(LocalizedStrings.HostStatSampler_STATISTICS_SAMPLING_THREAD_DETECTED_A_WAKEUP_DELAY_OF_0_MS_INDICATING_A_POSSIBLE_RESOURCE_ISSUE, NanoTimer.nanosToMillis(wakeupDelay))); + } + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/StatArchiveWriter.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/StatArchiveWriter.java index 0000000,027bde1..ef3a91a mode 000000,100644..100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/StatArchiveWriter.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/statistics/StatArchiveWriter.java @@@ -1,0 -1,732 +1,737 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.gemstone.gemfire.internal.statistics; + + import com.gemstone.gemfire.GemFireIOException; + import com.gemstone.gemfire.InternalGemFireException; + import com.gemstone.gemfire.StatisticDescriptor; + import com.gemstone.gemfire.distributed.internal.DistributionConfig; + import com.gemstone.gemfire.internal.NanoTimer; + import com.gemstone.gemfire.internal.SocketCreator; + import com.gemstone.gemfire.internal.i18n.LocalizedStrings; + import com.gemstone.gemfire.internal.logging.LogService; + import com.gemstone.gemfire.internal.logging.log4j.LogMarker; ++import com.gemstone.gemfire.internal.net.SocketCreator; ++import com.gemstone.gemfire.internal.statistics.ResourceInstance; ++import com.gemstone.gemfire.internal.statistics.ResourceType; ++import com.gemstone.gemfire.internal.statistics.SampleHandler; ++import com.gemstone.gemfire.internal.statistics.StatArchiveDescriptor; + + import org.apache.logging.log4j.Logger; + + import java.io.*; + import java.net.UnknownHostException; + import java.util.*; + import java.util.zip.GZIPOutputStream; + + /** + * StatArchiveWriter provides APIs to write statistic snapshots to an archive + * file. + * + */ + public class StatArchiveWriter implements StatArchiveFormat, SampleHandler { + + private static final Logger logger = LogService.getLogger(); + + private static volatile String traceStatisticsName = null; + private static volatile String traceStatisticsTypeName = null; + private static volatile int traceResourceInstId = -1; + + private final boolean trace = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "stats.debug.traceStatArchiveWriter"); + + private final Set<ResourceInstance> sampleWrittenForResources = + new HashSet<ResourceInstance>(); + private final Set<ResourceInstance> addedResources = + new HashSet<ResourceInstance>(); + private final StatArchiveDescriptor archiveDescriptor; + private long initialDate; + private final OutputStream outStream; + private final MyDataOutputStream dataOut; + private final OutputStream traceOutStream; + private final PrintStream traceDataOut; + private long previousMillisTimeStamp; + private int sampleCount; + + /** + * Opens a StatArchiveWriter that will archive to the specified file. + * @throws GemFireIOException if <code>archiveName</code> can not be written to + */ + public StatArchiveWriter(StatArchiveDescriptor archiveDescriptor) { + this.archiveDescriptor = archiveDescriptor; + + if (archiveDescriptor.getArchiveName().endsWith(".gz")) { + try { + this.outStream = new GZIPOutputStream(new FileOutputStream(archiveDescriptor.getArchiveName()), 32768); + } catch (IOException ex) { + throw new GemFireIOException(LocalizedStrings.StatArchiveWriter_COULD_NOT_OPEN_0.toLocalizedString(archiveDescriptor.getArchiveName()), ex); + } + } else { + try { + this.outStream = new BufferedOutputStream(new FileOutputStream(archiveDescriptor.getArchiveName()), 32768); + } catch (IOException ex) { + throw new GemFireIOException(LocalizedStrings.StatArchiveWriter_COULD_NOT_OPEN_0.toLocalizedString(archiveDescriptor.getArchiveName()), ex); + } + } + + this.dataOut = new MyDataOutputStream(this.outStream); + + if (this.trace) { + String traceFileName = archiveDescriptor.getArchiveName() + ".trace"; + try { + this.traceOutStream = new BufferedOutputStream(new FileOutputStream(traceFileName), 32768); + } catch (IOException ex) { + throw new GemFireIOException("Could not open " + traceFileName, ex); + } + this.traceDataOut = new PrintStream(this.traceOutStream); + } else { + this.traceOutStream = null; + this.traceDataOut = null; + } + } + + public String getArchiveName() { + return this.archiveDescriptor.getArchiveName(); + } + + public void initialize(long nanosTimeStamp) { + this.previousMillisTimeStamp = initPreviousMillisTimeStamp(nanosTimeStamp); + this.initialDate = initInitialDate(); + writeHeader(this.initialDate, this.archiveDescriptor); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(getClass().getName()); + sb.append("@").append(System.identityHashCode(this)).append("{"); + sb.append("archiveName=").append(this.archiveDescriptor.getArchiveName()); + sb.append("productDescription=").append(this.archiveDescriptor.getProductDescription()); + sb.append("systemDirectoryPath=").append(this.archiveDescriptor.getSystemDirectoryPath()); + sb.append("systemId=").append(this.archiveDescriptor.getSystemId()); + sb.append("systemStartTime=").append(this.archiveDescriptor.getSystemStartTime()); + sb.append("previousMillisTimeStamp=").append(this.previousMillisTimeStamp); + sb.append("initialDate=").append(this.initialDate); + return sb.toString(); + } + + /** + * Closes the statArchiver by flushing its data to disk a closing its output stream. + * @throws GemFireIOException if the archive file could not be closed. + */ + public final void close() { + try { + this.dataOut.flush(); + if (this.trace) { + this.traceDataOut.flush(); + } + } catch (IOException ignore) { + } + try { + outStream.close(); + if (this.trace) { + this.traceOutStream.close(); + } + } catch (IOException ex) { + throw new GemFireIOException(LocalizedStrings.StatArchiveWriter_COULD_NOT_CLOSE_STATARCHIVER_FILE.toLocalizedString(), ex); + } + if (getSampleCount() == 0) { + // If we are closing an empty file go ahead and delete it. + // This prevents the fix for 46917 from leaving a bunch of + // empty gfs files around. + deleteFileIfPossible(new File(getArchiveName())); + } + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", justification="Best effort attempt to delete a GFS file without any samples.") + private static void deleteFileIfPossible(File file) { + file.delete(); + } + + /** + * Returns the number of bytes written so far to this archive. + * This does not take compression into account. + */ + public final long bytesWritten() { + return this.dataOut.getBytesWritten(); + } + + protected long initPreviousMillisTimeStamp(long nanosTimeStamp) { + return NanoTimer.nanosToMillis(nanosTimeStamp); + } + + protected long initInitialDate() { + return System.currentTimeMillis(); + } + + protected TimeZone getTimeZone() { + return Calendar.getInstance().getTimeZone(); + } + + protected String getOSInfo() { + return System.getProperty("os.name") + " " + System.getProperty("os.version"); + } + + protected String getMachineInfo() { + String machineInfo = System.getProperty("os.arch"); + try { + String hostName = SocketCreator.getHostName(SocketCreator.getLocalHost()); + machineInfo += " " + hostName; + } catch (UnknownHostException ignore) { + } + return machineInfo; + } + + private void writeHeader(long initialDate, StatArchiveDescriptor archiveDescriptor) { + if (logger.isTraceEnabled(LogMarker.STATISTICS)) { + logger.trace(LogMarker.STATISTICS, "StatArchiveWriter#writeHeader initialDate={} archiveDescriptor={}", initialDate, archiveDescriptor); + } + try { + this.dataOut.writeByte(HEADER_TOKEN); + this.dataOut.writeByte(ARCHIVE_VERSION); + this.dataOut.writeLong(initialDate); + this.dataOut.writeLong(archiveDescriptor.getSystemId()); + this.dataOut.writeLong(archiveDescriptor.getSystemStartTime()); + TimeZone timeZone = getTimeZone(); + this.dataOut.writeInt(timeZone.getRawOffset()); + this.dataOut.writeUTF(timeZone.getID()); + this.dataOut.writeUTF(archiveDescriptor.getSystemDirectoryPath()); + this.dataOut.writeUTF(archiveDescriptor.getProductDescription()); + this.dataOut.writeUTF(getOSInfo()); + this.dataOut.writeUTF(getMachineInfo()); + + if (this.trace) { + this.traceDataOut.println("writeHeader traceStatisticsName: " + traceStatisticsName); + this.traceDataOut.println("writeHeader traceStatisticsTypeName: " + traceStatisticsTypeName); + this.traceDataOut.println("writeHeader#writeByte HEADER_TOKEN: " + HEADER_TOKEN); + this.traceDataOut.println("writeHeader#writeByte ARCHIVE_VERSION: " + ARCHIVE_VERSION); + this.traceDataOut.println("writeHeader#writeLong initialDate: " + initialDate); + this.traceDataOut.println("writeHeader#writeLong archiveDescriptor.getSystemId(): " + archiveDescriptor.getSystemId()); + this.traceDataOut.println("writeHeader#writeLong archiveDescriptor.getSystemStartTime(): " + archiveDescriptor.getSystemStartTime()); + this.traceDataOut.println("writeHeader#writeInt timeZone.getRawOffset(): " + timeZone.getRawOffset()); + this.traceDataOut.println("writeHeader#writeUTF timeZone.getID(): " + timeZone.getID()); + this.traceDataOut.println("writeHeader#writeUTF archiveDescriptor.getSystemDirectoryPath(): " + archiveDescriptor.getSystemDirectoryPath()); + this.traceDataOut.println("writeHeader#writeUTF archiveDescriptor.getProductDescription(): " + archiveDescriptor.getProductDescription()); + this.traceDataOut.println("writeHeader#writeUTF getOSInfo(): " + getOSInfo()); + this.traceDataOut.println("writeHeader#writeUTF getMachineInfo(): " + getMachineInfo()); + } + } catch (IOException ex) { + throw new GemFireIOException(LocalizedStrings.StatArchiveWriter_FAILED_WRITING_HEADER_TO_STATISTIC_ARCHIVE.toLocalizedString(), ex); + } + } + + public void allocatedResourceType(ResourceType resourceType) { + if (logger.isTraceEnabled(LogMarker.STATISTICS)) { + logger.trace(LogMarker.STATISTICS, "StatArchiveWriter#allocatedResourceType resourceType={}", resourceType); + } + if (resourceType.getStatisticDescriptors().length >= ILLEGAL_STAT_OFFSET) { + throw new InternalGemFireException(LocalizedStrings.StatArchiveWriter_COULD_NOT_ARCHIVE_TYPE_0_BECAUSE_IT_HAD_MORE_THAN_1_STATISTICS.toLocalizedString(new Object[] {resourceType.getStatisticsType().getName(), Integer.valueOf(ILLEGAL_STAT_OFFSET-1)})); + } + // write the type to the archive + try { + this.dataOut.writeByte(RESOURCE_TYPE_TOKEN); + this.dataOut.writeInt(resourceType.getId()); + this.dataOut.writeUTF(resourceType.getStatisticsType().getName()); + this.dataOut.writeUTF(resourceType.getStatisticsType().getDescription()); + StatisticDescriptor[] stats = resourceType.getStatisticDescriptors(); + this.dataOut.writeShort(stats.length); + if (this.trace && (traceStatisticsTypeName == null || traceStatisticsTypeName.equals(resourceType.getStatisticsType().getName()))) { + this.traceDataOut.println("allocatedResourceType#writeByte RESOURCE_TYPE_TOKEN: " + RESOURCE_TYPE_TOKEN); + this.traceDataOut.println("allocatedResourceType#writeInt resourceType.getId(): " + resourceType.getId()); + this.traceDataOut.println("allocatedResourceType#writeUTF resourceType.getStatisticsType().getName(): " + resourceType.getStatisticsType().getName()); + this.traceDataOut.println("allocatedResourceType#writeUTF resourceType.getStatisticsType().getDescription(): " + resourceType.getStatisticsType().getDescription()); + this.traceDataOut.println("allocatedResourceType#writeShort stats.length: " + stats.length); + } + for (int i=0; i < stats.length; i++) { + this.dataOut.writeUTF(stats[i].getName()); + this.dataOut.writeByte(((StatisticDescriptorImpl)stats[i]).getTypeCode()); + this.dataOut.writeBoolean(stats[i].isCounter()); + this.dataOut.writeBoolean(stats[i].isLargerBetter()); + this.dataOut.writeUTF(stats[i].getUnit()); + this.dataOut.writeUTF(stats[i].getDescription()); + if (this.trace && (traceStatisticsTypeName == null || traceStatisticsTypeName.equals(resourceType.getStatisticsType().getName()))) { + this.traceDataOut.println("allocatedResourceType#writeUTF stats[i].getName(): " + stats[i].getName()); + this.traceDataOut.println("allocatedResourceType#writeByte ((StatisticDescriptorImpl)stats[i]).getTypeCode(): " + ((StatisticDescriptorImpl)stats[i]).getTypeCode()); + this.traceDataOut.println("allocatedResourceType#writeBoolean stats[i].isCounter(): " + stats[i].isCounter()); + this.traceDataOut.println("allocatedResourceType#writeBoolean stats[i].isLargerBetter(): " + stats[i].isLargerBetter()); + this.traceDataOut.println("allocatedResourceType#writeUTF stats[i].getUnit(): " + stats[i].getUnit()); + this.traceDataOut.println("allocatedResourceType#writeUTF stats[i].getDescription(): " + stats[i].getDescription()); + } + } + } catch (IOException ex) { + throw new GemFireIOException(LocalizedStrings.StatArchiveWriter_FAILED_WRITING_NEW_RESOURCE_TYPE_TO_STATISTIC_ARCHIVE.toLocalizedString(), ex); + } + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", justification="This is only for debugging and there is never more than one instance being traced because there is only one stat sampler.") + public void allocatedResourceInstance(ResourceInstance statResource) { + if (logger.isTraceEnabled(LogMarker.STATISTICS)) { + logger.trace(LogMarker.STATISTICS, "StatArchiveWriter#allocatedResourceInstance statResource={}", statResource); + } + if (statResource.getResourceType().getStatisticDescriptors().length >= ILLEGAL_STAT_OFFSET) { + throw new InternalGemFireException(LocalizedStrings.StatArchiveWriter_COULD_NOT_ARCHIVE_TYPE_0_BECAUSE_IT_HAD_MORE_THAN_1_STATISTICS.toLocalizedString(new Object[] {statResource.getResourceType().getStatisticsType().getName(), Integer.valueOf(ILLEGAL_STAT_OFFSET-1)})); + } + if (statResource.getStatistics().isClosed()) { + return; + } + this.addedResources.add(statResource); + try { + this.dataOut.writeByte(RESOURCE_INSTANCE_CREATE_TOKEN); + this.dataOut.writeInt(statResource.getId()); + this.dataOut.writeUTF(statResource.getStatistics().getTextId()); + this.dataOut.writeLong(statResource.getStatistics().getNumericId()); + this.dataOut.writeInt(statResource.getResourceType().getId()); + if (this.trace && (traceStatisticsName == null || traceStatisticsName.equals(statResource.getStatistics().getTextId())) && (traceStatisticsTypeName == null || traceStatisticsTypeName.equals(statResource.getResourceType().getStatisticsType().getName()))) { + traceResourceInstId = statResource.getId(); + this.traceDataOut.println("writeHeader traceResourceInstId: " + traceResourceInstId); + this.traceDataOut.println("allocatedResourceInstance#writeByte RESOURCE_INSTANCE_CREATE_TOKEN: " + RESOURCE_INSTANCE_CREATE_TOKEN); + this.traceDataOut.println("allocatedResourceInstance#writeInt statResource.getId(): " + statResource.getId()); + this.traceDataOut.println("allocatedResourceInstance#writeUTF statResource.getStatistics().getTextId(): " + statResource.getStatistics().getTextId()); + this.traceDataOut.println("allocatedResourceInstance#writeLong statResource.getStatistics().getNumericId(): " + statResource.getStatistics().getNumericId()); + this.traceDataOut.println("allocatedResourceInstance#writeInt statResource.getResourceType().getId(): " + statResource.getResourceType().getId()); + } + } catch (IOException ex) { + throw new GemFireIOException(LocalizedStrings.StatArchiveWriter_FAILED_WRITING_NEW_RESOURCE_INSTANCE_TO_STATISTIC_ARCHIVE.toLocalizedString(), ex); + } + } + + public void destroyedResourceInstance(ResourceInstance resourceInstance) { + if (logger.isTraceEnabled(LogMarker.STATISTICS)) { + logger.trace(LogMarker.STATISTICS, "StatArchiveWriter#destroyedResourceInstance resourceInstance={}", resourceInstance); + } + if (!this.addedResources.contains(resourceInstance)) { // Fix for bug #45377 + return; + } + + this.sampleWrittenForResources.remove(resourceInstance); + this.addedResources.remove(resourceInstance); + + try { + this.dataOut.writeByte(RESOURCE_INSTANCE_DELETE_TOKEN); + this.dataOut.writeInt(resourceInstance.getId()); + if (this.trace && (traceStatisticsName == null || traceStatisticsName.equals(resourceInstance.getStatistics().getTextId())) && (traceStatisticsTypeName == null || traceStatisticsTypeName.equals(resourceInstance.getResourceType().getStatisticsType().getName()))) { + this.traceDataOut.println("destroyedResourceInstance#writeByte RESOURCE_INSTANCE_DELETE_TOKEN: " + RESOURCE_INSTANCE_DELETE_TOKEN); + this.traceDataOut.println("destroyedResourceInstance#writeInt resourceInstance.getId(): " + resourceInstance.getId()); + } + } catch (IOException ex) { + throw new GemFireIOException(LocalizedStrings.StatArchiveWriter_FAILED_WRITING_DELETE_RESOURCE_INSTANCE_TO_STATISTIC_ARCHIVE.toLocalizedString(), ex); + } + } + + static long calcDelta(long previousMillis, long currentMillis) { + long delta = currentMillis - previousMillis; + if (delta <= 0) { + throw new IllegalArgumentException("Sample timestamp must be greater than previous timestamp (millisTimeStamp is " + currentMillis + ", previousMillis is " + previousMillis + " and delta is " + delta + ")."); + } + return delta; + } + + private void writeTimeStamp(long nanosTimeStamp) throws IOException { + final long millisTimeStamp = NanoTimer.nanosToMillis(nanosTimeStamp); + final long delta = calcDelta(this.previousMillisTimeStamp, millisTimeStamp); + if (logger.isTraceEnabled(LogMarker.STATISTICS)) { + logger.trace(LogMarker.STATISTICS, "StatArchiveWriter#writeTimeStamp millisTimeStamp={}, delta={}", millisTimeStamp, (int)delta); + } + if (delta > MAX_SHORT_TIMESTAMP) { + if (delta > Integer.MAX_VALUE) { + throw new InternalGemFireException(LocalizedStrings.StatArchiveWriter_TIMESTAMP_DELTA_0_WAS_GREATER_THAN_1.toLocalizedString(new Object[] {Long.valueOf(delta), Integer.valueOf(Integer.MAX_VALUE)})); + } + this.dataOut.writeShort(INT_TIMESTAMP_TOKEN); + this.dataOut.writeInt((int)delta); + if (this.trace) { + this.traceDataOut.println("writeTimeStamp#writeShort INT_TIMESTAMP_TOKEN: " + INT_TIMESTAMP_TOKEN); + this.traceDataOut.println("writeTimeStamp#writeInt (int)delta: " + (int)delta); + } + } else { + this.dataOut.writeShort((int)delta); + if (this.trace) { + this.traceDataOut.println("writeTimeStamp#writeShort (int)delta: " + (int)delta); + } + } + this.previousMillisTimeStamp = millisTimeStamp; + } + + /** + * Writes the resource instance id to the <code>dataOut</code> stream. + */ + private void writeResourceInst(int instId) throws IOException { + if (logger.isTraceEnabled(LogMarker.STATISTICS)) { + logger.trace(LogMarker.STATISTICS, "StatArchiveWriter#writeResourceInst instId={}", instId); + } + if (instId > MAX_BYTE_RESOURCE_INST_ID) { + if (instId > MAX_SHORT_RESOURCE_INST_ID) { + this.dataOut.writeByte(INT_RESOURCE_INST_ID_TOKEN); + this.dataOut.writeInt(instId); + if (this.trace && (traceResourceInstId == -1 || traceResourceInstId == instId)) { + this.traceDataOut.println("writeResourceInst#writeByte INT_RESOURCE_INST_ID_TOKEN: " + INT_RESOURCE_INST_ID_TOKEN); + if (instId == ILLEGAL_RESOURCE_INST_ID) { + this.traceDataOut.println("writeResourceInst#writeInt ILLEGAL_RESOURCE_INST_ID: " + ILLEGAL_RESOURCE_INST_ID); + } else { + this.traceDataOut.println("writeResourceInst#writeInt instId: " + instId); + } + } + } else { + this.dataOut.writeByte(SHORT_RESOURCE_INST_ID_TOKEN); + this.dataOut.writeShort(instId); + if (this.trace && (traceResourceInstId == -1 || traceResourceInstId == instId)) { + this.traceDataOut.println("writeResourceInst#writeByte SHORT_RESOURCE_INST_ID_TOKEN: " + SHORT_RESOURCE_INST_ID_TOKEN); + if (instId == ILLEGAL_RESOURCE_INST_ID) { + this.traceDataOut.println("writeResourceInst#writeShort ILLEGAL_RESOURCE_INST_ID: " + ILLEGAL_RESOURCE_INST_ID); + } else { + this.traceDataOut.println("writeResourceInst#writeShort instId: " + instId); + } + } + } + } else { + this.dataOut.writeByte(instId); + if (this.trace && (traceResourceInstId == -1 || traceResourceInstId == instId)) { + if (instId == ILLEGAL_RESOURCE_INST_ID) { + this.traceDataOut.println("writeResourceInst#writeByte ILLEGAL_RESOURCE_INST_ID: " + ILLEGAL_RESOURCE_INST_ID); + } else { + this.traceDataOut.println("writeResourceInst#writeByte instId: " + instId); + } + } + } + } + + public void sampled(long nanosTimeStamp, List<ResourceInstance> resourceInstances) { + if (logger.isTraceEnabled(LogMarker.STATISTICS)) { + logger.trace(LogMarker.STATISTICS, "StatArchiveWriter#sampled nanosTimeStamp={}, resourceInstances={}", nanosTimeStamp, resourceInstances); + } + try { + this.dataOut.writeByte(SAMPLE_TOKEN); + if (this.trace) { + this.traceDataOut.println("sampled#writeByte SAMPLE_TOKEN: " + SAMPLE_TOKEN); + } + writeTimeStamp(nanosTimeStamp); + for (ResourceInstance ri : resourceInstances) { + writeSample(ri); + } + writeResourceInst(ILLEGAL_RESOURCE_INST_ID); + this.dataOut.flush(); + if (this.trace) { + this.traceDataOut.flush(); + } + } catch (IOException ex) { + throw new GemFireIOException(LocalizedStrings.StatArchiveWriter_FAILED_WRITING_SAMPLE_TO_STATISTIC_ARCHIVE.toLocalizedString(), ex); + } + this.sampleCount++; // only inc after sample done w/o an exception thrown + } + + public int getSampleCount() { + return this.sampleCount; + } + + private void writeSample(ResourceInstance ri) throws IOException { + final boolean isDebugEnabled_STATISTICS = logger.isTraceEnabled(LogMarker.STATISTICS); + if (isDebugEnabled_STATISTICS) { + logger.trace(LogMarker.STATISTICS, "StatArchiveWriter#writeSample ri={}", ri); + } + if (this.trace && (traceStatisticsName == null || traceStatisticsName.equals(ri.getStatistics().getTextId())) && (traceStatisticsTypeName == null || traceStatisticsTypeName.equals(ri.getResourceType().getStatisticsType().getName()))) { + this.traceDataOut.println("writeSample#writeSample for ri=" + ri); + } + if (ri.getStatistics().isClosed()) { + return; + } + StatisticDescriptor[] stats = ri.getResourceType().getStatisticDescriptors(); + if (stats.length > 254) { + throw new Error("StatisticsType " + ri.getResourceType().getStatisticsType().getName() + " has too many stats: " + stats.length); + } + boolean wroteInstId = false; + boolean checkForChange = true; + + if (!this.sampleWrittenForResources.contains(ri)) { + // first time for this instance so all values need to be written + checkForChange = false; + this.sampleWrittenForResources.add(ri); + } + + long[] previousStatValues = ri.getPreviousStatValues(); + if (isDebugEnabled_STATISTICS) { + logger.trace(LogMarker.STATISTICS, "StatArchiveWriter#writeSample checkForChange={}, previousStatValues={}, stats.length={}", + checkForChange, Arrays.toString(previousStatValues), stats.length); + } + if (previousStatValues == null) { + previousStatValues = new long[stats.length]; + ri.setPreviousStatValues(previousStatValues); + } + + int statsWritten = 0; + try { + for (int i=0; i < stats.length; i++) { + long value = ri.getLatestStatValues()[i]; + if (!checkForChange || value != previousStatValues[i]) { + long delta = checkForChange ? value - previousStatValues[i] : value; + if (!wroteInstId) { + wroteInstId = true; + writeResourceInst(ri.getId()); + } + this.dataOut.writeByte(i); + if (this.trace && (traceStatisticsName == null || traceStatisticsName.equals(ri.getStatistics().getTextId())) && (traceStatisticsTypeName == null || traceStatisticsTypeName.equals(ri.getResourceType().getStatisticsType().getName()))) { + this.traceDataOut.println("writeSample#writeByte i: " + i); + } + if (isDebugEnabled_STATISTICS) { + logger.trace(LogMarker.STATISTICS, "StatArchiveWriter#writeStatValue stats[{}]={}, delta={}", i, stats[i], delta); + } + writeStatValue(stats[i], delta, this.dataOut); + if (this.trace && (traceStatisticsName == null || traceStatisticsName.equals(ri.getStatistics().getTextId())) && (traceStatisticsTypeName == null || traceStatisticsTypeName.equals(ri.getResourceType().getStatisticsType().getName()))) { + byte typeCode = ((StatisticDescriptorImpl)stats[i]).getTypeCode(); + switch(typeCode) { + case BYTE_CODE: + this.traceDataOut.println("writeStatValue#writeByte " + typeCodeToString(typeCode) + " delta: " + delta); + break; + case SHORT_CODE: + this.traceDataOut.println("writeStatValue#writeShort" + typeCodeToString(typeCode) + " delta: " + delta); + break; + case INT_CODE: + case FLOAT_CODE: + case LONG_CODE: + case DOUBLE_CODE: + this.traceDataOut.println("writeStatValue#writeCompactValue " + typeCodeToString(typeCode) + " delta: " + delta); + break; + default: + } + } + } + } + } catch (IllegalStateException closedEx) { + // resource was closed causing getStatValue to throw this exception + } + + if (wroteInstId) { + this.dataOut.writeByte(ILLEGAL_STAT_OFFSET); + if (this.trace && (traceStatisticsName == null + || traceStatisticsName.equals(ri.getStatistics().getTextId())) + && (traceStatisticsTypeName == null + || traceStatisticsTypeName.equals(ri.getResourceType().getStatisticsType().getName()))) { + this.traceDataOut.println("writeSample#writeByte ILLEGAL_STAT_OFFSET: " + ILLEGAL_STAT_OFFSET); + } + } + if (isDebugEnabled_STATISTICS) { + logger.trace(LogMarker.STATISTICS, "StatArchiveWriter#writeSample statsWritten={}", statsWritten); + } + } + + public static void writeCompactValue(long v, DataOutput dataOut) throws IOException { + if (v <= MAX_1BYTE_COMPACT_VALUE && v >= MIN_1BYTE_COMPACT_VALUE) { + dataOut.writeByte((int)v); + } else if (v <= MAX_2BYTE_COMPACT_VALUE && v >= MIN_2BYTE_COMPACT_VALUE) { + dataOut.writeByte(COMPACT_VALUE_2_TOKEN); + dataOut.writeShort((int)v); + } else { + byte[] buffer = new byte[8]; + int idx = 0; + long originalValue = v; + if (v < 0) { + while (v != -1 && v != 0) { + buffer[idx++] = (byte)(v & 0xFF); + v >>= 8; + } + // On windows v goes to zero somtimes; seems like a bug + if (v == 0) { + // when this happens we end up with a bunch of -1 bytes + // so strip off the high order ones + while (buffer[idx-1] == -1) { + idx--; + } + // System.out.print("DEBUG: originalValue=" + originalValue); + // for (int dbx=0; dbx<idx; dbx++) { + // System.out.print(" " + buffer[dbx]); + // } + // System.out.println(); + } + if ((buffer[idx-1] & 0x80) == 0) { + /* If the most significant byte does not have its high order bit set + * then add a -1 byte so we know this is a negative number + */ + buffer[idx++] = -1; + } + } else { + while (v != 0) { + buffer[idx++] = (byte)(v & 0xFF); + v >>= 8; + } + if ((buffer[idx-1] & 0x80) != 0) { + /* If the most significant byte has its high order bit set + * then add a zero byte so we know this is a positive number + */ + buffer[idx++] = 0; + } + } + if (idx <= 2) { + throw new InternalGemFireException(LocalizedStrings.StatArchiveWriter_EXPECTED_IDX_TO_BE_GREATER_THAN_2_IT_WAS_0_FOR_THE_VALUE_1.toLocalizedString(new Object[] {Integer.valueOf(idx), Long.valueOf(originalValue)})); + } + int token = COMPACT_VALUE_2_TOKEN + (idx - 2); + dataOut.writeByte(token); + for (int i=idx-1; i >= 0; i--) { + dataOut.writeByte(buffer[i]); + } + } + } + + public static long readCompactValue(DataInput dataIn) throws IOException { + long v = dataIn.readByte(); + boolean dump = false; + if (dump) { + System.out.print("compactValue(byte1)=" + v); + } + if (v < MIN_1BYTE_COMPACT_VALUE) { + if (v == COMPACT_VALUE_2_TOKEN) { + v = dataIn.readShort(); + if (dump) { + System.out.print("compactValue(short)=" + v); + } + } else { + int bytesToRead = ((byte)v - COMPACT_VALUE_2_TOKEN) + 2; + v = dataIn.readByte(); // note the first byte will be a signed byte. + if (dump) { + System.out.print("compactValue(" + bytesToRead + ")=" + v); + } + bytesToRead--; + while (bytesToRead > 0) { + v <<= 8; + v |= dataIn.readUnsignedByte(); + bytesToRead--; + } + } + } + return v; + } + + protected static void writeStatValue(StatisticDescriptor f, long v, DataOutput dataOut) throws IOException { + byte typeCode = ((StatisticDescriptorImpl)f).getTypeCode(); + writeStatValue(typeCode, v, dataOut); + } + + public static void writeStatValue(byte typeCode, long v, DataOutput dataOut) throws IOException { + switch(typeCode) { + case BYTE_CODE: + dataOut.writeByte((int)v); + break; + case SHORT_CODE: + dataOut.writeShort((int)v); + break; + case INT_CODE: + case FLOAT_CODE: + case LONG_CODE: + case DOUBLE_CODE: + writeCompactValue(v, dataOut); + break; + default: + throw new InternalGemFireException(LocalizedStrings.StatArchiveWriter_UNEXPECTED_TYPE_CODE_0.toLocalizedString(Byte.valueOf(typeCode))); + } + } + + protected static void setTraceFilter(String traceStatisticsName, String traceStatisticsTypeName) { + StatArchiveWriter.traceStatisticsName = traceStatisticsName; + StatArchiveWriter.traceStatisticsTypeName = traceStatisticsTypeName; + StatArchiveWriter.traceResourceInstId = -1; + } + + protected static void clearTraceFilter() { + StatArchiveWriter.traceStatisticsName = null; + StatArchiveWriter.traceStatisticsTypeName = null; + StatArchiveWriter.traceResourceInstId = -1; + } + + private static String typeCodeToString(byte typeCode) { + switch(typeCode) { + case BYTE_CODE: + return "BYTE_CODE"; + case SHORT_CODE: + return "SHORT_CODE"; + case INT_CODE: + return "INT_CODE"; + case FLOAT_CODE: + return "FLOAT_CODE"; + case LONG_CODE: + return "LONG_CODE"; + case DOUBLE_CODE: + return "DOUBLE_CODE"; + default: + return "unknown typeCode " + typeCode; + } + } + + private static class MyDataOutputStream implements DataOutput { + private long bytesWritten = 0; + private final DataOutputStream dataOut; + + public MyDataOutputStream(OutputStream out) { + this.dataOut = new DataOutputStream(out); + } + + public final long getBytesWritten() { + return this.bytesWritten; + } + public final void flush() throws IOException { + this.dataOut.flush(); + } + @SuppressWarnings("unused") + public final void close() throws IOException { + this.dataOut.close(); + } + + public final void write(int b) throws IOException { + throw new RuntimeException(LocalizedStrings.StatArchiveWriter_METHOD_UNIMPLEMENTED.toLocalizedString()); + } + public final void write(byte[] b, int off, int len) throws IOException { + throw new RuntimeException(LocalizedStrings.StatArchiveWriter_METHOD_UNIMPLEMENTED.toLocalizedString()); + } + public final void write(byte[] b) throws IOException { + throw new RuntimeException(LocalizedStrings.StatArchiveWriter_METHOD_UNIMPLEMENTED.toLocalizedString()); + } + public final void writeBytes(String v) throws IOException { + throw new RuntimeException(LocalizedStrings.StatArchiveWriter_METHOD_UNIMPLEMENTED.toLocalizedString()); + } + public final void writeChar(int v) throws IOException { + throw new RuntimeException(LocalizedStrings.StatArchiveWriter_METHOD_UNIMPLEMENTED.toLocalizedString()); + } + public final void writeChars(String v) throws IOException { + throw new RuntimeException(LocalizedStrings.StatArchiveWriter_METHOD_UNIMPLEMENTED.toLocalizedString()); + } + public final void writeDouble(double v) throws IOException { + throw new RuntimeException(LocalizedStrings.StatArchiveWriter_METHOD_UNIMPLEMENTED.toLocalizedString()); + } + public final void writeFloat(float v) throws IOException { + throw new RuntimeException(LocalizedStrings.StatArchiveWriter_METHOD_UNIMPLEMENTED.toLocalizedString()); + } + + public final void writeBoolean(boolean v) throws IOException { + this.dataOut.writeBoolean(v); + this.bytesWritten += 1; + } + public final void writeByte(int v) throws IOException { + this.dataOut.writeByte(v); + this.bytesWritten += 1; + } + public final void writeShort(int v) throws IOException { + this.dataOut.writeShort(v); + this.bytesWritten += 2; + } + public final void writeInt(int v) throws IOException { + this.dataOut.writeInt(v); + this.bytesWritten += 4; + } + public final void writeLong(long v) throws IOException { + this.dataOut.writeLong(v); + this.bytesWritten += 8; + } + public final void writeUTF(String v) throws IOException { + this.dataOut.writeUTF(v); + this.bytesWritten += v.length() + 2; // this is the minimum. The max is v.size()*3 +2 + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java index d5e2c16,a38244c..800a203 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java @@@ -1199,9 -1166,9 +1199,9 @@@ public class TCPConduit implements Runn * Returns true if member is part of view, false if membership is not confirmed before timeout. */ public boolean waitForMembershipCheck(InternalDistributedMember remoteId) { - return membershipManager.waitForMembershipCheck(remoteId); + return membershipManager.waitForNewMember(remoteId); } - + /** * simulate being sick */ http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java index a3a65b0,f3c771f..d4baf0e --- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java @@@ -57,10 -57,8 +57,11 @@@ import com.gemstone.gemfire.internal.Ge import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.lang.StringUtils; import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.net.SSLConfigurationFactory; +import com.gemstone.gemfire.internal.net.SSLEnabledComponent; +import com.gemstone.gemfire.internal.net.SocketCreator; +import com.gemstone.gemfire.internal.net.SocketCreatorFactory; + import com.gemstone.gemfire.internal.security.GeodeSecurityUtil; import com.gemstone.gemfire.internal.security.shiro.JMXShiroAuthenticator; import com.gemstone.gemfire.internal.tcp.TCPConduit; import com.gemstone.gemfire.management.ManagementException; @@@ -92,9 -90,10 +93,10 @@@ public class ManagementAgent */ private boolean running = false; private Registry registry; - private JMXConnectorServer cs; + private JMXConnectorServer jmxConnectorServer; private JMXShiroAuthenticator shiroAuthenticator; private final DistributionConfig config; + // TODO: add this -- private boolean isSecured; private boolean isHttpServiceRunning = false; /** @@@ -438,11 -453,10 +440,10 @@@ } }; - String shiroConfig = this.config.getShiroInit(); - if (!StringUtils.isBlank(shiroConfig) || isIntegratedSecurity()) { + if (isIntegratedSecurity()) { shiroAuthenticator = new JMXShiroAuthenticator(); env.put(JMXConnectorServer.AUTHENTICATOR, shiroAuthenticator); - cs.addNotificationListener(shiroAuthenticator, null, cs.getAttributes()); + jmxConnectorServer.addNotificationListener(shiroAuthenticator, null, jmxConnectorServer.getAttributes()); // always going to assume authorization is needed as well, if no custom AccessControl, then the CustomAuthRealm // should take care of that MBeanServerWrapper mBeanServerWrapper = new MBeanServerWrapper(); @@@ -498,15 -514,14 +499,14 @@@ private boolean isIntegratedSecurity() { - String factoryName = config.getSecurityManager(); - return factoryName != null && !factoryName.isEmpty(); + return GeodeSecurityUtil.isJmxSecurityRequired(); } - private static class GemFireRMIClientSocketFactory implements RMIClientSocketFactory, - Serializable { + private static class GemFireRMIClientSocketFactory implements RMIClientSocketFactory, Serializable { + private static final long serialVersionUID = -7604285019188827617L; - private/* final hack to prevent serialization */transient SocketCreator sc; + private/* final hack to prevent serialization */ transient SocketCreator sc; public GemFireRMIClientSocketFactory(SocketCreator sc) { this.sc = sc; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java index 75d4d56,e835bab..110d447 mode 100755,100644..100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceImplJUnitTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dfbc88b2/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionConfigJUnitTest.java ---------------------------------------------------------------------- diff --cc geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionConfigJUnitTest.java index a843457,6d6f36d..47cc08c --- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionConfigJUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionConfigJUnitTest.java @@@ -78,7 -81,7 +81,7 @@@ public class DistributionConfigJUnitTes @Test public void testGetAttributeNames() { String[] attNames = AbstractDistributionConfig._getAttNames(); - assertEquals(attNames.length, 155); - assertEquals(attNames.length, 144); ++ assertEquals(attNames.length, 156); List boolList = new ArrayList(); List intList = new ArrayList(); @@@ -353,36 -354,76 +357,106 @@@ props.put("security-username", "testName"); DistributionConfig config = new DistributionConfigImpl(props); - assertEquals(config.getSecurityProps().size(), 4); + // SECURITY_ENABLED_COMPONENTS is automatically added to getSecurityProps + assertEquals(config.getSecurityProps().size(), 5); + } + + @Test + public void securityEnabledComponentsDefaultShouldBeAll() throws Exception { + Properties props = new Properties(); + props.put(SECURITY_MANAGER, SampleSecurityManager.class.getName()); + + DistributionConfig config = new DistributionConfigImpl(props); + + assertThat(config.getSecurityEnabledComponents()).contains(SecurableComponents.ALL); + } + + @Test + public void oneSecurityEnabledComponent() throws Exception { + Properties props = new Properties(); + props.put(SECURITY_MANAGER, SampleSecurityManager.class.getName()); + props.put(SECURITY_ENABLED_COMPONENTS, SecurableComponents.JMX); + + DistributionConfig config = new DistributionConfigImpl(props); + + assertThat(config.getSecurityEnabledComponents()) + .doesNotContain(SecurableComponents.ALL) + .doesNotContain(SecurableComponents.GATEWAY) + .doesNotContain(SecurableComponents.SERVER) + .doesNotContain(SecurableComponents.HTTP_SERVICE) + .doesNotContain(SecurableComponents.CLUSTER) + .contains(SecurableComponents.JMX); + } + + @Test + public void twoSecurityEnabledComponents() throws Exception { + Properties props = new Properties(); + props.put(SECURITY_MANAGER, SampleSecurityManager.class.getName()); + props.put(SECURITY_ENABLED_COMPONENTS, SecurableComponents.JMX + "," + SecurableComponents.CLUSTER); + + DistributionConfig config = new DistributionConfigImpl(props); + + assertThat(config.getSecurityEnabledComponents()) + .doesNotContain(SecurableComponents.ALL) + .doesNotContain(SecurableComponents.GATEWAY) + .doesNotContain(SecurableComponents.SERVER) + .doesNotContain(SecurableComponents.HTTP_SERVICE) + .contains(SecurableComponents.CLUSTER) + .contains(SecurableComponents.JMX); + } + + @Test + public void multipleSecurityEnabledComponents() throws Exception { + Properties props = new Properties(); + props.put(SECURITY_MANAGER, SampleSecurityManager.class.getName()); + props.put(SECURITY_ENABLED_COMPONENTS, SecurableComponents.JMX + "," + SecurableComponents.CLUSTER+ "," + SecurableComponents.HTTP_SERVICE); + + DistributionConfig config = new DistributionConfigImpl(props); + + assertThat(config.getSecurityEnabledComponents()) + .doesNotContain(SecurableComponents.ALL) + .doesNotContain(SecurableComponents.GATEWAY) + .doesNotContain(SecurableComponents.SERVER) + .contains(SecurableComponents.HTTP_SERVICE) + .contains(SecurableComponents.CLUSTER) + .contains(SecurableComponents.JMX); + } + + @Test + public void nonExistentSecurityEnabledComponentShouldThrow() throws Exception { + Properties props = new Properties(); + props.put(SECURITY_ENABLED_COMPONENTS, "notapplicable"); + + assertThatThrownBy(() -> new DistributionConfigImpl(props)).isExactlyInstanceOf(GemFireConfigException.class); } + + @Test + public void testSSLEnabledComponents() { + Properties props = new Properties(); + props.put(MCAST_PORT, "0"); + props.put(SSL_ENABLED_COMPONENTS, "all"); + + DistributionConfig config = new DistributionConfigImpl(props); + } + + @Test(expected = IllegalArgumentException.class) + public void testSSLEnabledComponentsLegacyFail() { + Properties props = new Properties(); + props.put(MCAST_PORT, "0"); + props.put(CLUSTER_SSL_ENABLED, "true"); + props.put(HTTP_SERVICE_SSL_ENABLED, "true"); + props.put(SSL_ENABLED_COMPONENTS, "all"); + + DistributionConfig config = new DistributionConfigImpl(props); + } + @Test + public void testSSLEnabledComponentsLegacyPass() { + Properties props = new Properties(); + props.put(MCAST_PORT, "0"); + props.put(CLUSTER_SSL_ENABLED, "true"); + props.put(HTTP_SERVICE_SSL_ENABLED, "true"); + props.put(SSL_ENABLED_COMPONENTS, ""); + + DistributionConfig config = new DistributionConfigImpl(props); + } }
