// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatSamplerIntegrationTest.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gemstone.gemfire.internal.statistics;

import static com.jayway.awaitility.Awaitility.*;
import static java.util.concurrent.TimeUnit.*;
import static org.junit.Assert.*;

import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.RestoreSystemProperties;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.StatisticDescriptor;
import com.gemstone.gemfire.Statistics;
import com.gemstone.gemfire.StatisticsType;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.statistics.StatArchiveReader.StatValue;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;

/**
 * Integration tests for statistics sampling.
 *
 * <p>The test mutates a set of statistics between sampler ticks while
 * mirroring every value it writes into {@link #allStatistics}. After the
 * statistics factory is closed, the archive file is read back with
 * {@link StatArchiveReader} and the most recent archived value of each stat
 * is compared with the mirrored value.
 *
 * @since GemFire 7.0
 */
@Category(IntegrationTest.class)
public class StatSamplerIntegrationTest {

  private static final Logger logger = LogService.getLogger();

  /** Statistics text id -> name of its statistics type, recorded on first increment. */
  private Map<String, String> statisticTypes;

  /** Statistics text id -> (stat name -> value the archive is expected to hold last). */
  private Map<String, Map<String, Number>> allStatistics;

  @Rule
  public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();

  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  @Rule
  public TestName testName = new TestName();

  @Before
  public void setUp() {
    this.statisticTypes = new HashMap<>();
    this.allStatistics = new HashMap<>();
  }

  @After
  public void tearDown() {
    this.statisticTypes = null;
    this.allStatistics = null;
    StatisticsTypeFactoryImpl.clear();
    StatArchiveWriter.clearTraceFilter();
  }

  /** Returns a name unique to the running test method: {@code <SimpleClassName>_<method>}. */
  private String getName() {
    return getClass().getSimpleName() + "_" + testName.getMethodName();
  }

  @Test
  public void testStatSampler() throws Exception {
    StatArchiveWriter.setTraceFilter("st1_1", "ST1");

    File folder = temporaryFolder.newFolder();
    String archiveFileName = folder.getAbsolutePath() + File.separator + getName() + ".gfs";

    System.setProperty("stats.log-level", "config");
    System.setProperty("stats.disable", "false");
    System.setProperty("stats.name", getName());
    System.setProperty("stats.archive-file", archiveFileName);
    System.setProperty("stats.file-size-limit", "0");
    System.setProperty("stats.disk-space-limit", "0");
    System.setProperty("stats.sample-rate", "100");

    final CancelCriterion stopper = new CancelCriterion() {
      @Override
      public String cancelInProgress() {
        return null;
      }

      @Override
      public RuntimeException generateCancelledException(Throwable e) {
        return null;
      }
    };
    final LocalStatisticsFactory factory = new LocalStatisticsFactory(stopper);
    final StatisticDescriptor[] statsST1 = new StatisticDescriptor[] {
        factory.createDoubleCounter("double_counter_1", "d1", "u1"),
        factory.createDoubleCounter("double_counter_2", "d2", "u2", true),
        factory.createDoubleGauge("double_gauge_3", "d3", "u3"),
        factory.createDoubleGauge("double_gauge_4", "d4", "u4", false),
        factory.createIntCounter("int_counter_5", "d5", "u5"),
        factory.createIntCounter("int_counter_6", "d6", "u6", true),
        factory.createIntGauge("int_gauge_7", "d7", "u7"),
        factory.createIntGauge("int_gauge_8", "d8", "u8", false),
        factory.createLongCounter("long_counter_9", "d9", "u9"),
        factory.createLongCounter("long_counter_10", "d10", "u10", true),
        factory.createLongGauge("long_gauge_11", "d11", "u11"),
        factory.createLongGauge("long_gauge_12", "d12", "u12", false),
        factory.createLongGauge("sampled_long", "d13", "u13", false),
        factory.createIntGauge("sampled_int", "d14", "u14", false),
        factory.createDoubleGauge("sampled_double", "d15", "u15", false)
    };
    final StatisticsType ST1 = factory.createType("ST1", "ST1", statsST1);
    final Statistics st1_1 = factory.createAtomicStatistics(ST1, "st1_1", 1);

    // Supplier-backed stats: the expected value is whatever the supplier returns.
    st1_1.setIntSupplier("sampled_int", () -> 5);
    getOrCreateExpectedValueMap(st1_1).put("sampled_int", 5);
    st1_1.setLongSupplier("sampled_long", () -> 6);
    getOrCreateExpectedValueMap(st1_1).put("sampled_long", 6);
    st1_1.setDoubleSupplier("sampled_double", () -> 7.0);
    getOrCreateExpectedValueMap(st1_1).put("sampled_double", 7.0);

    await("awaiting StatSampler readiness").atMost(30, SECONDS).until(() -> hasSamplerStatsInstances(factory));

    Statistics[] samplerStatsInstances = factory.findStatisticsByTextId("statSampler");
    assertNotNull(samplerStatsInstances);
    assertEquals(1, samplerStatsInstances.length);
    final Statistics samplerStats = samplerStatsInstances[0];

    // First cycle: each mutation step is followed by at least one sample.
    incOddNumberedStats(st1_1);
    awaitStatSample(samplerStats);

    incFirstSixStats(st1_1);
    awaitStatSample(samplerStats);

    incAllNumberedStats(st1_1, 1);
    awaitStatSample(samplerStats);

    incAllNumberedStats(st1_1, -1);
    awaitStatSample(samplerStats);

    incAllNumberedStats(st1_1, 1);
    // Several samples in a row with no stat changes in between.
    awaitStatSample(samplerStats);
    awaitStatSample(samplerStats);
    awaitStatSample(samplerStats);

    // Second cycle: same mutation steps again.
    incOddNumberedStats(st1_1);
    awaitStatSample(samplerStats);

    incFirstSixStats(st1_1);
    awaitStatSample(samplerStats);

    incAllNumberedStats(st1_1, 1);
    awaitStatSample(samplerStats);

    incAllNumberedStats(st1_1, -1);
    awaitStatSample(samplerStats);

    incAllNumberedStats(st1_1, 1);
    awaitStatSample(samplerStats);

    factory.close();

    final File archiveFile = new File(System.getProperty("stats.archive-file"));
    assertTrue(archiveFile.exists());
    final StatArchiveReader reader = new StatArchiveReader(
        new File[] {archiveFile}, null, false);
    try {
      List<?> resources = reader.getResourceInstList();
      for (Object resource : resources) {
        StatArchiveReader.ResourceInst ri = (StatArchiveReader.ResourceInst) resource;
        String resourceName = ri.getName();
        assertNotNull(resourceName);

        // Only the instance created by this test has mirrored expectations.
        if (!resourceName.equals("st1_1")) {
          logger.info("testStatSampler skipping {}", resourceName);
          continue;
        }

        String expectedStatsType = this.statisticTypes.get(resourceName);
        assertNotNull(expectedStatsType);
        assertEquals(expectedStatsType, ri.getType().getName());

        Map<String, Number> expectedStatValues = this.allStatistics.get(resourceName);
        assertNotNull(expectedStatValues);

        StatValue[] statValues = ri.getStatValues();
        for (int i = 0; i < statValues.length; i++) {
          String statName = ri.getType().getStats()[i].getName();
          assertNotNull(statName);
          assertNotNull(expectedStatValues.get(statName));

          assertEquals(statName, statValues[i].getDescriptor().getName());

          statValues[i].setFilter(StatValue.FILTER_NONE);
          assertEquals("Value " + i + " for " + statName + " is wrong: " + expectedStatValues,
              expectedStatValues.get(statName).doubleValue(),
              statValues[i].getSnapshotsMostRecent(),
              0);
        }
      }
    } finally {
      // Release the archive so the temporary folder can be cleaned up.
      reader.close();
    }
  }

