This is an automated email from the ASF dual-hosted git repository. udo pushed a commit to branch micrometer in repository https://gitbox.apache.org/repos/asf/geode.git
commit d0a1f78c9a1d9948f2decdfa58a90d77efbf92a1 Author: Udo Kohlmeyer <[email protected]> AuthorDate: Fri Jan 5 17:47:08 2018 -0800 Adding Micrometer to PartitionedRegionStats --- geode-core/build.gradle | 388 +++--- .../cache/MicrometerPartitionRegionStats.kt | 380 ++++++ .../apache/geode/internal/cache/MicrometerStats.kt | 27 + .../internal/cache/PartitionedRegionStatsImpl.java | 1297 ++++++++++++++++++++ .../cache/TimedMicrometerPartitionedRegionStats.kt | 67 + geode-protobuf/build.gradle | 1 + .../statistics/MicrometerClientStatsImpl.kt | 15 +- .../v1/acceptance/CacheOperationsJUnitTest.java | 14 +- 8 files changed, 2000 insertions(+), 189 deletions(-) diff --git a/geode-core/build.gradle b/geode-core/build.gradle index 426840f..4363762 100755 --- a/geode-core/build.gradle +++ b/geode-core/build.gradle @@ -17,227 +17,257 @@ apply plugin: 'antlr' +apply plugin: 'kotlin' apply plugin: 'me.champeau.gradle.jmh' sourceSets { - jca { - compileClasspath += configurations.compile - runtimeClasspath += configurations.runtime - } + jca { + compileClasspath += configurations.compile + runtimeClasspath += configurations.runtime + } } configurations { - //declaring new configuration that will be used to associate with artifacts - archives + //declaring new configuration that will be used to associate with artifacts + archives } dependencies { - // Source Dependencies - antlr 'antlr:antlr:' + project.'antlr.version' - - // External - provided files("${System.getProperty('java.home')}/../lib/tools.jar") - compile 'com.github.stephenc.findbugs:findbugs-annotations:' + project.'stephenc-findbugs.version' - compile 'org.jgroups:jgroups:' + project.'jgroups.version' - compile 'antlr:antlr:' + project.'antlr.version' - compile 'com.fasterxml.jackson.core:jackson-annotations:' + project.'jackson.version' - compile 'com.fasterxml.jackson.core:jackson-databind:' + project.'jackson.version' - compile 'commons-io:commons-io:' + project.'commons-io.version' - compile 
'commons-validator:commons-validator:' + project.'commons-validator.version' - compile 'commons-digester:commons-digester:' + project.'commons-digester.version' - - compile 'commons-lang:commons-lang:' + project.'commons-lang.version' - compile ('commons-modeler:commons-modeler:' + project.'commons-modeler.version') { - exclude module: 'commons-digester' - exclude module: 'commons-logging-api' - exclude module: 'mx4j-jmx' - exclude module: 'xml-apis' - ext.optional = true - } - compile ('io.netty:netty-all:' + project.'netty-all.version') { - ext.optional = true - } - compile 'it.unimi.dsi:fastutil:' + project.'fastutil.version' - compile ('javax.mail:javax.mail-api:' + project.'javax.mail-api.version') { - ext.optional = true; - } - compile 'javax.resource:javax.resource-api:' + project.'javax.resource-api.version' - compile ('mx4j:mx4j:' + project.'mx4j.version') { - ext.optional = true; - } - compile ('mx4j:mx4j-remote:' + project.'mx4j-remote.version') { - ext.optional = true; - } - compile ('mx4j:mx4j-tools:' + project.'mx4j-tools.version') { - ext.optional = true; - } - compile ('net.java.dev.jna:jna:' + project.'jna.version') - - compile ('net.sf.jopt-simple:jopt-simple:' + project.'jopt-simple.version') - - compile 'org.apache.logging.log4j:log4j-api:' + project.'log4j.version' - compile 'org.apache.logging.log4j:log4j-core:' + project.'log4j.version' - runtime ('org.fusesource.jansi:jansi:' + project.'jansi.version') { - ext.optional = true - } - runtime 'org.slf4j:slf4j-api:' + project.'slf4j-api.version' - - runtime ('org.apache.logging.log4j:log4j-slf4j-impl:' + project.'log4j.version') { - exclude module: 'slf4j-api' - ext.optional = true - } - runtime ('org.apache.logging.log4j:log4j-jcl:' + project.'log4j.version') { - ext.optional = true - } - runtime ('org.apache.logging.log4j:log4j-jul:' + project.'log4j.version') { - ext.optional = true - } - compile ('org.eclipse.jetty:jetty-webapp:' + project.'jetty.version') { - ext.optional = true - } - 
provided ('org.springframework:spring-webmvc:' + project.'springframework.version') { - exclude module: 'aopalliance' - exclude module: 'spring-aop' - ext.optional = true - } - compile ('org.springframework:spring-core:' + project.'springframework.version') { - ext.optional = true - } - compile ('org.springframework.shell:spring-shell:' + project.'spring-shell.version') { - exclude module: 'aopalliance' - exclude module: 'asm' - exclude module: 'cglib' - exclude module: 'guava' - exclude module: 'spring-aop' - exclude module: 'spring-context-support' - ext.optional = true - } - compile ('org.iq80.snappy:snappy:' + project.'snappy-java.version') { - ext.optional = true - } - - compile 'org.apache.shiro:shiro-core:' + project.'shiro.version' - - // https://mvnrepository.com/artifact/io.github.lukehutch/fast-classpath-scanner - compile 'io.github.lukehutch:fast-classpath-scanner:' + project.'fast-classpath-scanner.version' - - compile 'com.healthmarketscience.rmiio:rmiio:' + project.'rmiio.version' - - compile project(':geode-common') - compile project(':geode-json') - - jcaCompile sourceSets.main.output - - testCompile project(':geode-junit') - testCompile project(':geode-old-versions') - - // Test Dependencies - // External - testCompile 'com.jayway.jsonpath:json-path-assert:' + project.'json-path-assert.version' - testCompile 'org.apache.bcel:bcel:' + project.'bcel.version' - testRuntime 'org.apache.derby:derby:' + project.'derby.version' - testCompile 'org.mockito:mockito-core:' + project.'mockito-core.version' - testRuntime 'commons-collections:commons-collections:' + project.'commons-collections.version' - testRuntime 'commons-configuration:commons-configuration:' + project.'commons-configuration.version' - testRuntime 'commons-io:commons-io:' + project.'commons-io.version' - testRuntime 'commons-validator:commons-validator:' + project.'commons-validator.version' - testCompile 'net.spy:spymemcached:' + project.'spymemcached.version' - testCompile 
'redis.clients:jedis:' + project.'jedis.version' - testCompile 'org.springframework:spring-test:' + project.'springframework.version' - - - testCompile 'com.pholser:junit-quickcheck-core:' + project.'junit-quickcheck.version' - testCompile 'com.pholser:junit-quickcheck-generators:' + project.'junit-quickcheck.version' - testCompile 'com.pholser:junit-quickcheck-guava:' + project.'junit-quickcheck.version' - - testRuntime 'xerces:xercesImpl:' + project.'xercesImpl.version' - testCompile project(':geode-concurrency-test') + // Source Dependencies + antlr 'antlr:antlr:' + project.'antlr.version' + + // External + provided files("${System.getProperty('java.home')}/../lib/tools.jar") + + compile group: 'io.micrometer', name: 'micrometer-core', version: '1.0.0-rc.5' + compile group: 'io.micrometer', name: 'micrometer-registry-atlas', version: '1.0.0-rc.5' + compile group: 'io.micrometer', name: 'micrometer-registry-influx', version: '1.0.0-rc.5' + compile group: 'io.micrometer', name: 'micrometer-registry-jmx', version: '1.0.0-rc.5' + + + compile 'com.github.stephenc.findbugs:findbugs-annotations:' + project.'stephenc-findbugs.version' + compile 'org.jgroups:jgroups:' + project.'jgroups.version' + compile 'antlr:antlr:' + project.'antlr.version' + compile 'com.fasterxml.jackson.core:jackson-annotations:' + project.'jackson.version' + compile 'com.fasterxml.jackson.core:jackson-databind:' + project.'jackson.version' + compile 'commons-io:commons-io:' + project.'commons-io.version' + compile 'commons-validator:commons-validator:' + project.'commons-validator.version' + compile 'commons-digester:commons-digester:' + project.'commons-digester.version' + + compile 'commons-lang:commons-lang:' + project.'commons-lang.version' + compile('commons-modeler:commons-modeler:' + project.'commons-modeler.version') { + exclude module: 'commons-digester' + exclude module: 'commons-logging-api' + exclude module: 'mx4j-jmx' + exclude module: 'xml-apis' + ext.optional = true + } + 
compile('io.netty:netty-all:' + project.'netty-all.version') { + ext.optional = true + } + compile 'it.unimi.dsi:fastutil:' + project.'fastutil.version' + compile('javax.mail:javax.mail-api:' + project.'javax.mail-api.version') { + ext.optional = true; + } + compile 'javax.resource:javax.resource-api:' + project.'javax.resource-api.version' + compile('mx4j:mx4j:' + project.'mx4j.version') { + ext.optional = true; + } + compile('mx4j:mx4j-remote:' + project.'mx4j-remote.version') { + ext.optional = true; + } + compile('mx4j:mx4j-tools:' + project.'mx4j-tools.version') { + ext.optional = true; + } + compile('net.java.dev.jna:jna:' + project.'jna.version') + + compile('net.sf.jopt-simple:jopt-simple:' + project.'jopt-simple.version') + + compile 'org.apache.logging.log4j:log4j-api:' + project.'log4j.version' + compile 'org.apache.logging.log4j:log4j-core:' + project.'log4j.version' + runtime('org.fusesource.jansi:jansi:' + project.'jansi.version') { + ext.optional = true + } + runtime 'org.slf4j:slf4j-api:' + project.'slf4j-api.version' + + runtime('org.apache.logging.log4j:log4j-slf4j-impl:' + project.'log4j.version') { + exclude module: 'slf4j-api' + ext.optional = true + } + runtime('org.apache.logging.log4j:log4j-jcl:' + project.'log4j.version') { + ext.optional = true + } + runtime('org.apache.logging.log4j:log4j-jul:' + project.'log4j.version') { + ext.optional = true + } + compile('org.eclipse.jetty:jetty-webapp:' + project.'jetty.version') { + ext.optional = true + } + provided('org.springframework:spring-webmvc:' + project.'springframework.version') { + exclude module: 'aopalliance' + exclude module: 'spring-aop' + ext.optional = true + } + compile('org.springframework:spring-core:' + project.'springframework.version') { + ext.optional = true + } + compile('org.springframework.shell:spring-shell:' + project.'spring-shell.version') { + exclude module: 'aopalliance' + exclude module: 'asm' + exclude module: 'cglib' + exclude module: 'guava' + exclude module: 
'spring-aop' + exclude module: 'spring-context-support' + ext.optional = true + } + compile('org.iq80.snappy:snappy:' + project.'snappy-java.version') { + ext.optional = true + } + + compile 'org.apache.shiro:shiro-core:' + project.'shiro.version' + + // https://mvnrepository.com/artifact/io.github.lukehutch/fast-classpath-scanner + compile 'io.github.lukehutch:fast-classpath-scanner:' + project.'fast-classpath-scanner.version' + + compile 'com.healthmarketscience.rmiio:rmiio:' + project.'rmiio.version' + + compile project(':geode-common') + compile project(':geode-json') + + jcaCompile sourceSets.main.output + + testCompile project(':geode-junit') + testCompile project(':geode-old-versions') + + // Test Dependencies + // External + testCompile 'com.jayway.jsonpath:json-path-assert:' + project.'json-path-assert.version' + testCompile 'org.apache.bcel:bcel:' + project.'bcel.version' + testRuntime 'org.apache.derby:derby:' + project.'derby.version' + testCompile 'org.mockito:mockito-core:' + project.'mockito-core.version' + testRuntime 'commons-collections:commons-collections:' + project.'commons-collections.version' + testRuntime 'commons-configuration:commons-configuration:' + project.'commons-configuration.version' + testRuntime 'commons-io:commons-io:' + project.'commons-io.version' + testRuntime 'commons-validator:commons-validator:' + project.'commons-validator.version' + testCompile 'net.spy:spymemcached:' + project.'spymemcached.version' + testCompile 'redis.clients:jedis:' + project.'jedis.version' + testCompile 'org.springframework:spring-test:' + project.'springframework.version' + + + testCompile 'com.pholser:junit-quickcheck-core:' + project.'junit-quickcheck.version' + testCompile 'com.pholser:junit-quickcheck-generators:' + project.'junit-quickcheck.version' + testCompile 'com.pholser:junit-quickcheck-guava:' + project.'junit-quickcheck.version' + + testRuntime 'xerces:xercesImpl:' + project.'xercesImpl.version' + testCompile 
project(':geode-concurrency-test') } def generatedResources = "$buildDir/generated-resources/main" sourceSets { - main { - output.dir(generatedResources, builtBy: 'createVersionPropertiesFile') - } - test { - output.dir(generatedResources, builtBy: 'createVersionPropertiesFile') - } + main { + output.dir(generatedResources, builtBy: 'createVersionPropertiesFile') + } + test { + output.dir(generatedResources, builtBy: 'createVersionPropertiesFile') + } } jmh { - include = project.hasProperty('include') ? project.getProperties().get('include') : '.*' - duplicateClassesStrategy = 'warn' + include = project.hasProperty('include') ? project.getProperties().get('include') : '.*' + duplicateClassesStrategy = 'warn' } // Creates the version properties file and writes it to the classes dir task createVersionPropertiesFile { - def propertiesFile = file(generatedResources + "/org/apache/geode/internal/GemFireVersion.properties"); - outputs.file propertiesFile - inputs.dir compileJava.destinationDir - - doLast { - def props = [ - "Product-Name" : productName, - "Product-Version" : version, - "Build-Id" : "${System.env.USER} ${buildId}".toString(), - "Build-Date" : new Date().format('yyyy-MM-dd HH:mm:ss Z'), - "Build-Platform" : "${System.properties['os.name']} ${System.properties['os.version']} ${System.properties['os.arch']}".toString(), - "Build-Java-Version": System.properties['java.version'] - ] as Properties - props.putAll(readScmInfo()) - - propertiesFile.getParentFile().mkdirs(); - new FileOutputStream(propertiesFile).withStream { fos -> - props.store(fos, '') + def propertiesFile = file(generatedResources + "/org/apache/geode/internal/GemFireVersion.properties"); + outputs.file propertiesFile + inputs.dir compileJava.destinationDir + + doLast { + def props = [ + "Product-Name" : productName, + "Product-Version" : version, + "Build-Id" : "${System.env.USER} ${buildId}".toString(), + "Build-Date" : new Date().format('yyyy-MM-dd HH:mm:ss Z'), + "Build-Platform" : 
"${System.properties['os.name']} ${System.properties['os.version']} ${System.properties['os.arch']}".toString(), + "Build-Java-Version": System.properties['java.version'] + ] as Properties + props.putAll(readScmInfo()) + + propertiesFile.getParentFile().mkdirs(); + new FileOutputStream(propertiesFile).withStream { fos -> + props.store(fos, '') + } } - } } jar { - from sourceSets.main.output - from sourceSets.jca.output + from sourceSets.main.output + from sourceSets.jca.output - exclude 'org/apache/geode/management/internal/web/**' - exclude 'org/apache/geode/internal/i18n/StringIdResourceBundle_ja.txt' - exclude 'org/apache/geode/admin/doc-files/ds4_0.dtd' + exclude 'org/apache/geode/management/internal/web/**' + exclude 'org/apache/geode/internal/i18n/StringIdResourceBundle_ja.txt' + exclude 'org/apache/geode/admin/doc-files/ds4_0.dtd' } -task webJar (type: Jar, dependsOn: classes) { - description 'Assembles the jar archive containing the gemfire management web classes.' - from sourceSets.main.output - baseName 'geode-web' - include 'org/apache/geode/management/internal/web/**' +task webJar(type: Jar, dependsOn: classes) { + description 'Assembles the jar archive containing the gemfire management web classes.' 
+ from sourceSets.main.output + baseName 'geode-web' + include 'org/apache/geode/management/internal/web/**' } -task raJar (type: Jar, dependsOn: classes) { - description 'Assembles the jar archive that contains the JCA classes' - from sourceSets.jca.output - exclude 'org/apache/geode/ra/**' - archiveName 'ra.jar' +task raJar(type: Jar, dependsOn: classes) { + description 'Assembles the jar archive that contains the JCA classes' + from sourceSets.jca.output + exclude 'org/apache/geode/ra/**' + archiveName 'ra.jar' } -task jcaJar (type: Jar, dependsOn: raJar) { - description 'Assembles the jar archive that contains the JCA bundle' - baseName 'geode-jca' - extension 'rar' - metaInf { from 'src/jca/ra.xml' } - from raJar.archivePath +task jcaJar(type: Jar, dependsOn: raJar) { + description 'Assembles the jar archive that contains the JCA bundle' + baseName 'geode-jca' + extension 'rar' + metaInf { from 'src/jca/ra.xml' } + from raJar.archivePath } configurations { - classesOutput { - extendsFrom compile - description 'a dependency that exposes the compiled classes' - } + classesOutput { + extendsFrom compile + description 'a dependency that exposes the compiled classes' + } } dependencies { - classesOutput sourceSets.main.output + classesOutput sourceSets.main.output } tasks.eclipse.dependsOn(generateGrammarSource) +buildscript { + ext.kotlin_version = '1.2.10' + repositories { + mavenCentral() + } + dependencies { + classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" + } +} +repositories { + mavenCentral() +} +compileKotlin { + kotlinOptions { + jvmTarget = "1.8" + } +} +compileTestKotlin { + kotlinOptions { + jvmTarget = "1.8" + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/MicrometerPartitionRegionStats.kt b/geode-core/src/main/java/org/apache/geode/internal/cache/MicrometerPartitionRegionStats.kt new file mode 100644 index 0000000..dcb695d --- /dev/null +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/MicrometerPartitionRegionStats.kt @@ -0,0 +1,380 @@ +package org.apache.geode.internal.cache + +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.Tag +import java.util.concurrent.atomic.AtomicInteger + +open class MicrometerPartitionRegionStats(val regionName: String) : MicrometerStats() { + + @Suppress("PropertyName") + protected val PARTITIONED_REGION = "PartitionedRegion" + + private val putCounter = constructCounterForMetric("put") + private val putAllCounter = constructCounterForMetric("putAll") + private val createCounter = constructCounterForMetric("create") + private val removeAllCounter = constructCounterForMetric("removeAll") + private val getCounter = constructCounterForMetric("get") + private val destroyCounter = constructCounterForMetric("destroy") + private val invalidateCounter = constructCounterForMetric("invalidate") + private val containsKeyCounter = constructCounterForMetric("containsKey") + private val containValueForKeyCounter = constructCounterForMetric("containValueForKey") + private val containsKeyValueRetriesCounter = constructCounterForMetric("containsKeyValueRetries") + private val containsKeyValueOpsRetriedCounter = constructCounterForMetric("containsKeyValueOpsRetried") + private val incInvalidateRetriesCounter = constructCounterForMetric("incInvalidateRetries") + private val incInvalidateOpsRetriedCounter = constructCounterForMetric("incInvalidateOpsRetried") + private val incDestroyRetriesCounter = constructCounterForMetric("incDestroyRetries") + private val incDestroyOpsRetriedCounter = constructCounterForMetric("incDestroyOpsRetried") + private val incPutRetriesCounter = constructCounterForMetric("incPutRetries") + private val incPutOpsRetriedCounter = constructCounterForMetric("incPutOpsRetried") + private val incGetOpsRetriedCounter = constructCounterForMetric("incGetOpsRetried") + private val 
incGetRetriesCounter = constructCounterForMetric("incGetRetries") + private val incCreateOpsRetriedCounter = constructCounterForMetric("incCreateOpsRetried") + private val incCreateRetriesCounter = constructCounterForMetric("incCreateRetries") + private val incPreferredReadLocalCounter = constructCounterForMetric("incPreferredReadLocal") + private val incPreferredReadRemoteCounter = constructCounterForMetric("incPreferredReadRemote") + private val incPutAllRetriesCounter = constructCounterForMetric("incPutAllRetries") + private val incPutAllMsgsRetriedCounter = constructCounterForMetric("incPutAllMsgsRetried") + private val incRemoveAllRetriesCounter = constructCounterForMetric("incRemoveAllRetries") + private val incRemoveAllMsgsRetriedCounter = constructCounterForMetric("incRemoveAllMsgsRetried") + private val incPartitionMessagesSentCounter = constructCounterForMetric("incPartitionMessagesSent") + private val incBucketCountCounter = constructCounterForMetric("incBucketCount") + + private fun constructCounterForMetric(metricName: String): Counter = + metrics.counter("${metricName}Counter", regionName, PARTITIONED_REGION) + + private fun constructAtomicIntegerToMonitor(metricName: String): AtomicInteger = + metrics.gauge("${metricName}Gauge",listOf(regionName,PARTITIONED_REGION), AtomicInteger(0),AtomicInteger::get) + + open fun endPut(startTimeInNanos: Long) = putCounter.increment() + open fun endPutAll(startTimeInNanos: Long) = putAllCounter.increment() + open fun endCreate(startTimeInNanos: Long) = createCounter.increment() + open fun endRemoveAll(startTimeInNanos: Long) = removeAllCounter.increment() + open fun endGet(startTimeInNanos: Long) = getCounter.increment() + open fun endDestroy(startTimeInNanos: Long) = destroyCounter.increment() + open fun endInvalidate(startTimeInNanos: Long) = invalidateCounter.increment() + open fun endContainsKey(startTimeInNanos: Long) = containsKeyCounter.increment() + open fun endContainsValueForKey(startTimeInNanos: Long) = 
containValueForKeyCounter.increment() + fun incContainsKeyValueRetries() = containsKeyValueRetriesCounter.increment() + fun incContainsKeyValueOpsRetried() = containsKeyValueOpsRetriedCounter.increment() + fun incInvalidateRetries() = incInvalidateRetriesCounter.increment() + fun incInvalidateOpsRetried() = incInvalidateOpsRetriedCounter.increment() + fun incDestroyRetries() = incDestroyRetriesCounter.increment() + fun incDestroyOpsRetried() = incDestroyOpsRetriedCounter.increment() + fun incPutRetries() = incPutRetriesCounter.increment() + fun incPutOpsRetried() = incPutOpsRetriedCounter.increment() + fun incGetOpsRetried() = incGetOpsRetriedCounter.increment() + fun incGetRetries() = incGetRetriesCounter.increment() + fun incCreateOpsRetried() = incCreateOpsRetriedCounter.increment() + fun incCreateRetries() = incCreateRetriesCounter.increment() + fun incPreferredReadLocal() = incPreferredReadLocalCounter.increment() + fun incPreferredReadRemote() = incPreferredReadRemoteCounter.increment() + fun incPutAllRetries() = incPutAllRetriesCounter.increment() + fun incPutAllMsgsRetried() = incPutAllMsgsRetriedCounter.increment() + fun incRemoveAllRetries() = incRemoveAllRetriesCounter.increment() + fun incRemoveAllMsgsRetried() = incRemoveAllMsgsRetriedCounter.increment() + fun incPartitionMessagesSent() = incPartitionMessagesSentCounter.increment() + fun incBucketCount(bucketCount: Int) = incBucketCountGauge.increment(bucketCount.toDouble()) + fun incLowRedundancyBucketCount(`val`: Int) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun incNoCopiesBucketCount(`val`: Int) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun incTotalNumBuckets(`val`: Int) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. 
+ } + + fun incPrimaryBucketCount(`val`: Int) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun incVolunteeringThreads(`val`: Int) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun incPRMetaDataSentCount() { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun incDataStoreEntryCount(amt: Int) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun incBytesInUse(delta: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getVolunteeringInProgress(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun startPartitionMessageProcessing(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endPartitionMessagesProcessing(start: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun setBucketCount(i: Int) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + + fun getDataStoreEntryCount(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + + fun getDataStoreBytesInUse(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getTotalBucketCount(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getVolunteeringBecamePrimary(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. 
+ } + + fun getVolunteeringBecamePrimaryTime(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getVolunteeringOtherPrimary(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getVolunteeringOtherPrimaryTime(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getVolunteeringClosed(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getVolunteeringClosedTime(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun startVolunteering(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endVolunteeringBecamePrimary(start: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endVolunteeringOtherPrimary(start: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endVolunteeringClosed(start: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getTotalNumBuckets(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + + fun getPrimaryBucketCount(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + + fun getVolunteeringThreads(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + + fun getLowRedundancyBucketCount(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. 
+ } + + fun getNoCopiesBucketCount(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + + fun getConfiguredRedundantCopies(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun setConfiguredRedundantCopies(`val`: Int) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun setLocalMaxMemory(l: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getActualRedundantCopies(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun setActualRedundantCopies(`val`: Int) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun putStartTime(key: Any?, startTime: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun removeStartTime(key: Any?): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endGetEntry(startTime: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endGetEntry(start: Long, numInc: Int) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun startRecovery(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endRecovery(start: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun startBucketCreate(isRebalance: Boolean): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. 
+ } + + fun endBucketCreate(start: Long, success: Boolean, isRebalance: Boolean) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun startPrimaryTransfer(isRebalance: Boolean): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endPrimaryTransfer(start: Long, success: Boolean, isRebalance: Boolean) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getBucketCreatesInProgress(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getBucketCreatesCompleted(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getBucketCreatesFailed(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getBucketCreateTime(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getPrimaryTransfersInProgress(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getPrimaryTransfersCompleted(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getPrimaryTransfersFailed(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getPrimaryTransferTime(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun startRebalanceBucketCreate() { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. 
+ } + + fun endRebalanceBucketCreate(start: Long, end: Long, success: Boolean) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun startRebalancePrimaryTransfer() { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endRebalancePrimaryTransfer(start: Long, end: Long, success: Boolean) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getRebalanceBucketCreatesInProgress(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getRebalanceBucketCreatesCompleted(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getRebalanceBucketCreatesFailed(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getRebalanceBucketCreateTime(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getRebalancePrimaryTransfersInProgress(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getRebalancePrimaryTransfersCompleted(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getRebalancePrimaryTransfersFailed(): Int { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun getRebalancePrimaryTransferTime(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun startApplyReplication(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. 
+ } + + fun endApplyReplication(start: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun startSendReplication(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endSendReplication(start: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun startPutRemote(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endPutRemote(start: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun startPutLocal(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun endPutLocal(start: Long) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + + fun getPRMetaDataSentCount(): Long { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. 
+ } +} \ No newline at end of file diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/MicrometerStats.kt b/geode-core/src/main/java/org/apache/geode/internal/cache/MicrometerStats.kt new file mode 100644 index 0000000..b083052 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/MicrometerStats.kt @@ -0,0 +1,27 @@ +package org.apache.geode.internal.cache + +import io.micrometer.core.instrument.Clock +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.composite.CompositeMeterRegistry +import io.micrometer.influx.InfluxConfig +import io.micrometer.influx.InfluxMeterRegistry +import io.micrometer.jmx.JmxMeterRegistry +import java.time.Duration + +abstract class MicrometerStats { + protected val metrics = CompositeMeterRegistry(Clock.SYSTEM) + private val influxMetrics: MeterRegistry = InfluxMeterRegistry(object : InfluxConfig { + override fun step(): Duration = Duration.ofSeconds(1) + override fun db(): String = "mydb" + override fun get(k: String): String? = null + override fun uri(): String = "http://localhost:8086" + }, Clock.SYSTEM) + + private val jmxMetrics: MeterRegistry = JmxMeterRegistry() + + init { + metrics.add(influxMetrics) +// metrics.add(atlasMetrics) + metrics.add(jmxMetrics) + } +} \ No newline at end of file diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStatsImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStatsImpl.java new file mode 100644 index 0000000..24cd598 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStatsImpl.java @@ -0,0 +1,1297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.geode.StatisticDescriptor; +import org.apache.geode.Statistics; +import org.apache.geode.StatisticsFactory; +import org.apache.geode.StatisticsType; +import org.apache.geode.StatisticsTypeFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl; + +/** + * Represents a statistics type that can be archived to vsd. Loading of this class automatically + * triggers statistics archival. + * <p> + * + * A singleton instance can be requested with the initSingleton(...) and getSingleton() methods. + * <p> + * + * Individual instances can be created with the constructor. + * <p> + * + * To manipulate the statistic values, use (inc|dec|set|get)<fieldName> methods. 
+ * + * @since GemFire 5.0 + */ +public class PartitionedRegionStatsImpl implements PartitionedRegionStats { + + private static final StatisticsType type; + + private static final int dataStoreEntryCountId; + private static final int dataStoreBytesInUseId; + private static final int bucketCountId; + + private static final int putsCompletedId; + private static final int putOpsRetriedId; + private static final int putRetriesId; + + private static final int createsCompletedId; + private static final int createOpsRetriedId; + private static final int createRetriesId; + + private static final int preferredReadLocalId; + private static final int preferredReadRemoteId; + + private static final int getsCompletedId; + private static final int getOpsRetriedId; + private static final int getRetriesId; + + private static final int destroysCompletedId; + private static final int destroyOpsRetriedId; + private static final int destroyRetriesId; + + private static final int invalidatesCompletedId; + private static final int invalidateOpsRetriedId; + private static final int invalidateRetriesId; + + private static final int containsKeyCompletedId; + private static final int containsKeyOpsRetriedId; + private static final int containsKeyRetriesId; + + private static final int containsValueForKeyCompletedId; + + private static final int partitionMessagesSentId; + private static final int partitionMessagesReceivedId; + private static final int partitionMessagesProcessedId; + + private static final int putTimeId; + private static final int createTimeId; + private static final int getTimeId; + private static final int destroyTimeId; + private static final int invalidateTimeId; + private static final int containsKeyTimeId; + private static final int containsValueForKeyTimeId; + private static final int partitionMessagesProcessingTimeId; + + private static final String PUTALLS_COMPLETED = "putAllsCompleted"; + private static final String PUTALL_MSGS_RETRIED = "putAllMsgsRetried"; + 
private static final String PUTALL_RETRIES = "putAllRetries"; + private static final String PUTALL_TIME = "putAllTime"; + + private static final int fieldId_PUTALLS_COMPLETED; + private static final int fieldId_PUTALL_MSGS_RETRIED; + private static final int fieldId_PUTALL_RETRIES; + private static final int fieldId_PUTALL_TIME; + + private static final String REMOVE_ALLS_COMPLETED = "removeAllsCompleted"; + private static final String REMOVE_ALL_MSGS_RETRIED = "removeAllMsgsRetried"; + private static final String REMOVE_ALL_RETRIES = "removeAllRetries"; + private static final String REMOVE_ALL_TIME = "removeAllTime"; + + private static final int fieldId_REMOVE_ALLS_COMPLETED; + private static final int fieldId_REMOVE_ALL_MSGS_RETRIED; + private static final int fieldId_REMOVE_ALL_RETRIES; + private static final int fieldId_REMOVE_ALL_TIME; + + private static final int volunteeringInProgressId; // count of volunteering in progress + private static final int volunteeringBecamePrimaryId; // ended as primary + private static final int volunteeringBecamePrimaryTimeId; // time spent that ended as primary + private static final int volunteeringOtherPrimaryId; // ended as not primary + private static final int volunteeringOtherPrimaryTimeId; // time spent that ended as not primary + private static final int volunteeringClosedId; // ended as closed + private static final int volunteeringClosedTimeId; // time spent that ended as closed + + private static final int applyReplicationCompletedId; + private static final int applyReplicationInProgressId; + private static final int applyReplicationTimeId; + private static final int sendReplicationCompletedId; + private static final int sendReplicationInProgressId; + private static final int sendReplicationTimeId; + private static final int putRemoteCompletedId; + private static final int putRemoteInProgressId; + private static final int putRemoteTimeId; + private static final int putLocalCompletedId; + private static final int 
putLocalInProgressId; + private static final int putLocalTimeId; + + private static final int totalNumBucketsId; // total number of buckets + private static final int primaryBucketCountId; // number of hosted primary buckets + private static final int volunteeringThreadsId; // number of threads actively volunteering + private static final int lowRedundancyBucketCountId; // number of buckets currently without full + // redundancy + private static final int noCopiesBucketCountId; // number of buckets currently without any + // redundancy + + private static final int configuredRedundantCopiesId; + private static final int actualRedundantCopiesId; + + private static final int getEntriesCompletedId; + private static final int getEntryTimeId; + + private static final int recoveriesInProgressId; + private static final int recoveriesCompletedId; + private static final int recoveriesTimeId; + private static final int bucketCreatesInProgressId; + private static final int bucketCreatesCompletedId; + private static final int bucketCreatesFailedId; + private static final int bucketCreateTimeId; + + private static final int rebalanceBucketCreatesInProgressId; + private static final int rebalanceBucketCreatesCompletedId; + private static final int rebalanceBucketCreatesFailedId; + private static final int rebalanceBucketCreateTimeId; + + private static final int primaryTransfersInProgressId; + private static final int primaryTransfersCompletedId; + private static final int primaryTransfersFailedId; + private static final int primaryTransferTimeId; + + private static final int rebalancePrimaryTransfersInProgressId; + private static final int rebalancePrimaryTransfersCompletedId; + private static final int rebalancePrimaryTransfersFailedId; + private static final int rebalancePrimaryTransferTimeId; + + private static final int prMetaDataSentCountId; + + private static final int localMaxMemoryId; + + static { + final boolean largerIsBetter = true; + StatisticsTypeFactory f = 
StatisticsTypeFactoryImpl.singleton(); + type = f.createType("PartitionedRegionStatsImpl", + "Statistics for operations and connections in the Partitioned Region", + new StatisticDescriptor[] { + + f.createIntGauge("bucketCount", "Number of buckets in this node.", "buckets"), + f.createIntCounter("putsCompleted", "Number of puts completed.", "operations", + largerIsBetter), + f.createIntCounter("putOpsRetried", + "Number of put operations which had to be retried due to failures.", "operations", + false), + f.createIntCounter("putRetries", + "Total number of times put operations had to be retried.", "retry attempts", false), + f.createIntCounter("createsCompleted", "Number of creates completed.", "operations", + largerIsBetter), + f.createIntCounter("createOpsRetried", + "Number of create operations which had to be retried due to failures.", + "operations", false), + f.createIntCounter("createRetries", + "Total number of times create operations had to be retried.", "retry attempts", false), + f.createIntCounter("preferredReadLocal", "Number of reads satisfied from local store", + "operations", largerIsBetter), + f.createIntCounter(PUTALLS_COMPLETED, "Number of putAlls completed.", "operations", + largerIsBetter), + f.createIntCounter(PUTALL_MSGS_RETRIED, + "Number of putAll messages which had to be retried due to failures.", "operations", + false), + f.createIntCounter(PUTALL_RETRIES, + "Total number of times putAll messages had to be retried.", "retry attempts", + false), + f.createLongCounter(PUTALL_TIME, "Total time spent doing putAlls.", "nanoseconds", + !largerIsBetter), + f.createIntCounter(REMOVE_ALLS_COMPLETED, "Number of removeAlls completed.", + "operations", largerIsBetter), + f.createIntCounter(REMOVE_ALL_MSGS_RETRIED, + "Number of removeAll messages which had to be retried due to failures.", + "operations", false), + f.createIntCounter(REMOVE_ALL_RETRIES, + "Total number of times removeAll messages had to be retried.", "retry attempts", + false), +
f.createLongCounter(REMOVE_ALL_TIME, "Total time spent doing removeAlls.", + "nanoseconds", !largerIsBetter), + f.createIntCounter("preferredReadRemote", "Number of reads satisfied from remote store", + "operations", false), + f.createIntCounter("getsCompleted", "Number of gets completed.", "operations", + largerIsBetter), + f.createIntCounter("getOpsRetried", + "Number of get operations which had to be retried due to failures.", "operations", + false), + f.createIntCounter("getRetries", + "Total number of times get operations had to be retried.", "retry attempts", false), + f.createIntCounter("destroysCompleted", "Number of destroys completed.", "operations", + largerIsBetter), + f.createIntCounter("destroyOpsRetried", + "Number of destroy operations which had to be retried due to failures.", + "operations", false), + f.createIntCounter("destroyRetries", + "Total number of times destroy operations had to be retried.", "retry attempts", + false), + f.createIntCounter("invalidatesCompleted", "Number of invalidates completed.", + "operations", largerIsBetter), + + f.createIntCounter("invalidateOpsRetried", + "Number of invalidate operations which had to be retried due to failures.", + "operations", false), + f.createIntCounter("invalidateRetries", + "Total number of times invalidate operations had to be retried.", "retry attempts", + false), + f.createIntCounter("containsKeyCompleted", "Number of containsKeys completed.", + "operations", largerIsBetter), + + f.createIntCounter("containsKeyOpsRetried", + "Number of containsKey or containsValueForKey operations which had to be retried due to failures.", + "operations", false), + f.createIntCounter("containsKeyRetries", + "Total number of times containsKey or containsValueForKey operations had to be retried.", + "operations", false), + f.createIntCounter("containsValueForKeyCompleted", + "Number of containsValueForKeys completed.", "operations", largerIsBetter), + f.createIntCounter("PartitionMessagesSent", "Number of 
PartitionMessages Sent.", + "operations", largerIsBetter), + f.createIntCounter("PartitionMessagesReceived", "Number of PartitionMessages Received.", + "operations", largerIsBetter), + f.createIntCounter("PartitionMessagesProcessed", + "Number of PartitionMessages Processed.", "operations", largerIsBetter), + f.createLongCounter("putTime", "Total time spent doing puts.", "nanoseconds", false), + f.createLongCounter("createTime", "Total time spent doing create operations.", + "nanoseconds", false), + f.createLongCounter("getTime", "Total time spent performing get operations.", + "nanoseconds", false), + f.createLongCounter("destroyTime", "Total time spent doing destroys.", "nanoseconds", + false), + f.createLongCounter("invalidateTime", "Total time spent doing invalidates.", + "nanoseconds", false), + f.createLongCounter("containsKeyTime", + "Total time spent performing containsKey operations.", "nanoseconds", false), + f.createLongCounter("containsValueForKeyTime", + "Total time spent performing containsValueForKey operations.", "nanoseconds", + false), + f.createLongCounter("partitionMessagesProcessingTime", + "Total time spent on PartitionMessages processing.", "nanoseconds", false), + f.createIntGauge("dataStoreEntryCount", + "The number of entries stored in this Cache for the named Partitioned Region. This does not include entries which are tombstones. 
See CachePerfStats.tombstoneCount.", + "entries"), + f.createLongGauge("dataStoreBytesInUse", + "The current number of bytes stored in this Cache for the named Partitioned Region", + "bytes"), + f.createIntGauge("volunteeringInProgress", + "Current number of attempts to volunteer for primary of a bucket.", "operations"), + f.createIntCounter("volunteeringBecamePrimary", + "Total number of attempts to volunteer that ended when this member became primary.", + "operations"), + f.createLongCounter("volunteeringBecamePrimaryTime", + "Total time spent volunteering that ended when this member became primary.", + "nanoseconds", false), + f.createIntCounter("volunteeringOtherPrimary", + "Total number of attempts to volunteer that ended when this member discovered other primary.", + "operations"), + f.createLongCounter("volunteeringOtherPrimaryTime", + "Total time spent volunteering that ended when this member discovered other primary.", + "nanoseconds", false), + f.createIntCounter("volunteeringClosed", + "Total number of attempts to volunteer that ended when this member's bucket closed.", + "operations"), + f.createLongCounter("volunteeringClosedTime", + "Total time spent volunteering that ended when this member's bucket closed.", + "nanoseconds", false), + f.createIntGauge("totalNumBuckets", "The total number of buckets.", "buckets"), + f.createIntGauge("primaryBucketCount", + "Current number of primary buckets hosted locally.", "buckets"), + f.createIntGauge("volunteeringThreads", + "Current number of threads volunteering for primary.", "threads"), + f.createIntGauge("lowRedundancyBucketCount", + "Current number of buckets without full redundancy.", "buckets"), + f.createIntGauge("noCopiesBucketCount", + "Current number of buckets without any copies remaining.", "buckets"), + f.createIntGauge("configuredRedundantCopies", + "Configured number of redundant copies for this partitioned region.", "copies"), + f.createIntGauge("actualRedundantCopies", + "Actual number of 
redundant copies for this partitioned region.", "copies"), + f.createIntCounter("getEntryCompleted", "Number of getEntry operations completed.", + "operations", largerIsBetter), + f.createLongCounter("getEntryTime", "Total time spent performing getEntry operations.", + "nanoseconds", false), + + f.createIntGauge("recoveriesInProgress", + "Current number of redundancy recovery operations in progress for this region.", + "operations"), + f.createIntCounter("recoveriesCompleted", + "Total number of redundancy recovery operations performed on this region.", + "operations"), + f.createLongCounter("recoveryTime", "Total time spent recovering redundancy.", + "nanoseconds"), + f.createIntGauge("bucketCreatesInProgress", + "Current number of bucket create operations being performed for rebalancing.", + "operations"), + f.createIntCounter("bucketCreatesCompleted", + "Total number of bucket create operations performed for rebalancing.", + "operations"), + f.createIntCounter("bucketCreatesFailed", + "Total number of bucket create operations performed for rebalancing that failed.", + "operations"), + f.createLongCounter("bucketCreateTime", + "Total time spent performing bucket create operations for rebalancing.", + "nanoseconds", false), + f.createIntGauge("primaryTransfersInProgress", + "Current number of primary transfer operations being performed for rebalancing.", + "operations"), + f.createIntCounter("primaryTransfersCompleted", + "Total number of primary transfer operations performed for rebalancing.", + "operations"), + f.createIntCounter("primaryTransfersFailed", + "Total number of primary transfer operations performed for rebalancing that failed.", + "operations"), + f.createLongCounter("primaryTransferTime", + "Total time spent performing primary transfer operations for rebalancing.", + "nanoseconds", false), + + f.createIntCounter("applyReplicationCompleted", + "Total number of replicated values sent from a primary to this redundant data store.", +
"operations", largerIsBetter), + f.createIntGauge("applyReplicationInProgress", + "Current number of replication operations in progress on this redundant data store.", + "operations", !largerIsBetter), + f.createLongCounter("applyReplicationTime", + "Total time spent storing replicated values on this redundant data store.", + "nanoseconds", !largerIsBetter), + f.createIntCounter("sendReplicationCompleted", + "Total number of replicated values sent from this primary to a redundant data store.", + "operations", largerIsBetter), + f.createIntGauge("sendReplicationInProgress", + "Current number of replication operations in progress from this primary.", + "operations", !largerIsBetter), + f.createLongCounter("sendReplicationTime", + "Total time spent replicating values from this primary to a redundant data store.", + "nanoseconds", !largerIsBetter), + f.createIntCounter("putRemoteCompleted", + "Total number of completed puts that did not originate in the primary. These puts require an extra network hop to the primary.", + "operations", largerIsBetter), + f.createIntGauge("putRemoteInProgress", + "Current number of puts in progress that did not originate in the primary.", + "operations", !largerIsBetter), + f.createLongCounter("putRemoteTime", + "Total time spent doing puts that did not originate in the primary.", "nanoseconds", + !largerIsBetter), + f.createIntCounter("putLocalCompleted", + "Total number of completed puts that did originate in the primary. 
These puts are optimal.", + "operations", largerIsBetter), + f.createIntGauge("putLocalInProgress", + "Current number of puts in progress that did originate in the primary.", + "operations", !largerIsBetter), + f.createLongCounter("putLocalTime", + "Total time spent doing puts that did originate in the primary.", "nanoseconds", + !largerIsBetter), + + f.createIntGauge("rebalanceBucketCreatesInProgress", + "Current number of bucket create operations being performed for rebalancing.", + "operations"), + f.createIntCounter("rebalanceBucketCreatesCompleted", + "Total number of bucket create operations performed for rebalancing.", + "operations"), + f.createIntCounter("rebalanceBucketCreatesFailed", + "Total number of bucket create operations performed for rebalancing that failed.", + "operations"), + f.createLongCounter("rebalanceBucketCreateTime", + "Total time spent performing bucket create operations for rebalancing.", + "nanoseconds", false), + f.createIntGauge("rebalancePrimaryTransfersInProgress", + "Current number of primary transfer operations being performed for rebalancing.", + "operations"), + f.createIntCounter("rebalancePrimaryTransfersCompleted", + "Total number of primary transfer operations performed for rebalancing.", + "operations"), + f.createIntCounter("rebalancePrimaryTransfersFailed", + "Total number of primary transfer operations performed for rebalancing that failed.", + "operations"), + f.createLongCounter("rebalancePrimaryTransferTime", + "Total time spent performing primary transfer operations for rebalancing.", + "nanoseconds", false), + f.createLongCounter("prMetaDataSentCount", + "total number of times meta data refreshed sent on client's request.", "operation", + false), + + f.createLongGauge("localMaxMemory", + "local max memory in bytes for this region on this member", "bytes") + + }); + + bucketCountId = type.nameToId("bucketCount"); + + putsCompletedId = type.nameToId("putsCompleted"); + putOpsRetriedId = 
type.nameToId("putOpsRetried"); + putRetriesId = type.nameToId("putRetries"); + createsCompletedId = type.nameToId("createsCompleted"); + createOpsRetriedId = type.nameToId("createOpsRetried"); + createRetriesId = type.nameToId("createRetries"); + getsCompletedId = type.nameToId("getsCompleted"); + preferredReadLocalId = type.nameToId("preferredReadLocal"); + preferredReadRemoteId = type.nameToId("preferredReadRemote"); + getOpsRetriedId = type.nameToId("getOpsRetried"); + getRetriesId = type.nameToId("getRetries"); + destroysCompletedId = type.nameToId("destroysCompleted"); + destroyOpsRetriedId = type.nameToId("destroyOpsRetried"); + destroyRetriesId = type.nameToId("destroyRetries"); + invalidatesCompletedId = type.nameToId("invalidatesCompleted"); + invalidateOpsRetriedId = type.nameToId("invalidateOpsRetried"); + invalidateRetriesId = type.nameToId("invalidateRetries"); + containsKeyCompletedId = type.nameToId("containsKeyCompleted"); + containsKeyOpsRetriedId = type.nameToId("containsKeyOpsRetried"); + containsKeyRetriesId = type.nameToId("containsKeyRetries"); + containsValueForKeyCompletedId = type.nameToId("containsValueForKeyCompleted"); + partitionMessagesSentId = type.nameToId("PartitionMessagesSent"); + partitionMessagesReceivedId = type.nameToId("PartitionMessagesReceived"); + partitionMessagesProcessedId = type.nameToId("PartitionMessagesProcessed"); + fieldId_PUTALLS_COMPLETED = type.nameToId(PUTALLS_COMPLETED); + fieldId_PUTALL_MSGS_RETRIED = type.nameToId(PUTALL_MSGS_RETRIED); + fieldId_PUTALL_RETRIES = type.nameToId(PUTALL_RETRIES); + fieldId_PUTALL_TIME = type.nameToId(PUTALL_TIME); + fieldId_REMOVE_ALLS_COMPLETED = type.nameToId(REMOVE_ALLS_COMPLETED); + fieldId_REMOVE_ALL_MSGS_RETRIED = type.nameToId(REMOVE_ALL_MSGS_RETRIED); + fieldId_REMOVE_ALL_RETRIES = type.nameToId(REMOVE_ALL_RETRIES); + fieldId_REMOVE_ALL_TIME = type.nameToId(REMOVE_ALL_TIME); + putTimeId = type.nameToId("putTime"); + createTimeId = type.nameToId("createTime"); + 
getTimeId = type.nameToId("getTime"); + destroyTimeId = type.nameToId("destroyTime"); + invalidateTimeId = type.nameToId("invalidateTime"); + containsKeyTimeId = type.nameToId("containsKeyTime"); + containsValueForKeyTimeId = type.nameToId("containsValueForKeyTime"); + partitionMessagesProcessingTimeId = type.nameToId("partitionMessagesProcessingTime"); + dataStoreEntryCountId = type.nameToId("dataStoreEntryCount"); + dataStoreBytesInUseId = type.nameToId("dataStoreBytesInUse"); + + volunteeringInProgressId = type.nameToId("volunteeringInProgress"); + volunteeringBecamePrimaryId = type.nameToId("volunteeringBecamePrimary"); + volunteeringBecamePrimaryTimeId = type.nameToId("volunteeringBecamePrimaryTime"); + volunteeringOtherPrimaryId = type.nameToId("volunteeringOtherPrimary"); + volunteeringOtherPrimaryTimeId = type.nameToId("volunteeringOtherPrimaryTime"); + volunteeringClosedId = type.nameToId("volunteeringClosed"); + volunteeringClosedTimeId = type.nameToId("volunteeringClosedTime"); + + totalNumBucketsId = type.nameToId("totalNumBuckets"); + primaryBucketCountId = type.nameToId("primaryBucketCount"); + volunteeringThreadsId = type.nameToId("volunteeringThreads"); + lowRedundancyBucketCountId = type.nameToId("lowRedundancyBucketCount"); + noCopiesBucketCountId = type.nameToId("noCopiesBucketCount"); + + getEntriesCompletedId = type.nameToId("getEntryCompleted"); + getEntryTimeId = type.nameToId("getEntryTime"); + + configuredRedundantCopiesId = type.nameToId("configuredRedundantCopies"); + actualRedundantCopiesId = type.nameToId("actualRedundantCopies"); + + recoveriesCompletedId = type.nameToId("recoveriesCompleted"); + recoveriesInProgressId = type.nameToId("recoveriesInProgress"); + recoveriesTimeId = type.nameToId("recoveryTime"); + bucketCreatesInProgressId = type.nameToId("bucketCreatesInProgress"); + bucketCreatesCompletedId = type.nameToId("bucketCreatesCompleted"); + bucketCreatesFailedId = type.nameToId("bucketCreatesFailed"); + bucketCreateTimeId = 
type.nameToId("bucketCreateTime"); + primaryTransfersInProgressId = type.nameToId("primaryTransfersInProgress"); + primaryTransfersCompletedId = type.nameToId("primaryTransfersCompleted"); + primaryTransfersFailedId = type.nameToId("primaryTransfersFailed"); + primaryTransferTimeId = type.nameToId("primaryTransferTime"); + + rebalanceBucketCreatesInProgressId = type.nameToId("rebalanceBucketCreatesInProgress"); + rebalanceBucketCreatesCompletedId = type.nameToId("rebalanceBucketCreatesCompleted"); + rebalanceBucketCreatesFailedId = type.nameToId("rebalanceBucketCreatesFailed"); + rebalanceBucketCreateTimeId = type.nameToId("rebalanceBucketCreateTime"); + rebalancePrimaryTransfersInProgressId = type.nameToId("rebalancePrimaryTransfersInProgress"); + rebalancePrimaryTransfersCompletedId = type.nameToId("rebalancePrimaryTransfersCompleted"); + rebalancePrimaryTransfersFailedId = type.nameToId("rebalancePrimaryTransfersFailed"); + rebalancePrimaryTransferTimeId = type.nameToId("rebalancePrimaryTransferTime"); + + applyReplicationCompletedId = type.nameToId("applyReplicationCompleted"); + applyReplicationInProgressId = type.nameToId("applyReplicationInProgress"); + applyReplicationTimeId = type.nameToId("applyReplicationTime"); + sendReplicationCompletedId = type.nameToId("sendReplicationCompleted"); + sendReplicationInProgressId = type.nameToId("sendReplicationInProgress"); + sendReplicationTimeId = type.nameToId("sendReplicationTime"); + putRemoteCompletedId = type.nameToId("putRemoteCompleted"); + putRemoteInProgressId = type.nameToId("putRemoteInProgress"); + putRemoteTimeId = type.nameToId("putRemoteTime"); + putLocalCompletedId = type.nameToId("putLocalCompleted"); + putLocalInProgressId = type.nameToId("putLocalInProgress"); + putLocalTimeId = type.nameToId("putLocalTime"); + + prMetaDataSentCountId = type.nameToId("prMetaDataSentCount"); + + localMaxMemoryId = type.nameToId("localMaxMemory"); + } + + private final Statistics stats; + + /** + * Utility map for 
temporarily holding stat start times. + * <p> + * This was originally added to avoid having to add a long volunteeringStarted variable to every + * instance of BucketAdvisor. Majority of BucketAdvisors never volunteer and an instance of + * BucketAdvisor exists for every bucket defined in a PartitionedRegion which could result in a + * lot of unused longs. Volunteering is a rare event and thus the performance implications of a + * HashMap lookup are small and preferable to so many longs. Key: BucketAdvisor, Value: Long + */ + private final Map startTimeMap; + + public PartitionedRegionStatsImpl(StatisticsFactory factory, String name) { + this.stats = factory.createAtomicStatistics(type, name /* fixes bug 42343 */); + + if (CachePerfStats.enableClockStats) { + this.startTimeMap = new ConcurrentHashMap(); + } else { + this.startTimeMap = Collections.EMPTY_MAP; + } + } + + @Override + public void close() { + this.stats.close(); + } + + public Statistics getStats() { + return this.stats; + } + + // ------------------------------------------------------------------------ + // region op stats + // ------------------------------------------------------------------------ + + @Override + public void endPut(long start) { + endPut(start, 1); + } + + /** + * This method sets the end time for putAll and updates the counters + * + * @param start start timestamp, as obtained from CachePerfStats.getStatTime() + */ + @Override + public void endPutAll(long start) { + endPutAll(start, 1); + } + + @Override + public void endRemoveAll(long start) { + endRemoveAll(start, 1); + } + + @Override + public void endCreate(long start) { + endCreate(start, 1); + } + + @Override + public void endGet(long start) { + endGet(start, 1); + } + + @Override + public void endContainsKey(long start) { + endContainsKey(start, 1); + } + + @Override + public void endContainsValueForKey(long start) { + endContainsValueForKey(start, 1); + } + + @Override + public void endPut(long start, int numInc) { + if (CachePerfStats.enableClockStats) { + long delta =
CachePerfStats.getStatTime() - start; + this.stats.incLong(putTimeId, delta); + } + this.stats.incInt(putsCompletedId, numInc); + } + + /** + * This method sets the end time for putAll and updates the counters + * + * @param start + * @param numInc + */ + @Override + public void endPutAll(long start, int numInc) { + if (CachePerfStats.enableClockStats) { + long delta = CachePerfStats.getStatTime() - start; + this.stats.incLong(fieldId_PUTALL_TIME, delta); + // this.putStatsHistogram.endOp(delta); + + } + this.stats.incInt(fieldId_PUTALLS_COMPLETED, numInc); + } + + @Override + public void endRemoveAll(long start, int numInc) { + if (CachePerfStats.enableClockStats) { + long delta = CachePerfStats.getStatTime() - start; + this.stats.incLong(fieldId_REMOVE_ALL_TIME, delta); + } + this.stats.incInt(fieldId_REMOVE_ALLS_COMPLETED, numInc); + } + + @Override + public void endCreate(long start, int numInc) { + if (CachePerfStats.enableClockStats) { + this.stats.incLong(createTimeId, CachePerfStats.getStatTime() - start); + } + this.stats.incInt(createsCompletedId, numInc); + } + + @Override + public void endGet(long start, int numInc) { + if (CachePerfStats.enableClockStats) { + final long delta = CachePerfStats.getStatTime() - start; + this.stats.incLong(getTimeId, delta); + } + this.stats.incInt(getsCompletedId, numInc); + } + + @Override + public void endDestroy(long start) { + if (CachePerfStats.enableClockStats) { + this.stats.incLong(destroyTimeId, CachePerfStats.getStatTime() - start); + } + this.stats.incInt(destroysCompletedId, 1); + } + + @Override + public void endInvalidate(long start) { + if (CachePerfStats.enableClockStats) { + this.stats.incLong(invalidateTimeId, CachePerfStats.getStatTime() - start); + } + this.stats.incInt(invalidatesCompletedId, 1); + } + + @Override + public void endContainsKey(long start, int numInc) { + if (CachePerfStats.enableClockStats) { + this.stats.incLong(containsKeyTimeId, CachePerfStats.getStatTime() - start); + } + 
this.stats.incInt(containsKeyCompletedId, numInc); + } + + @Override + public void endContainsValueForKey(long start, int numInc) { + if (CachePerfStats.enableClockStats) { + this.stats.incLong(containsValueForKeyTimeId, CachePerfStats.getStatTime() - start); + } + this.stats.incInt(containsValueForKeyCompletedId, numInc); + } + + @Override + public void incContainsKeyValueRetries() { + this.stats.incInt(containsKeyRetriesId, 1); + } + + @Override + public void incContainsKeyValueOpsRetried() { + this.stats.incInt(containsKeyOpsRetriedId, 1); + } + + @Override + public void incInvalidateRetries() { + this.stats.incInt(invalidateRetriesId, 1); + } + + @Override + public void incInvalidateOpsRetried() { + this.stats.incInt(invalidateOpsRetriedId, 1); + } + + @Override + public void incDestroyRetries() { + this.stats.incInt(destroyRetriesId, 1); + } + + @Override + public void incDestroyOpsRetried() { + this.stats.incInt(destroyOpsRetriedId, 1); + } + + @Override + public void incPutRetries() { + this.stats.incInt(putRetriesId, 1); + } + + @Override + public void incPutOpsRetried() { + this.stats.incInt(putOpsRetriedId, 1); + } + + @Override + public void incGetOpsRetried() { + this.stats.incInt(getOpsRetriedId, 1); + } + + @Override + public void incGetRetries() { + this.stats.incInt(getRetriesId, 1); + } + + @Override + public void incCreateOpsRetried() { + this.stats.incInt(createOpsRetriedId, 1); + } + + @Override + public void incCreateRetries() { + this.stats.incInt(createRetriesId, 1); + } + + // ------------------------------------------------------------------------ + // preferred read stats + // ------------------------------------------------------------------------ + + @Override + public void incPreferredReadLocal() { + this.stats.incInt(preferredReadLocalId, 1); + } + + @Override + public void incPreferredReadRemote() { + this.stats.incInt(preferredReadRemoteId, 1); + } + + // ------------------------------------------------------------------------ + // 
messaging stats + // ------------------------------------------------------------------------ + + @Override + public long startPartitionMessageProcessing() { + this.stats.incInt(partitionMessagesReceivedId, 1); + return startTime(); + } + + @Override + public void endPartitionMessagesProcessing(long start) { + if (CachePerfStats.enableClockStats) { + long delta = CachePerfStats.getStatTime() - start; + this.stats.incLong(partitionMessagesProcessingTimeId, delta); + } + this.stats.incInt(partitionMessagesProcessedId, 1); + } + + @Override + public void incPartitionMessagesSent() { + this.stats.incInt(partitionMessagesSentId, 1); + } + + // ------------------------------------------------------------------------ + // datastore stats + // ------------------------------------------------------------------------ + + @Override + public void incBucketCount(int delta) { + this.stats.incInt(bucketCountId, delta); + } + + @Override + public void setBucketCount(int i) { + this.stats.setInt(bucketCountId, i); + } + + @Override + public void incDataStoreEntryCount(int amt) { + this.stats.incInt(dataStoreEntryCountId, amt); + } + + @Override + public int getDataStoreEntryCount() { + return this.stats.getInt(dataStoreEntryCountId); + } + + @Override + public void incBytesInUse(long delta) { + this.stats.incLong(dataStoreBytesInUseId, delta); + } + + @Override + public long getDataStoreBytesInUse() { + return this.stats.getLong(dataStoreBytesInUseId); + } + + @Override + public int getTotalBucketCount() { + int bucketCount = this.stats.getInt(bucketCountId); + return bucketCount; + } + + @Override + public void incPutAllRetries() { + this.stats.incInt(fieldId_PUTALL_RETRIES, 1); + } + + @Override + public void incPutAllMsgsRetried() { + this.stats.incInt(fieldId_PUTALL_MSGS_RETRIED, 1); + } + + @Override + public void incRemoveAllRetries() { + this.stats.incInt(fieldId_REMOVE_ALL_RETRIES, 1); + } + + @Override + public void incRemoveAllMsgsRetried() { + 
this.stats.incInt(fieldId_REMOVE_ALL_MSGS_RETRIED, 1); + } + + // ------------------------------------------------------------------------ + // stats for volunteering/discovering/becoming primary + // ------------------------------------------------------------------------ + + @Override + public int getVolunteeringInProgress() { + return this.stats.getInt(volunteeringInProgressId); + } + + @Override + public int getVolunteeringBecamePrimary() { + return this.stats.getInt(volunteeringBecamePrimaryId); + } + + @Override + public long getVolunteeringBecamePrimaryTime() { + return this.stats.getLong(volunteeringBecamePrimaryTimeId); + } + + @Override + public int getVolunteeringOtherPrimary() { + return this.stats.getInt(volunteeringOtherPrimaryId); + } + + @Override + public long getVolunteeringOtherPrimaryTime() { + return this.stats.getLong(volunteeringOtherPrimaryTimeId); + } + + @Override + public int getVolunteeringClosed() { + return this.stats.getInt(volunteeringClosedId); + } + + @Override + public long getVolunteeringClosedTime() { + return this.stats.getLong(volunteeringClosedTimeId); + } + + @Override + public long startVolunteering() { + this.stats.incInt(volunteeringInProgressId, 1); + return CachePerfStats.getStatTime(); + } + + @Override + public void endVolunteeringBecamePrimary(long start) { + long ts = CachePerfStats.getStatTime(); + this.stats.incInt(volunteeringInProgressId, -1); + this.stats.incInt(volunteeringBecamePrimaryId, 1); + if (CachePerfStats.enableClockStats) { + long time = ts - start; + this.stats.incLong(volunteeringBecamePrimaryTimeId, time); + } + } + + @Override + public void endVolunteeringOtherPrimary(long start) { + long ts = CachePerfStats.getStatTime(); + this.stats.incInt(volunteeringInProgressId, -1); + this.stats.incInt(volunteeringOtherPrimaryId, 1); + if (CachePerfStats.enableClockStats) { + long time = ts - start; + this.stats.incLong(volunteeringOtherPrimaryTimeId, time); + } + } + + @Override + public void 
endVolunteeringClosed(long start) { + long ts = CachePerfStats.getStatTime(); + this.stats.incInt(volunteeringInProgressId, -1); + this.stats.incInt(volunteeringClosedId, 1); + if (CachePerfStats.enableClockStats) { + long time = ts - start; + this.stats.incLong(volunteeringClosedTimeId, time); + } + } + + @Override + public int getTotalNumBuckets() { + return this.stats.getInt(totalNumBucketsId); + } + + @Override + public void incTotalNumBuckets(int val) { + this.stats.incInt(totalNumBucketsId, val); + } + + @Override + public int getPrimaryBucketCount() { + return this.stats.getInt(primaryBucketCountId); + } + + @Override + public void incPrimaryBucketCount(int val) { + this.stats.incInt(primaryBucketCountId, val); + } + + @Override + public int getVolunteeringThreads() { + return this.stats.getInt(volunteeringThreadsId); + } + + @Override + public void incVolunteeringThreads(int val) { + this.stats.incInt(volunteeringThreadsId, val); + } + + @Override + public int getLowRedundancyBucketCount() { + return this.stats.getInt(lowRedundancyBucketCountId); + } + + @Override + public int getNoCopiesBucketCount() { + return this.stats.getInt(noCopiesBucketCountId); + } + + @Override + public void incLowRedundancyBucketCount(int val) { + this.stats.incInt(lowRedundancyBucketCountId, val); + } + + @Override + public void incNoCopiesBucketCount(int val) { + this.stats.incInt(noCopiesBucketCountId, val); + } + + @Override + public int getConfiguredRedundantCopies() { + return this.stats.getInt(configuredRedundantCopiesId); + } + + @Override + public void setConfiguredRedundantCopies(int val) { + this.stats.setInt(configuredRedundantCopiesId, val); + } + + @Override + public void setLocalMaxMemory(long l) { + this.stats.setLong(localMaxMemoryId, l); + } + + @Override + public int getActualRedundantCopies() { + return this.stats.getInt(actualRedundantCopiesId); + } + + @Override + public void setActualRedundantCopies(int val) { + this.stats.setInt(actualRedundantCopiesId, 
val); + } + + // ------------------------------------------------------------------------ + // startTimeMap methods + // ------------------------------------------------------------------------ + + /** Put stat start time in holding map for later removal and use by caller */ + @Override + public void putStartTime(Object key, long startTime) { + if (CachePerfStats.enableClockStats) { + this.startTimeMap.put(key, Long.valueOf(startTime)); + } + } + + /** Remove stat start time from holding map to complete a clock stat */ + @Override + public long removeStartTime(Object key) { + Long startTime = (Long) this.startTimeMap.remove(key); + return startTime == null ? 0 : startTime.longValue(); + } + + /** + * Statistic to track the {@link Region#getEntry(Object)} call + * + * @param startTime the time the getEntry operation started + */ + @Override + public void endGetEntry(long startTime) { + endGetEntry(startTime, 1); + } + + /** + * This method sets the end time for getEntry and updates the counters + * + * @param start the stat time at which the getEntry operation started + * @param numInc the amount by which the completed getEntry counter is incremented + */ + @Override + public void endGetEntry(long start, int numInc) { + if (CachePerfStats.enableClockStats) { + this.stats.incLong(getEntryTimeId, CachePerfStats.getStatTime() - start); + } + this.stats.incInt(getEntriesCompletedId, numInc); + } + + // ------------------------------------------------------------------------ + // bucket creation, primary transfer stats (see also rebalancing stats below) + // ------------------------------------------------------------------------ + @Override + public long startRecovery() { + this.stats.incInt(recoveriesInProgressId, 1); + return getStatTime(); + } + + @Override + public void endRecovery(long start) { + long ts = getStatTime(); + this.stats.incInt(recoveriesInProgressId, -1); + if (CachePerfStats.enableClockStats) { + this.stats.incLong(recoveriesTimeId, ts - start); + } + this.stats.incInt(recoveriesCompletedId, 1); + } + + @Override + public long startBucketCreate(boolean isRebalance) { + 
this.stats.incInt(bucketCreatesInProgressId, 1); + if (isRebalance) { + startRebalanceBucketCreate(); + } + return getStatTime(); + } + + @Override + public void endBucketCreate(long start, boolean success, boolean isRebalance) { + long ts = getStatTime(); + this.stats.incInt(bucketCreatesInProgressId, -1); + if (CachePerfStats.enableClockStats) { + this.stats.incLong(bucketCreateTimeId, ts - start); + } + if (success) { + this.stats.incInt(bucketCreatesCompletedId, 1); + } else { + this.stats.incInt(bucketCreatesFailedId, 1); + } + if (isRebalance) { + endRebalanceBucketCreate(start, ts, success); + } + } + + @Override + public long startPrimaryTransfer(boolean isRebalance) { + this.stats.incInt(primaryTransfersInProgressId, 1); + if (isRebalance) { + startRebalancePrimaryTransfer(); + } + return getStatTime(); + } + + @Override + public void endPrimaryTransfer(long start, boolean success, boolean isRebalance) { + long ts = getStatTime(); + this.stats.incInt(primaryTransfersInProgressId, -1); + if (CachePerfStats.enableClockStats) { + this.stats.incLong(primaryTransferTimeId, ts - start); + } + if (success) { + this.stats.incInt(primaryTransfersCompletedId, 1); + } else { + this.stats.incInt(primaryTransfersFailedId, 1); + } + if (isRebalance) { + endRebalancePrimaryTransfer(start, ts, success); + } + } + + @Override + public int getBucketCreatesInProgress() { + return this.stats.getInt(bucketCreatesInProgressId); + } + + @Override + public int getBucketCreatesCompleted() { + return this.stats.getInt(bucketCreatesCompletedId); + } + + @Override + public int getBucketCreatesFailed() { + return this.stats.getInt(bucketCreatesFailedId); + } + + @Override + public long getBucketCreateTime() { + return this.stats.getLong(bucketCreateTimeId); + } + + @Override + public int getPrimaryTransfersInProgress() { + return this.stats.getInt(primaryTransfersInProgressId); + } + + @Override + public int getPrimaryTransfersCompleted() { + return 
this.stats.getInt(primaryTransfersCompletedId); + } + + @Override + public int getPrimaryTransfersFailed() { + return this.stats.getInt(primaryTransfersFailedId); + } + + @Override + public long getPrimaryTransferTime() { + return this.stats.getLong(primaryTransferTimeId); + } + + // ------------------------------------------------------------------------ + // rebalancing stats + // ------------------------------------------------------------------------ + + private void startRebalanceBucketCreate() { + this.stats.incInt(rebalanceBucketCreatesInProgressId, 1); + } + + private void endRebalanceBucketCreate(long start, long end, boolean success) { + this.stats.incInt(rebalanceBucketCreatesInProgressId, -1); + if (CachePerfStats.enableClockStats) { + this.stats.incLong(rebalanceBucketCreateTimeId, end - start); + } + if (success) { + this.stats.incInt(rebalanceBucketCreatesCompletedId, 1); + } else { + this.stats.incInt(rebalanceBucketCreatesFailedId, 1); + } + } + + private void startRebalancePrimaryTransfer() { + this.stats.incInt(rebalancePrimaryTransfersInProgressId, 1); + } + + private void endRebalancePrimaryTransfer(long start, long end, boolean success) { + this.stats.incInt(rebalancePrimaryTransfersInProgressId, -1); + if (CachePerfStats.enableClockStats) { + this.stats.incLong(rebalancePrimaryTransferTimeId, end - start); + } + if (success) { + this.stats.incInt(rebalancePrimaryTransfersCompletedId, 1); + } else { + this.stats.incInt(rebalancePrimaryTransfersFailedId, 1); + } + } + + @Override + public int getRebalanceBucketCreatesInProgress() { + return this.stats.getInt(rebalanceBucketCreatesInProgressId); + } + + @Override + public int getRebalanceBucketCreatesCompleted() { + return this.stats.getInt(rebalanceBucketCreatesCompletedId); + } + + @Override + public int getRebalanceBucketCreatesFailed() { + return this.stats.getInt(rebalanceBucketCreatesFailedId); + } + + @Override + public long getRebalanceBucketCreateTime() { + return 
this.stats.getLong(rebalanceBucketCreateTimeId); + } + + @Override + public int getRebalancePrimaryTransfersInProgress() { + return this.stats.getInt(rebalancePrimaryTransfersInProgressId); + } + + @Override + public int getRebalancePrimaryTransfersCompleted() { + return this.stats.getInt(rebalancePrimaryTransfersCompletedId); + } + + @Override + public int getRebalancePrimaryTransfersFailed() { + return this.stats.getInt(rebalancePrimaryTransfersFailedId); + } + + @Override + public long getRebalancePrimaryTransferTime() { + return this.stats.getLong(rebalancePrimaryTransferTimeId); + } + + @Override + public long startApplyReplication() { + stats.incInt(applyReplicationInProgressId, 1); + return CachePerfStats.getStatTime(); + } + + @Override + public void endApplyReplication(long start) { + long delta = CachePerfStats.getStatTime() - start; + stats.incInt(applyReplicationInProgressId, -1); + stats.incInt(applyReplicationCompletedId, 1); + stats.incLong(applyReplicationTimeId, delta); + } + + @Override + public long startSendReplication() { + stats.incInt(sendReplicationInProgressId, 1); + return CachePerfStats.getStatTime(); + } + + @Override + public void endSendReplication(long start) { + long delta = CachePerfStats.getStatTime() - start; + stats.incInt(sendReplicationInProgressId, -1); + stats.incInt(sendReplicationCompletedId, 1); + stats.incLong(sendReplicationTimeId, delta); + } + + @Override + public long startPutRemote() { + stats.incInt(putRemoteInProgressId, 1); + return CachePerfStats.getStatTime(); + } + + @Override + public void endPutRemote(long start) { + long delta = CachePerfStats.getStatTime() - start; + stats.incInt(putRemoteInProgressId, -1); + stats.incInt(putRemoteCompletedId, 1); + stats.incLong(putRemoteTimeId, delta); + } + + @Override + public long startPutLocal() { + stats.incInt(putLocalInProgressId, 1); + return CachePerfStats.getStatTime(); + } + + @Override + public void endPutLocal(long start) { + long delta = 
CachePerfStats.getStatTime() - start; + stats.incInt(putLocalInProgressId, -1); + stats.incInt(putLocalCompletedId, 1); + stats.incLong(putLocalTimeId, delta); + } + + @Override + public void incPRMetaDataSentCount() { + this.stats.incLong(prMetaDataSentCountId, 1); + } + + @Override + public long getPRMetaDataSentCount() { + return this.stats.getLong(prMetaDataSentCountId); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TimedMicrometerPartitionedRegionStats.kt b/geode-core/src/main/java/org/apache/geode/internal/cache/TimedMicrometerPartitionedRegionStats.kt new file mode 100644 index 0000000..b9018db --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TimedMicrometerPartitionedRegionStats.kt @@ -0,0 +1,67 @@ +package org.apache.geode.internal.cache + +import io.micrometer.core.instrument.Timer +import java.util.concurrent.TimeUnit + +class TimedMicrometerPartitionedRegionStats(regionName: String) : MicrometerPartitionRegionStats(regionName) { + private fun constructTimerForMetric(metricName: String): Timer = + metrics.timer("${metricName}Latency", regionName, PARTITIONED_REGION) + + private val putTimer = constructTimerForMetric("put") + private val putAllTimer = constructTimerForMetric("putAll") + private val createTimer = constructTimerForMetric("create") + private val removeAllTimer = constructTimerForMetric("removeAll") + private val getTimer = constructTimerForMetric("get") + private val destroyTimer = constructTimerForMetric("destroy") + private val invalidateTimer = constructTimerForMetric("invalidate") + private val containsKeyTimer = constructTimerForMetric("containsKey") + private val containValueForKeyTimer = constructTimerForMetric("containValueForKey") + + override fun endPut(startTimeInNanos: Long) { + super.endPut(startTimeInNanos) + updateTimer(startTimeInNanos, putTimer) + } + + override fun endPutAll(startTimeInNanos: Long) { + super.endPutAll(startTimeInNanos) + 
updateTimer(startTimeInNanos, putAllTimer) + } + + override fun endCreate(startTimeInNanos: Long) { + super.endCreate(startTimeInNanos) + updateTimer(startTimeInNanos, createTimer) + } + + override fun endRemoveAll(startTimeInNanos: Long) { + super.endRemoveAll(startTimeInNanos) + updateTimer(startTimeInNanos, removeAllTimer) + } + + override fun endGet(startTimeInNanos: Long) { + super.endGet(startTimeInNanos) + updateTimer(startTimeInNanos, getTimer) + } + + override fun endDestroy(startTimeInNanos: Long) { + super.endDestroy(startTimeInNanos) + updateTimer(startTimeInNanos, destroyTimer) + } + + override fun endInvalidate(startTimeInNanos: Long) { + super.endInvalidate(startTimeInNanos) + updateTimer(startTimeInNanos, invalidateTimer) + } + + override fun endContainsKey(startTimeInNanos: Long) { + super.endContainsKey(startTimeInNanos) + updateTimer(startTimeInNanos, containsKeyTimer) + } + override fun endContainsValueForKey(startTimeInNanos: Long) { + super.endContainsValueForKey(startTimeInNanos) + updateTimer(startTimeInNanos, containValueForKeyTimer) + } + + private fun updateTimer(startTimeInNanos: Long, timer: Timer) { + timer.record((System.nanoTime() - startTimeInNanos), TimeUnit.NANOSECONDS) + } +} \ No newline at end of file diff --git a/geode-protobuf/build.gradle b/geode-protobuf/build.gradle index 13368a9..8c55681 100644 --- a/geode-protobuf/build.gradle +++ b/geode-protobuf/build.gradle @@ -31,6 +31,7 @@ dependencies { compile group: 'io.micrometer', name: 'micrometer-registry-atlas', version: '1.0.0-rc.5' compile group: 'io.micrometer', name: 'micrometer-registry-influx', version: '1.0.0-rc.5' compile group: 'io.micrometer', name: 'micrometer-registry-graphite', version: '1.0.0-rc.5' + compile group: 'io.micrometer', name: 'micrometer-registry-jmx', version: '1.0.0-rc.5' compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" } buildscript { diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/MicrometerClientStatsImpl.kt b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/MicrometerClientStatsImpl.kt index b726ed3..bcf2ff1 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/MicrometerClientStatsImpl.kt +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/MicrometerClientStatsImpl.kt @@ -1,10 +1,12 @@ package org.apache.geode.internal.protocol.protobuf.statistics import io.micrometer.core.instrument.Clock +import io.micrometer.core.instrument.DistributionSummary import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.composite.CompositeMeterRegistry import io.micrometer.influx.InfluxConfig import io.micrometer.influx.InfluxMeterRegistry +import io.micrometer.jmx.JmxMeterRegistry import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics import java.time.Duration import java.util.concurrent.atomic.AtomicInteger @@ -14,7 +16,7 @@ class MicrometerClientStatsImpl : ProtocolClientStatistics { private val clientsConnected = AtomicInteger(0) private val influxMetrics: MeterRegistry = InfluxMeterRegistry(object : InfluxConfig { - override fun step(): Duration = Duration.ofSeconds(10) + override fun step(): Duration = Duration.ofSeconds(1) override fun db(): String = "mydb" override fun get(k: String): String? 
= null override fun uri(): String = "http://localhost:8086" @@ -27,16 +29,19 @@ class MicrometerClientStatsImpl : ProtocolClientStatistics { // override fun step(): Duration = Duration.ofSeconds(10) // }, Clock.SYSTEM) + private val jmxMetrics: MeterRegistry = JmxMeterRegistry() + private val metrics = CompositeMeterRegistry(Clock.SYSTEM) init { metrics.add(influxMetrics) // metrics.add(atlasMetrics) + metrics.add(jmxMetrics) } val clientConnectedCounter = metrics.gauge("clientConnected", clientsConnected) - val messageReceivedCounter = metrics.counter("messageReceived") - val messageSentCounter = metrics.counter("messageSent") + val messageReceivedCounter = metrics.summary("messageReceived") + val messageSentCounter = metrics.summary("messageSent") val authorizationViolationsCounter = metrics.counter("authorizationViolations") val authenticationFailureCounter = metrics.counter("authenticationFailures") @@ -49,11 +54,11 @@ class MicrometerClientStatsImpl : ProtocolClientStatistics { } override fun messageReceived(bytes: Int) { - messageReceivedCounter.increment(bytes.toDouble()) + messageReceivedCounter.record(bytes.toDouble()) } override fun messageSent(bytes: Int) { - messageSentCounter.increment(bytes.toDouble()) + messageSentCounter.record(bytes.toDouble()) } override fun incAuthorizationViolations() { diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java index cac6ba3..14613ab 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java @@ -155,7 +155,7 @@ public class CacheOperationsJUnitTest { private static String randomLengthString() { Random random = new Random(); 
StringBuffer stringBuffer = new StringBuffer(); - int length = (int) (random.nextInt(1024000)*(1.75*random.nextInt(10))); + int length = (int) (random.nextInt(1024000) * (1.75 * random.nextInt(10))); for (int i = 0; i < (length); i++) { stringBuffer.append("a"); } @@ -173,8 +173,10 @@ public class CacheOperationsJUnitTest { randomLengthString())); putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY2, randomLengthString())); - putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY3, - randomLengthString())); + if (new Random().nextInt() % 2 == 0) { + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY3, + randomLengthString())); + } ClientProtocol.Message putAllMessage = ProtobufUtilities.createProtobufMessage( ProtobufRequestUtilities.createPutAllRequest(TEST_REGION, putEntries)); protobufProtocolSerializer.serialize(putAllMessage, outputStream); @@ -182,8 +184,10 @@ public class CacheOperationsJUnitTest { Set<BasicTypes.EncodedValue> getEntries = new HashSet<>(); getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY1)); -// getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY2)); -// getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY3)); + if(new Random().nextInt() % 5 == 0) { + getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY2)); + getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY3)); + } RegionAPI.GetAllRequest getAllRequest = ProtobufRequestUtilities.createGetAllRequest(TEST_REGION, getEntries); -- To stop receiving notification emails like this one, please contact [email protected].