  /**
   * Increments the odd-numbered stats (1, 3, 5, 7, 9, 11), each by its own
   * number.
   */
  private void incOddNumberedStats(final Statistics statistics) {
    incDouble(statistics, "double_counter_1", 1);
    incDouble(statistics, "double_gauge_3", 3);
    incInt(statistics, "int_counter_5", 5);
    incInt(statistics, "int_gauge_7", 7);
    incLong(statistics, "long_counter_9", 9);
    incLong(statistics, "long_gauge_11", 11);
  }

  /** Increments stats 1 through 6 (the double stats and the int counters) by one. */
  private void incFirstSixStats(final Statistics statistics) {
    incDouble(statistics, "double_counter_1", 1);
    incDouble(statistics, "double_counter_2", 1);
    incDouble(statistics, "double_gauge_3", 1);
    incDouble(statistics, "double_gauge_4", 1);
    incInt(statistics, "int_counter_5", 1);
    incInt(statistics, "int_counter_6", 1);
  }

  /**
   * Increments stats 1 through 12 by one, except the odd-numbered gauges
   * (3, 7 and 11), which are changed by {@code oddGaugeDelta}.
   *
   * @param oddGaugeDelta delta applied to double_gauge_3, int_gauge_7 and long_gauge_11
   */
  private void incAllNumberedStats(final Statistics statistics, final int oddGaugeDelta) {
    incDouble(statistics, "double_counter_1", 1);
    incDouble(statistics, "double_counter_2", 1);
    incDouble(statistics, "double_gauge_3", oddGaugeDelta);
    incDouble(statistics, "double_gauge_4", 1);
    incInt(statistics, "int_counter_5", 1);
    incInt(statistics, "int_counter_6", 1);
    incInt(statistics, "int_gauge_7", oddGaugeDelta);
    incInt(statistics, "int_gauge_8", 1);
    incLong(statistics, "long_counter_9", 1);
    incLong(statistics, "long_counter_10", 1);
    incLong(statistics, "long_gauge_11", oddGaugeDelta);
    incLong(statistics, "long_gauge_12", 1);
  }

  /** Returns true once the factory exposes at least one instance with text id "statSampler". */
  private boolean hasSamplerStatsInstances(final LocalStatisticsFactory factory) {
    Statistics[] samplerStatsInstances = factory.findStatisticsByTextId("statSampler");
    return samplerStatsInstances != null && samplerStatsInstances.length > 0;
  }

  /** Blocks (up to 30 seconds) until the sampler's "sampleCount" stat exceeds its value at call time. */
  private void awaitStatSample(final Statistics samplerStats) throws InterruptedException {
    int startSampleCount = samplerStats.getInt("sampleCount");
    await("awaiting stat sample").atMost(30, SECONDS).until(() -> samplerStats.getInt("sampleCount") > startSampleCount);
  }

  /** Increments a double stat and mirrors the resulting value as the expected value. */
  private void incDouble(Statistics statistics, String stat, double value) {
    assertFalse(statistics.isClosed());
    Map<String, Number> statValues = getOrCreateExpectedValueMap(statistics);
    statistics.incDouble(stat, value);
    statValues.put(stat, statistics.getDouble(stat));
    recordStatisticsType(statistics);
  }

  /** Increments an int stat and mirrors the resulting value as the expected value. */
  private void incInt(Statistics statistics, String stat, int value) {
    assertFalse(statistics.isClosed());
    Map<String, Number> statValues = getOrCreateExpectedValueMap(statistics);
    statistics.incInt(stat, value);
    statValues.put(stat, statistics.getInt(stat));
    recordStatisticsType(statistics);
  }

  /** Increments a long stat and mirrors the resulting value as the expected value. */
  private void incLong(Statistics statistics, String stat, long value) {
    assertFalse(statistics.isClosed());
    Map<String, Number> statValues = getOrCreateExpectedValueMap(statistics);
    statistics.incLong(stat, value);
    statValues.put(stat, statistics.getLong(stat));
    recordStatisticsType(statistics);
  }

  /** Returns the expected-value map for the given instance, creating it on first use. */
  private Map<String, Number> getOrCreateExpectedValueMap(final Statistics statistics) {
    return this.allStatistics.computeIfAbsent(statistics.getTextId(), textId -> new HashMap<>());
  }

  /** Remembers the type name of the given instance the first time it is seen. */
  private void recordStatisticsType(final Statistics statistics) {
    this.statisticTypes.computeIfAbsent(statistics.getTextId(), textId -> statistics.getType().getName());
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatSamplerTestCase.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gemstone.gemfire.internal.statistics;

import static org.junit.Assert.*;

import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.Statistics;
import com.gemstone.gemfire.StatisticsType;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.util.StopWatch;

/**
 * Integration TestCase for StatSampler.
 *
 * <p>Provides polling helpers that sleep, re-check a condition and fail the
 * test when the condition is not met within the given time budget.
 *
 * @since GemFire 7.0
 */
public abstract class StatSamplerTestCase {

  protected static final Logger logger = LogService.getLogger();

  /** Supplies the statistics manager whose stat list the accessors below delegate to. */
  protected abstract StatisticsManager getStatisticsManager();

  protected int getStatListModCount() {
    StatisticsManager manager = getStatisticsManager();
    return manager.getStatListModCount();
  }

  protected List<Statistics> getStatsList() {
    StatisticsManager manager = getStatisticsManager();
    return manager.getStatsList();
  }

  /**
   * Polls until {@code file} exists, sleeping {@code sleep} ms before each
   * check, and fails if it still does not exist after {@code millis} ms.
   * An interrupt ends the wait early and restores the interrupt flag.
   */
  protected static void waitForFileToExist(final File file, final long millis, final long sleep) {
    boolean exists = false;
    try {
      final StopWatch timer = new StopWatch(true);
      while (!exists && timer.elapsedTimeMillis() < millis) {
        Thread.sleep(sleep);
        exists = file.exists();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    assertTrue("Waiting for file to exist: " + file, exists);
  }

  /**
   * Polls until {@code file} no longer exists, sleeping {@code sleep} ms
   * before each check, and fails if it is still present after
   * {@code millis} ms.
   */
  protected static void waitForFileToDelete(final File file, final long millis, final long sleep) {
    boolean gone = false;
    try {
      final StopWatch timer = new StopWatch(true);
      while (!gone && timer.elapsedTimeMillis() < millis) {
        Thread.sleep(sleep);
        gone = !file.exists();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    assertTrue("Waiting for file to be deleted: " + file, gone);
  }

  /**
   * Polls until the int stat {@code statName} is at least
   * {@code expectedStatValue}; fails if that does not happen within
   * {@code millis} ms.
   */
  protected static void waitForExpectedStatValue(final Statistics statSamplerStats, final String statName, final int expectedStatValue, final long millis, final long sleep) {
    boolean reached = false;
    try {
      final StopWatch timer = new StopWatch(true);
      while (!reached && timer.elapsedTimeMillis() < millis) {
        Thread.sleep(sleep);
        reached = statSamplerStats.getInt(statName) >= expectedStatValue;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    assertTrue("Waiting for " + statName + " >= " + expectedStatValue, reached);
  }

  /** Waits until the "sampleCount" stat reaches at least {@code samples}. */
  protected static void waitForStatSample(final Statistics statSamplerStats, final int samples, final long millis, final long sleep) {
    final String sampleCountStat = "sampleCount";
    waitForExpectedStatValue(statSamplerStats, sampleCountStat, samples, millis, sleep);
  }

  /**
   * Watches the int stat {@code statName} for up to {@code millis} ms and
   * fails as soon as a check observes a value other than
   * {@code expectedStatValue}.
   */
  protected static void assertStatValueDoesNotChange(final Statistics statSamplerStats, final String statName, final int expectedStatValue, final long millis, final long sleep) {
    boolean changed = false;
    try {
      final StopWatch timer = new StopWatch(true);
      while (!changed && timer.elapsedTimeMillis() < millis) {
        Thread.sleep(sleep);
        changed = statSamplerStats.getInt(statName) != expectedStatValue;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    assertFalse("Waiting to assert that " + statName + " does not change from " + expectedStatValue, changed);
  }

  /**
   * Snapshot of the sampler's statistics instances, grouped by statistics
   * type at construction time.
   */
  protected static class AllStatistics {

    private final HostStatSampler statSampler;
    private final Map<StatisticsType, Set<Statistics>> allStatistics;

    protected AllStatistics(HostStatSampler statSampler) throws InterruptedException {
      this.statSampler = statSampler;
      this.allStatistics = initAllStatistics();
    }

    /** Waits for sampler initialization, then groups its current statistics by type. */
    private Map<StatisticsType, Set<Statistics>> initAllStatistics() throws InterruptedException {
      assertTrue(this.statSampler.waitForInitialization(5000));

      Map<StatisticsType, Set<Statistics>> byType = new HashMap<StatisticsType, Set<Statistics>>();
      for (Statistics instance : this.statSampler.getStatistics()) {
        StatisticsType type = instance.getType();
        Set<Statistics> instancesOfType = byType.get(type);
        if (instancesOfType == null) {
          instancesOfType = new HashSet<Statistics>();
          byType.put(type, instancesOfType);
        }
        instancesOfType.add(instance);
      }
      return byType;
    }

    protected boolean containsStatisticsType(StatisticsType type) {
      throw new UnsupportedOperationException("TODO");
    }

    /** Returns true if any grouped type has the given name. */
    protected boolean containsStatisticsType(String typeName) throws InterruptedException {
      boolean found = false;
      for (StatisticsType candidate : this.allStatistics.keySet()) {
        if (candidate.getName().equals(typeName)) {
          found = true;
          break;
        }
      }
      return found;
    }

    protected boolean containsStatistics(Statistics statistics) {
      throw new UnsupportedOperationException("TODO");
    }

    /** Returns true if any grouped instance has the given text id. */
    protected boolean containsStatistics(String instanceName) throws InterruptedException {
      for (Set<Statistics> instancesOfType : this.allStatistics.values()) {
        for (Statistics instance : instancesOfType) {
          if (instance.getTextId().equals(instanceName)) {
            return true;
          }
        }
      }
      return false;
    }

    /**
     * Logs one line per statistics instance currently held by the sampler.
     * Example output:
     *
     * Statistics[0]: typeName=StatSampler instanceName=statSampler
     * Statistics[1]: typeName=VMStats instanceName=vmStats
     * Statistics[2]: typeName=VMMemoryUsageStats instanceName=vmHeapMemoryStats
     * Statistics[3]: typeName=VMMemoryUsageStats instanceName=vmNonHeapMemoryStats
     * Statistics[4]: typeName=VMMemoryPoolStats instanceName=Code Cache-Non-heap memory
     * Statistics[5]: typeName=VMMemoryPoolStats instanceName=PS Eden Space-Heap memory
     * Statistics[6]: typeName=VMMemoryPoolStats instanceName=PS Survivor Space-Heap memory
     * Statistics[7]: typeName=VMMemoryPoolStats instanceName=PS Old Gen-Heap memory
     * Statistics[8]: typeName=VMMemoryPoolStats instanceName=PS Perm Gen-Non-heap memory
     * Statistics[9]: typeName=VMGCStats instanceName=PS Scavenge
     * Statistics[10]: typeName=VMGCStats instanceName=PS MarkSweep
     * Statistics[11]: typeName=LinuxSystemStats instanceName=kuwait.gemstone.com
     * Statistics[12]: typeName=LinuxProcessStats instanceName=javaApp0-proc
     */
    protected void dumpStatistics() throws InterruptedException {
      Statistics[] current = this.statSampler.getStatistics();
      int index = 0;
      while (index < current.length) {
        Statistics instance = current[index];
        logger.info("Statistics[{}]: typeName={} instanceName={}", index, instance.getType().getName(), instance.getTextId());
        index++;
      }
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsDUnitTest.java ---------------------------------------------------------------------- diff --git
a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsDUnitTest.java deleted file mode 100755 index 1e7a4e4..0000000 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsDUnitTest.java +++ /dev/null @@ -1,947 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.internal.statistics; - -import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.StatisticDescriptor; -import com.gemstone.gemfire.Statistics; -import com.gemstone.gemfire.StatisticsFactory; -import com.gemstone.gemfire.StatisticsType; -import com.gemstone.gemfire.StatisticsTypeFactory; -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.CacheException; -import com.gemstone.gemfire.cache.CacheListener; -import com.gemstone.gemfire.cache.EntryEvent; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.RegionEvent; -import com.gemstone.gemfire.cache.RegionFactory; -import com.gemstone.gemfire.cache.Scope; -import com.gemstone.gemfire.cache.util.CacheListenerAdapter; -import com.gemstone.gemfire.cache.util.RegionMembershipListenerAdapter; -import com.gemstone.gemfire.cache30.CacheSerializableRunnable; -import com.gemstone.gemfire.distributed.DistributedMember; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; -import com.gemstone.gemfire.internal.GemFireStatSampler; -import com.gemstone.gemfire.internal.NanoTimer; -import com.gemstone.gemfire.internal.StatArchiveReader; -import 
com.gemstone.gemfire.internal.StatArchiveReader.StatSpec; -import com.gemstone.gemfire.internal.StatArchiveReader.StatValue; -import com.gemstone.gemfire.internal.StatSamplerStats; -import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl; -import com.gemstone.gemfire.test.dunit.Assert; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.Host; -import com.gemstone.gemfire.test.dunit.Invoke; -import com.gemstone.gemfire.test.dunit.SerializableCallable; -import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.dunit.WaitCriterion; -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -/** - * Integration tests for Statistics. VM0 performs puts and VM1 receives - * updates. Both use custom statistics for start/end with increment to - * add up puts and updates. Then validation tests values in stat resource - * instances and uses StatArchiveReader. Both are tested against static - * counters in both VMs. - * <p/> - * This test mimics hydratest/locators/cacheDS.conf in an attempt to reproduce - * bug #45478. So far this test passes consistently. 
- * - * @since GemFire 7.0 - */ -@SuppressWarnings("serial") -@Category(DistributedTest.class) -public class StatisticsDUnitTest extends JUnit4CacheTestCase { - - private static final String dir = "StatisticsDUnitTest"; - - private static final int MAX_PUTS = 1000; - private static final int NUM_KEYS = 100; - private static final int NUM_PUB_THREADS = 2; - private static final int NUM_PUBS = 2; - private static final boolean RANDOMIZE_PUTS = true; - - private static AtomicInteger updateEvents = new AtomicInteger(); - - private static AtomicInteger puts = new AtomicInteger(); - - private static AtomicReference<PubSubStats> subStatsRef = new AtomicReference<PubSubStats>(); - - private static AtomicReferenceArray<PubSubStats> pubStatsRef = new AtomicReferenceArray<PubSubStats>(NUM_PUB_THREADS); - - /** Thread-safe static reference to RegionMembershipListener instance */ - private static AtomicReference<RegionMembershipListener> rmlRef = - new AtomicReference<RegionMembershipListener>(); - - @SuppressWarnings("unused") /** invoked by reflection */ - private static void cleanup() { - updateEvents.set(0); - rmlRef.set(null); - } - - @SuppressWarnings("unused") /** invoked by reflection */ - private static int getUpdateEvents() { - return updateEvents.get(); - } - - @SuppressWarnings("unused") /** invoked by reflection */ - private static int getPuts() { - return puts.get(); - } - - public StatisticsDUnitTest() { - super(); - } - - @Override - public final void preTearDownCacheTestCase() throws Exception { - Invoke.invokeInEveryVM(getClass(), "cleanup"); - disconnectAllFromDS(); // because this test enabled stat sampling! 
- } - - @Test - public void testPubAndSubCustomStats() throws Exception { - final String testName = "testPubAndSubCustomStats"; - - final String regionName = "region_" + testName; - final VM[] pubs = new VM[NUM_PUBS]; - for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) { - pubs[pubVM] = Host.getHost(0).getVM(pubVM); - } - final VM sub = Host.getHost(0).getVM(NUM_PUBS); - - final String subArchive = dir + File.separator + testName + "_sub" + ".gfs"; - final String[] pubArchives = new String[NUM_PUBS]; - for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) { - pubArchives[pubVM] = dir + File.separator + testName + "_pub-" + pubVM + ".gfs"; - } - - for (int i = 0; i < NUM_PUBS; i++) { - final int pubVM = i; - pubs[pubVM].invoke(new CacheSerializableRunnable("pub-connect-and-create-data-" + pubVM) { - public void run2() throws CacheException { - new File(dir).mkdir(); - final Properties props = new Properties(); - props.setProperty(STATISTIC_SAMPLING_ENABLED, "true"); - props.setProperty(STATISTIC_SAMPLE_RATE, "1000"); - props.setProperty(STATISTIC_ARCHIVE_FILE, pubArchives[pubVM]); - final InternalDistributedSystem system = getSystem(props); - - // assert that sampler is working as expected - final GemFireStatSampler sampler = system.getStatSampler(); - assertTrue(sampler.isSamplingEnabled()); - assertTrue(sampler.isAlive()); - assertEquals(new File(pubArchives[pubVM]), sampler.getArchiveFileName()); - - final WaitCriterion waitForSampleCollector = new WaitCriterion() { - public boolean done() { - return sampler.getSampleCollector() != null; - } - public String description() { - return "sampler.getSampleCollector() is still null!"; - } - }; - Wait.waitForCriterion(waitForSampleCollector, 4*1000, 10, true); - - final SampleCollector sampleCollector = sampler.getSampleCollector(); - assertNotNull(sampleCollector); - - final StatArchiveHandler archiveHandler = sampleCollector.getStatArchiveHandler(); - assertNotNull(archiveHandler); - assertTrue(archiveHandler.isArchiving()); - - 
// create cache and region - final Cache cache = getCache(); - final RegionFactory<String, Number> factory = cache.createRegionFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - - final RegionMembershipListener rml = new RegionMembershipListener(); - rmlRef.set(rml); - factory.addCacheListener(rml); - final Region<String, Number> region = factory.create(regionName); - - // create the keys - if (region.getAttributes().getScope() == Scope.DISTRIBUTED_ACK) { - for (int key = 0; key < NUM_KEYS; key++) { - region.create("KEY-"+key, null); - } - } - } - }); - } - - final DistributedMember subMember = (DistributedMember) - sub.invoke(new SerializableCallable("sub-connect-and-create-keys") { - @Override - public Object call() throws Exception { - new File(dir).mkdir(); - final Properties props = new Properties(); - props.setProperty(STATISTIC_SAMPLING_ENABLED, "true"); - props.setProperty(STATISTIC_SAMPLE_RATE, "1000"); - props.setProperty(STATISTIC_ARCHIVE_FILE, subArchive); - final InternalDistributedSystem system = getSystem(props); - - final PubSubStats statistics = new PubSubStats(system, "sub-1", 1); - subStatsRef.set(statistics); - - // assert that sampler is working as expected - final GemFireStatSampler sampler = system.getStatSampler(); - assertTrue(sampler.isSamplingEnabled()); - assertTrue(sampler.isAlive()); - assertEquals(new File(subArchive), sampler.getArchiveFileName()); - - final WaitCriterion waitForSampleCollector = new WaitCriterion() { - public boolean done() { - return sampler.getSampleCollector() != null; - } - public String description() { - return "sampler.getSampleCollector() is still null!"; - } - }; - Wait.waitForCriterion(waitForSampleCollector, 2*1000, 10, true); - - final SampleCollector sampleCollector = sampler.getSampleCollector(); - assertNotNull(sampleCollector); - - final StatArchiveHandler archiveHandler = sampleCollector.getStatArchiveHandler(); - assertNotNull(archiveHandler); - assertTrue(archiveHandler.isArchiving()); - - // 
create cache and region with UpdateListener - final Cache cache = getCache(); - final RegionFactory<String, Number> factory = cache.createRegionFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - - final CacheListener<String, Number> cl = new UpdateListener(statistics); - factory.addCacheListener(cl); - final Region<String, Number> region = factory.create(regionName); - - // create the keys - if (region.getAttributes().getScope() == Scope.DISTRIBUTED_ACK) { - for (int key = 0; key < NUM_KEYS; key++) { - region.create("KEY-"+key, null); - } - } - - assertEquals(0, statistics.getUpdateEvents()); - return system.getDistributedMember(); - } - }); - - for (int i = 0; i < NUM_PUBS; i++) { - final int pubVM = i; - AsyncInvocation[] publishers = new AsyncInvocation[NUM_PUB_THREADS]; - for (int j = 0; j < NUM_PUB_THREADS; j++) { - final int pubThread = j; - publishers[pubThread] = pubs[pubVM].invokeAsync( - new CacheSerializableRunnable("pub-connect-and-put-data-" + pubVM + "-thread-" + pubThread) { - public void run2() throws CacheException { - final PubSubStats statistics = new PubSubStats(basicGetSystem(), "pub-" + pubThread, pubVM); - pubStatsRef.set(pubThread, statistics); - - final RegionMembershipListener rml = rmlRef.get(); - final Region<String, Number> region = getCache().getRegion(regionName); - - // assert that sub is in rml membership - assertNotNull(rml); - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - return rml.contains(subMember) && rml.size() == NUM_PUBS; - } - public String description() { - return rml.members + " should contain " + subMember; - } - }; - Wait.waitForCriterion(wc, 4*1000, 10, true); - - // publish lots of puts cycling through the NUM_KEYS - assertEquals(0, statistics.getPuts()); - - // cycle through the keys randomly - if (RANDOMIZE_PUTS) { - Random randomGenerator = new Random(); - int key = 0; - for (int i = 0; i < MAX_PUTS; i++) { - final long start = statistics.startPut(); - key = 
randomGenerator.nextInt(NUM_KEYS); - region.put("KEY-"+key, i); - statistics.endPut(start); - } - - // cycle through he keys in order and wrapping back around - } else { - int key = 0; - for (int i = 0; i < MAX_PUTS; i++) { - final long start = statistics.startPut(); - region.put("KEY-"+key, i); - key++; // cycle through the keys... - if (key >= NUM_KEYS) { - key = 0; - } - statistics.endPut(start); - } - } - assertEquals(MAX_PUTS, statistics.getPuts()); - - // wait for 2 samples to ensure all stats have been archived - final StatisticsType statSamplerType = getSystem().findType("StatSampler"); - final Statistics[] statsArray = getSystem().findStatisticsByType(statSamplerType); - assertEquals(1, statsArray.length); - - final Statistics statSamplerStats = statsArray[0]; - final int initialSampleCount = statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT); - - wc = new WaitCriterion() { - public boolean done() { - return statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT) >= initialSampleCount + 2; - } - public String description() { - return "Waiting for " + StatSamplerStats.SAMPLE_COUNT + " >= " + initialSampleCount + 2; - } - }; - Wait.waitForCriterion(wc, 4*1000, 10, true); - } - }); - } - for (int pubThread = 0; pubThread < publishers.length; pubThread++) { - publishers[pubThread].join(); - if (publishers[pubThread].exceptionOccurred()) { - Assert.fail("Test failed", publishers[pubThread].getException()); - } - } - } - - sub.invoke(new CacheSerializableRunnable("sub-wait-for-samples") { - public void run2() throws CacheException { - // wait for 2 samples to ensure all stats have been archived - final StatisticsType statSamplerType = getSystem().findType("StatSampler"); - final Statistics[] statsArray = getSystem().findStatisticsByType(statSamplerType); - assertEquals(1, statsArray.length); - - final Statistics statSamplerStats = statsArray[0]; - final int initialSampleCount = statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT); - - final WaitCriterion wc 
= new WaitCriterion() { - public boolean done() { - return statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT) >= initialSampleCount + 2; - } - public String description() { - return "Waiting for " + StatSamplerStats.SAMPLE_COUNT + " >= " + initialSampleCount + 2; - } - }; - Wait.waitForCriterion(wc, 4*1000, 10, true); - - // now post total updateEvents to static - final PubSubStats statistics = subStatsRef.get(); - assertNotNull(statistics); - updateEvents.set(statistics.getUpdateEvents()); - } - }); - - // validate pub values against sub values - final int totalUpdateEvents = sub.invoke(() -> getUpdateEvents()); - - // validate pub values against pub statistics against pub archive - for (int i = 0; i < NUM_PUBS; i++) { - final int pubIdx = i; - pubs[pubIdx].invoke(new CacheSerializableRunnable("pub-validation") { - @SuppressWarnings("unused") - public void run2() throws CacheException { - // add up all the puts - assertEquals(NUM_PUB_THREADS, pubStatsRef.length()); - int totalPuts = 0; - for (int pubThreadIdx = 0; pubThreadIdx < NUM_PUB_THREADS; pubThreadIdx++) { - PubSubStats statistics = pubStatsRef.get(pubThreadIdx); - assertNotNull(statistics); - totalPuts += statistics.getPuts(); - } - - // assert that total puts adds up to max puts times num threads - assertEquals(MAX_PUTS * NUM_PUB_THREADS, totalPuts); - - // assert that archive file contains same values as statistics - File archive = new File(pubArchives[pubIdx]); - assertTrue(archive.exists()); - - StatArchiveReader reader = null; - try { - reader = new StatArchiveReader(new File[]{archive}, null, false); - } catch (IOException e) { - fail("Failed to read " + archive); - } - - double combinedPuts = 0; - - @SuppressWarnings("rawtypes") - List resources = reader.getResourceInstList(); - assertNotNull(resources); - assertFalse(resources.isEmpty()); - - for (@SuppressWarnings("rawtypes") - Iterator iter = resources.iterator(); iter.hasNext();) { - StatArchiveReader.ResourceInst ri = 
(StatArchiveReader.ResourceInst) iter.next(); - if (!ri.getType().getName().equals(PubSubStats.TYPE_NAME)) { - continue; - } - - StatValue[] statValues = ri.getStatValues(); - for (int i = 0; i < statValues.length; i++) { - String statName = ri.getType().getStats()[i].getName(); - assertNotNull(statName); - - if (statName.equals(PubSubStats.PUTS)) { - StatValue sv = statValues[i]; - sv.setFilter(StatValue.FILTER_NONE); - - double mostRecent = sv.getSnapshotsMostRecent(); - double min = sv.getSnapshotsMinimum(); - double max = sv.getSnapshotsMaximum(); - double maxMinusMin = sv.getSnapshotsMaximum() - sv.getSnapshotsMinimum(); - double mean = sv.getSnapshotsAverage(); - double stdDev = sv.getSnapshotsStandardDeviation(); - - assertEquals(mostRecent, max, 0f); - - double summation = 0; - double[] rawSnapshots = sv.getRawSnapshots(); - for (int j = 0; j < rawSnapshots.length; j++) { - //log.convertToLogWriter().info("DEBUG " + ri.getName() + " " + statName + " rawSnapshots[" + j + "] = " + rawSnapshots[j]); - summation += rawSnapshots[j]; - } - assertEquals(mean, summation / sv.getSnapshotsSize(), 0); - - combinedPuts += mostRecent; - } - } - } - - // assert that sum of mostRecent values for all puts equals totalPuts - assertEquals((double)totalPuts, combinedPuts, 0); - puts.getAndAdd(totalPuts); - } - }); - } - - // validate pub values against sub values - int totalCombinedPuts = 0; - for (int i = 0; i < NUM_PUBS; i++) { - final int pubIdx = i; - final int totalPuts = pubs[pubIdx].invoke(() -> getPuts()); - assertEquals(MAX_PUTS * NUM_PUB_THREADS, totalPuts); - totalCombinedPuts += totalPuts; - } - assertEquals(totalCombinedPuts, totalUpdateEvents); - assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, totalCombinedPuts); - - // validate sub values against sub statistics against sub archive - final int totalPuts = totalCombinedPuts; - sub.invoke(new CacheSerializableRunnable("sub-validation") { - @SuppressWarnings("unused") - public void run2() throws CacheException 
{ - final PubSubStats statistics = subStatsRef.get(); - assertNotNull(statistics); - final int updateEvents = statistics.getUpdateEvents(); - assertEquals(totalPuts, updateEvents); - assertEquals(totalUpdateEvents, updateEvents); - assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, updateEvents); - - // assert that archive file contains same values as statistics - File archive = new File(subArchive); - assertTrue(archive.exists()); - - StatArchiveReader reader = null; - try { - reader = new StatArchiveReader(new File[]{archive}, null, false); - } catch (IOException e) { - fail("Failed to read " + archive); - } - - double combinedUpdateEvents = 0; - - @SuppressWarnings("rawtypes") - List resources = reader.getResourceInstList(); - for (@SuppressWarnings("rawtypes") - Iterator iter = resources.iterator(); iter.hasNext();) { - StatArchiveReader.ResourceInst ri = (StatArchiveReader.ResourceInst) iter.next(); - if (!ri.getType().getName().equals(PubSubStats.TYPE_NAME)) { - continue; - } - - StatValue[] statValues = ri.getStatValues(); - for (int i = 0; i < statValues.length; i++) { - String statName = ri.getType().getStats()[i].getName(); - assertNotNull(statName); - - if (statName.equals(PubSubStats.UPDATE_EVENTS)) { - StatValue sv = statValues[i]; - sv.setFilter(StatValue.FILTER_NONE); - - double mostRecent = sv.getSnapshotsMostRecent(); - double min = sv.getSnapshotsMinimum(); - double max = sv.getSnapshotsMaximum(); - double maxMinusMin = sv.getSnapshotsMaximum() - sv.getSnapshotsMinimum(); - double mean = sv.getSnapshotsAverage(); - double stdDev = sv.getSnapshotsStandardDeviation(); - - assertEquals(mostRecent, max,0); - - double summation = 0; - double[] rawSnapshots = sv.getRawSnapshots(); - for (int j = 0; j < rawSnapshots.length; j++) { - //log.convertToLogWriter().info("DEBUG " + ri.getName() + " " + statName + " rawSnapshots[" + j + "] = " + rawSnapshots[j]); - summation += rawSnapshots[j]; - } - assertEquals(mean, summation / sv.getSnapshotsSize(),0); - - 
combinedUpdateEvents += mostRecent; - } - } - } - assertEquals((double)totalUpdateEvents, combinedUpdateEvents,0); - } - }); - - final int updateEvents = sub.invoke(() -> readIntStat(new File(subArchive), "PubSubStats", "updateEvents")); - assertTrue(updateEvents > 0); - assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, updateEvents); - - int puts = 0; - for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) { - int currentPubVM = pubVM; - int vmPuts = (int)pubs[pubVM].invoke(() -> readIntStat(new File(pubArchives[currentPubVM]), "PubSubStats", "puts")); - assertTrue(vmPuts > 0); - assertEquals(MAX_PUTS * NUM_PUB_THREADS, vmPuts); - puts += vmPuts; - } - assertTrue(puts > 0); - assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, puts); - - // use regex "testPubAndSubCustomStats" - - final MultipleArchiveReader reader = new MultipleArchiveReader(".*testPubAndSubCustomStats.*\\.gfs"); - - final int combinedUpdateEvents = reader.readIntStat(PubSubStats.TYPE_NAME, PubSubStats.UPDATE_EVENTS); - assertTrue("Failed to read updateEvents stat values", combinedUpdateEvents > 0); - - final int combinedPuts = reader.readIntStat(PubSubStats.TYPE_NAME, PubSubStats.PUTS); - assertTrue("Failed to read puts stat values", combinedPuts > 0); - - assertTrue("updateEvents is " + combinedUpdateEvents + " but puts is " + combinedPuts, - combinedUpdateEvents == combinedPuts); - } - - static int readIntStat(File archive, String typeName, String statName) throws IOException { - final MultipleArchiveReader reader = new MultipleArchiveReader(archive); - return reader.readIntStat(typeName, statName); - } - - public static void main(String[] args) throws Exception { - if (args.length == 2) { - final String statType = args[0]; - final String statName = args[1]; - - final MultipleArchiveReader reader = new MultipleArchiveReader(); - final int value = reader.readIntStat(statType, statName); - System.out.println(statType + "#" + statName + "=" + value); - - } else if (args.length == 3) { - final String 
archiveName = args[0]; - final String statType = args[1]; - final String statName = args[2]; - - final File archive = new File(archiveName).getAbsoluteFile(); - assertTrue("File " + archive + " does not exist!", archive.exists()); - assertTrue(archive + " exists but is not a file!", archive.isFile()); - - final MultipleArchiveReader reader = new MultipleArchiveReader(archive); - final int value = reader.readIntStat(statType, statName); - System.out.println(archive + ": " + statType + "#" + statName + "=" + value); - - } else if (args.length == 4) { - final String statType1 = args[0]; - final String statName1 = args[1]; - final String statType2 = args[2]; - final String statName2 = args[3]; - - final MultipleArchiveReader reader = new MultipleArchiveReader(); - final int value1 = reader.readIntStat(statType1, statName1); - final int value2 = reader.readIntStat(statType2, statName2); - - assertTrue(statType1 + "#" + statName1 + "=" + value1 - + " does not equal " + statType2 + "#" + statName2 + "=" + value2, - value1 == value2); - } else { - assertEquals("Miminum two args are required: statType statName", 2, args.length); - } - } - - /** - * @since GemFire 7.0 - */ - static class PubSubStats { - - private static final String TYPE_NAME = "PubSubStats"; - private static final String TYPE_DESCRIPTION = "Statistics for StatisticsDUnitTest with Pub/Sub."; - - private static final String INSTANCE_PREFIX = "pubSubStats_"; - - private static final String PUTS = "puts"; - private static final String PUT_TIME = "putTime"; - - private static final String UPDATE_EVENTS = "updateEvents"; - - private static StatisticsType createType(StatisticsFactory f) { - StatisticsTypeFactory stf = StatisticsTypeFactoryImpl.singleton(); - StatisticsType type = stf.createType( - TYPE_NAME, - TYPE_DESCRIPTION, - createDescriptors(f)); - return type; - } - - private static StatisticDescriptor[] createDescriptors(StatisticsFactory f) { - boolean largerIsBetter = true; - return new 
StatisticDescriptor[] { - f.createIntCounter - ( - PUTS, - "Number of puts completed.", - "operations", - largerIsBetter - ), - f.createLongCounter - ( - PUT_TIME, - "Total time spent doing puts.", - "nanoseconds", - !largerIsBetter - ), - f.createIntCounter - ( - UPDATE_EVENTS, - "Number of update events.", - "events", - largerIsBetter - ) - }; - } - - private final Statistics statistics; - - PubSubStats(StatisticsFactory f, String name, int id) { - this.statistics = f.createAtomicStatistics( - createType(f), INSTANCE_PREFIX + "_" + name, id); - } - - Statistics statistics() { - return this.statistics; - } - - void close() { - this.statistics.close(); - } - - int getUpdateEvents() { - return statistics().getInt(UPDATE_EVENTS); - } - - void incUpdateEvents() { - incUpdateEvents(1); - } - - void incUpdateEvents(int amount) { - incStat(UPDATE_EVENTS, amount); - } - - int getPuts() { - return statistics().getInt(PUTS); - } - - void incPuts() { - incPuts(1); - } - - void incPuts(int amount) { - incStat(PUTS, amount); - } - - void incPutTime(long amount) { - incStat(PUT_TIME, amount); - } - - long startPut() { - return NanoTimer.getTime(); - } - - void endPut(long start) { - endPut(start, 1); - } - - void endPut(long start, int amount) { - long elapsed = NanoTimer.getTime() - start; - incPuts(amount); - incPutTime(elapsed); - } - - private void incStat(String statName, int intValue) { - statistics().incInt(statName, intValue); - } - - private void incStat(String statName, long longValue) { - statistics().incLong(statName, longValue); - } - } - - /** - * @since GemFire 7.0 - */ - static class UpdateListener extends CacheListenerAdapter<String, Number> { - - private final PubSubStats statistics; - - UpdateListener(PubSubStats statistics) { - this.statistics = statistics; - } - - @Override - public void afterUpdate(EntryEvent<String, Number> event) { - this.statistics.incUpdateEvents( 1 ); - } - } - - /** - * @since GemFire 7.0 - */ - static class RegionMembershipListener 
extends RegionMembershipListenerAdapter<String, Number> { - - private final List<DistributedMember> members = new ArrayList<DistributedMember>(); - - int size() { - return this.members.size(); - } - - List<DistributedMember> getMembers() { - return Collections.unmodifiableList(new ArrayList<DistributedMember>(this.members)); - } - - boolean containsId(DistributedMember member) { - for (DistributedMember peer : getMembers()) { - if (peer.getId().equals(member.getId())) { - return true; - } - } - return false; - } - - boolean contains(DistributedMember member) { - return this.members.contains(member); - } - - String debugContains(DistributedMember member) { - StringBuffer sb = new StringBuffer(); - for (DistributedMember peer : getMembers()) { - if (!peer.equals(member)) { - InternalDistributedMember peerIDM = (InternalDistributedMember)peer; - InternalDistributedMember memberIDM = (InternalDistributedMember)member; - sb.append("peer port=").append(peerIDM.getPort()).append(" "); - sb.append("member port=").append(memberIDM.getPort()).append(" "); - } - } - return sb.toString(); - } - - @Override - public void initialMembers(Region<String, Number> region, DistributedMember[] initialMembers) { - for (int i = 0; i < initialMembers.length; i++) { - this.members.add(initialMembers[i]); - } - } - - @Override - public void afterRemoteRegionCreate(RegionEvent<String, Number> event) { - this.members.add(event.getDistributedMember()); - } - - @Override - public void afterRemoteRegionDeparture(RegionEvent<String, Number> event) { - this.members.remove(event.getDistributedMember()); - } - - @Override - public void afterRemoteRegionCrash(RegionEvent<String, Number> event) { - this.members.remove(event.getDistributedMember()); - } - } - - static class MultipleArchiveReader { - - private final File dir; - private final String regex; - - MultipleArchiveReader(File dir, String regex) { - this.dir = dir; - this.regex = regex; - } - - MultipleArchiveReader(File dir) { - this.dir = 
dir; - this.regex = null; - } - - MultipleArchiveReader(String regex) { - this(new File(System.getProperty("user.dir")).getAbsoluteFile(), regex); - } - - MultipleArchiveReader() { - this(new File(System.getProperty("user.dir")).getAbsoluteFile(), null); - } - - int readIntStat(String typeName, String statName) throws IOException { - // directory (maybe directories) with one or more archives - if (this.dir.exists() && this.dir.isDirectory()) { - final List<File> archives = findFilesWithSuffix(this.dir, this.regex, ".gfs"); - return readIntStatFromArchives(archives, typeName, statName); - - // one archive file - } else if (this.dir.exists() && this.dir.isFile()) { - final List<File> archives = new ArrayList<File>(); - archives.add(this.dir); - return readIntStatFromArchives(archives, typeName, statName); - - // failure - } else { - throw new IllegalStateException(this.dir + " does not exist!"); - } - } - - private int readIntStatFromArchives(List<File> archives, String typeName, String statName) throws IOException { - final StatValue[] statValues = readStatValues(archives, typeName, statName); - assertNotNull("statValues is null!", statValues); - assertTrue("statValues is empty!", statValues.length > 0); - - int value = 0; - for (int i = 0; i < statValues.length; i++) { - statValues[i].setFilter(StatValue.FILTER_NONE); - value += (int)statValues[i].getSnapshotsMaximum(); - } - return value; - } - - private static List<File> findFilesWithSuffix(final File dir, final String regex, final String suffix) { - Pattern p = null; - if (regex != null) { - p = Pattern.compile(regex); - } - final Pattern pattern = p; - - return findFiles( - dir, - new FileFilter() { - public boolean accept(File file) { - boolean accept = true; - if (regex != null) { - final Matcher matcher = pattern.matcher(file.getName()); - accept = matcher.matches(); - } - if (suffix != null) { - accept = accept && file.getName().endsWith(suffix); - } - return accept; - } - }, - true); - } - - private static 
List<File> findFiles(File dir, FileFilter filter, boolean recursive) { - File[] tmpfiles = dir.listFiles(filter); - List<File> matches; - if (tmpfiles == null) { - matches = new ArrayList<File>(); - } else { - matches = new ArrayList<File>(Arrays.asList(tmpfiles)); - } - if (recursive) { - File[] files = dir.listFiles(); - if (files != null) { - for (int i = 0; i < files.length; i++) { - File file = files[i]; - if (file.isDirectory()) { - matches.addAll(findFiles(file, filter, recursive)); - } - } - } - } - return matches; - } - - private static StatValue[] readStatValues(final List<File> archives, final String typeName, final String statName) throws IOException { - System.out.println("\nKIRK readStatValues reading archives:\n\n" + archives + "\n"); - final StatSpec statSpec = new StatSpec() { - @Override - public boolean archiveMatches(File value) { - return true; - } - @Override - public boolean typeMatches(String value) { - return typeName.equals(value); - } - @Override - public boolean statMatches(String value) { - return statName.equals(value); - } - @Override - public boolean instanceMatches(String textId, long numericId) { - return true; - } - @Override - public int getCombineType() { - return StatSpec.FILE; - } - }; - - final File[] archiveFiles = archives.toArray(new File[archives.size()]); - final StatSpec[] filters = new StatSpec[] { statSpec }; - final StatArchiveReader reader = new StatArchiveReader(archiveFiles, filters, true); - final StatValue[] values = reader.matchSpec(statSpec); - return values; - } - } -}