This is an automated email from the ASF dual-hosted git repository. udo pushed a commit to branch micrometer in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/micrometer by this push: new a6db167 Updated code to make it work with multi-region and timed metrics a6db167 is described below commit a6db167b7f7753d19cb3214fcb9a81c9e9451d21 Author: Udo <ukohlme...@pivotal.io> AuthorDate: Mon Mar 12 13:59:25 2018 -0700 Updated code to make it work with multi-region and timed metrics --- .../cache/MicrometerPartitionRegionStats.kt | 46 +- .../internal/cache/PRHARedundancyProvider.java | 4 +- .../geode/internal/cache/PartitionedRegion.java | 5 +- .../internal/cache/PartitionedRegionStats.java | 103 +++- .../internal/cache/PartitionedRegionStatsImpl.java | 315 +++++++--- .../cache/TimedMicrometerPartitionedRegionStats.kt | 3 +- .../cache/partitioned/CreateBucketMessage.java | 2 +- .../internal/beans/MemberMBeanBridge.java | 6 +- .../management/OffHeapManagementDUnitTest.java | 4 +- .../bean/stats/MemberLevelStatsJUnitTest.java | 5 +- .../bean/stats/RegionStatsJUnitTest.java | 2 +- .../cli/commands/ShowMetricsDUnitTest.java | 4 +- .../cli/commands/ShowMetricsJUnitTest.java | 6 +- .../v1/acceptance/CacheOperationsJUnitTest.java | 657 +++++++++++---------- 14 files changed, 705 insertions(+), 457 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/MicrometerPartitionRegionStats.kt b/geode-core/src/main/java/org/apache/geode/internal/cache/MicrometerPartitionRegionStats.kt index 0ddf647..28ff1cb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/MicrometerPartitionRegionStats.kt +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/MicrometerPartitionRegionStats.kt @@ -1,19 +1,15 @@ package org.apache.geode.internal.cache -import com.netflix.spectator.impl.AtomicDouble import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Tag import org.apache.geode.Statistics -import java.lang.Number -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.DoubleAdder +import java.util.concurrent.atomic.LongAdder open class MicrometerPartitionRegionStats(val meterRegistry: MeterRegistry, val regionName: String) : PartitionedRegionStats { - override fun getStats(): Statistics? { - //we do nothing here... because we don't need to - return null; - } + override fun getStats(): Statistics? = null @Suppress("PropertyName") protected val PARTITIONED_REGION = "PartitionedRegion" @@ -26,21 +22,21 @@ open class MicrometerPartitionRegionStats(val meterRegistry: MeterRegistry, val private fun <T> constructGaugeForMetric(metricName: String, atomic: T, function: (T) -> Double): Gauge = Gauge.builder(metricName, atomic, function).tags(tags).register(meterRegistry) - private fun incrementAtomic(atomic: AtomicInteger, value: Int) { - atomic.addAndGet(value) + private fun incrementAtomic(atomic: LongAdder, value: Int) { + atomic.add(value.toLong()) } - private fun incrementAtomic(atomic: AtomicDouble, value: Double) { - atomic.addAndGet(value) + private fun incrementAtomic(atomic: DoubleAdder, value: Double) { + atomic.add(value) } //Atomic values to track - private val bucketCountAtomic = AtomicInteger(0) - private val lowBucketCountAtomic = AtomicInteger(0) - private val numberCopiesBucketCountAtomic = AtomicInteger(0) - private val totalNumberOfBucketsAtomic = AtomicInteger(0) - private val primaryBucketCountAtomic = AtomicInteger(0) - private val numberVolunteeringThreadsAtomic = AtomicInteger(0) + private val bucketCountAtomic = LongAdder() + private val lowBucketCountAtomic = LongAdder() + private val numberCopiesBucketCountAtomic = LongAdder() + private val totalNumberOfBucketsAtomic = LongAdder() + private val primaryBucketCountAtomic = LongAdder() + private val numberVolunteeringThreadsAtomic = LongAdder() //Micrometer Meters private val putCounter = constructCounterForMetric("put") @@ -72,12 +68,12 @@ open class MicrometerPartitionRegionStats(val meterRegistry: MeterRegistry, val private val removeAllMsgsRetriedCounter = constructCounterForMetric("removeAllMsgsRetried") private val partitionMessagesSentCounter = constructCounterForMetric("partitionMessagesSent") private val prMetaDataSentCounter = constructCounterForMetric("prMetaDataSentCounter") - private val bucketCountGauge = constructGaugeForMetric("bucketCount", bucketCountAtomic, { it.get().toDouble() }) - private val lowBucketCountGauge = constructGaugeForMetric("lowBucketCount", lowBucketCountAtomic, { it.get().toDouble() }) - private val numberCopiesBucketCountGauge = constructGaugeForMetric("numberCopiesBucketCount", numberCopiesBucketCountAtomic, { it.get().toDouble() }) - private val totalNumberOfBucketsGauge = constructGaugeForMetric("totalNumberOfBuckets", totalNumberOfBucketsAtomic, { it.get().toDouble() }) - private val primaryBucketCountGauge = constructGaugeForMetric("primaryBucketCount", primaryBucketCountAtomic, { it.get().toDouble() }) - private val numberVolunteeringThreadsGauge = constructGaugeForMetric("numberVolunteeringThreads", numberVolunteeringThreadsAtomic, { it.get().toDouble() }) + private val bucketCountGauge = constructGaugeForMetric("bucketCount", bucketCountAtomic, { it.toDouble() }) + private val lowBucketCountGauge = constructGaugeForMetric("lowBucketCount", lowBucketCountAtomic, { it.toDouble() }) + private val numberCopiesBucketCountGauge = constructGaugeForMetric("numberCopiesBucketCount", numberCopiesBucketCountAtomic, { it.toDouble() }) + private val totalNumberOfBucketsGauge = constructGaugeForMetric("totalNumberOfBuckets", totalNumberOfBucketsAtomic, { it.toDouble() }) + private val primaryBucketCountGauge = constructGaugeForMetric("primaryBucketCount", primaryBucketCountAtomic, { it.toDouble() }) + private val numberVolunteeringThreadsGauge = constructGaugeForMetric("numberVolunteeringThreads", numberVolunteeringThreadsAtomic, { it.toDouble() }) override fun close() { //Noop @@ -111,8 +107,8 @@ open class MicrometerPartitionRegionStats(val meterRegistry: MeterRegistry, val override fun incRemoveAllRetries() = removeAllRetriesCounter.increment() override fun incRemoveAllMsgsRetried() = removeAllMsgsRetriedCounter.increment() override fun incPartitionMessagesSent() = partitionMessagesSentCounter.increment() - override fun incBucketCount(bucketCount: Int) = incrementAtomic(bucketCountAtomic,bucketCount) - override fun incLowRedundancyBucketCount(lowBucketCount: Int) = incrementAtomic(lowBucketCountAtomic,lowBucketCount) + override fun incBucketCount(bucketCount: Int) = incrementAtomic(bucketCountAtomic, bucketCount) + override fun incLowRedundancyBucketCount(lowBucketCount: Int) = incrementAtomic(lowBucketCountAtomic, lowBucketCount) override fun incNoCopiesBucketCount(numberCopiesBucketCount: Int) = incrementAtomic(numberCopiesBucketCountAtomic, numberCopiesBucketCount) override fun incTotalNumBuckets(totalNumberOfBuckets: Int) = incrementAtomic(totalNumberOfBucketsAtomic, totalNumberOfBuckets) override fun incPrimaryBucketCount(primaryBucketCount: Int) = incrementAtomic(primaryBucketCountAtomic, primaryBucketCount) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java index e68f147..3532fed 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java @@ -495,8 +495,8 @@ public class PRHARedundancyProvider { * region referred to in the query. */ public InternalDistributedMember createBucketAtomically(final int bucketId, - final int newBucketSize, final boolean finishIncompleteCreation, - String partitionName) throws PartitionedRegionStorageException, PartitionedRegionException, + final int newBucketSize, final boolean finishIncompleteCreation, String partitionName) + throws PartitionedRegionStorageException, PartitionedRegionException, PartitionOfflineException { final boolean isDebugEnabled = logger.isDebugEnabled(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 9317bf2..37b9be3 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -733,7 +733,7 @@ public class PartitionedRegion extends LocalRegion this.node = initializeNode(); this.prStats = new TimedMicrometerPartitionedRegionStats(getFullPath()); -// this.prStats = new PartitionedRegionStatsImpl(cache.getDistributedSystem(), getFullPath()); + // this.prStats = new PartitionedRegionStatsImpl(cache.getDistributedSystem(), getFullPath()); this.regionIdentifier = getFullPath().replace('/', '#'); if (logger.isDebugEnabled()) { @@ -3265,8 +3265,7 @@ public class PartitionedRegion extends LocalRegion // Potentially no storage assigned, start bucket creation, be careful of race // conditions if (isDataStore()) { - ret = this.redundancyProvider.createBucketAtomically(bucketId, size, false, - partitionName); + ret = this.redundancyProvider.createBucketAtomically(bucketId, size, false, partitionName); } else { ret = this.redundancyProvider.createBucketOnDataStore(bucketId, size, snoozer); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java index 03717ed..cd97a3f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java @@ -36,109 +36,210 @@ public interface PartitionedRegionStats { default long startTime() { return CachePerfStats.getStatTime(); } + default long getStatTime() { return CachePerfStats.getStatTime(); } + void close(); + void endPut(long start); + void endPutAll(long start); + void endRemoveAll(long start); + void endCreate(long start); + void endGet(long start); + void endContainsKey(long start); + void endContainsValueForKey(long start); + void endDestroy(long start); + void endInvalidate(long start); + void incContainsKeyValueRetries(); + void incContainsKeyValueOpsRetried(); + void incInvalidateRetries(); + void incInvalidateOpsRetried(); + void incDestroyRetries(); + void incDestroyOpsRetried(); + void incPutRetries(); + void incPutOpsRetried(); + void incGetOpsRetried(); + void incGetRetries(); + void incCreateOpsRetried(); + void incCreateRetries(); + void incPreferredReadLocal(); + void incPreferredReadRemote(); + long startPartitionMessageProcessing(); + void endPartitionMessagesProcessing(long start); + void incPartitionMessagesSent(); + void incBucketCount(int delta); + void setBucketCount(int i); + void incDataStoreEntryCount(int amt); + int getDataStoreEntryCount(); + void incBytesInUse(long delta); + long getDataStoreBytesInUse(); + int getTotalBucketCount(); + void incPutAllRetries(); + void incPutAllMsgsRetried(); + void incRemoveAllRetries(); + void incRemoveAllMsgsRetried(); + int getVolunteeringInProgress(); + int getVolunteeringBecamePrimary(); + long getVolunteeringBecamePrimaryTime(); + int getVolunteeringOtherPrimary(); + long getVolunteeringOtherPrimaryTime(); + int getVolunteeringClosed(); + long getVolunteeringClosedTime(); + long startVolunteering(); + void endVolunteeringBecamePrimary(long start); + void endVolunteeringOtherPrimary(long start); + void endVolunteeringClosed(long start); + int getTotalNumBuckets(); + void incTotalNumBuckets(int val); + int getPrimaryBucketCount(); + void incPrimaryBucketCount(int val); + int getVolunteeringThreads(); + void incVolunteeringThreads(int val); + int getLowRedundancyBucketCount(); + int getNoCopiesBucketCount(); + void incLowRedundancyBucketCount(int val); + void incNoCopiesBucketCount(int val); + int getConfiguredRedundantCopies(); + void setConfiguredRedundantCopies(int val); + void setLocalMaxMemory(long l); + int getActualRedundantCopies(); + void setActualRedundantCopies(int val); + void putStartTime(Object key, long startTime); + long removeStartTime(Object key); + void endGetEntry(long startTime); + void endGetEntry(long start, int numInc); + long startRecovery(); + void endRecovery(long start); + long startBucketCreate(boolean isRebalance); + void endBucketCreate(long start, boolean success, boolean isRebalance); + long startPrimaryTransfer(boolean isRebalance); + void endPrimaryTransfer(long start, boolean success, boolean isRebalance); + int getBucketCreatesInProgress(); + int getBucketCreatesCompleted(); + int getBucketCreatesFailed(); + long getBucketCreateTime(); + int getPrimaryTransfersInProgress(); + int getPrimaryTransfersCompleted(); + int getPrimaryTransfersFailed(); + long getPrimaryTransferTime(); + int getRebalanceBucketCreatesInProgress(); + int getRebalanceBucketCreatesCompleted(); + int getRebalanceBucketCreatesFailed(); + long getRebalanceBucketCreateTime(); + int getRebalancePrimaryTransfersInProgress(); + int getRebalancePrimaryTransfersCompleted(); + int getRebalancePrimaryTransfersFailed(); + long getRebalancePrimaryTransferTime(); + long startApplyReplication(); + void endApplyReplication(long start); + long startSendReplication(); + void endSendReplication(long start); + long startPutRemote(); + void endPutRemote(long start); + long startPutLocal(); + void endPutLocal(long start); + void incPRMetaDataSentCount(); + long getPRMetaDataSentCount(); - Statistics getStats(); + Statistics getStats(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStatsImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStatsImpl.java index aa43145..91993dc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStatsImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStatsImpl.java @@ -181,97 +181,236 @@ public class PartitionedRegionStatsImpl implements PartitionedRegionStats { new StatisticDescriptor[] { f.createIntGauge("bucketCount", "Number of buckets in this node.", "buckets"), f.createIntCounter("putsCompleted", "Number of puts completed.", "operations", true), - f.createIntCounter("putOpsRetried", "Number of put operations which had to be retried due to failures.", "operations", false), - f.createIntCounter("putRetries", "Total number of times put operations had to be retried.", "retry attempts", false), - f.createIntCounter("createsCompleted", "Number of creates completed.", "operations", true), - f.createIntCounter("createOpsRetried", "Number of create operations which had to be retried due to failures.", "operations", false), - f.createIntCounter("createRetries", "Total number of times put operations had to be retried.", "retry attempts", false), - f.createIntCounter("preferredReadLocal", "Number of reads satisfied from local store", "operations", true), - f.createIntCounter(PUTALLS_COMPLETED, "Number of putAlls completed.", "operations", true), - f.createIntCounter(PUTALL_MSGS_RETRIED, "Number of putAll messages which had to be retried due to failures.", "operations", false), - f.createIntCounter(PUTALL_RETRIES, "Total number of times putAll messages had to be retried.", "retry attempts", false), - f.createLongCounter(PUTALL_TIME, "Total time spent doing putAlls.", "nanoseconds", false), - f.createIntCounter(REMOVE_ALLS_COMPLETED, "Number of removeAlls completed.", "operations", true), - f.createIntCounter(REMOVE_ALL_MSGS_RETRIED, "Number of removeAll messages which had to be retried due to failures.", "operations", false), - f.createIntCounter(REMOVE_ALL_RETRIES, "Total number of times removeAll messages had to be retried.", "retry attempts", false), - f.createLongCounter(REMOVE_ALL_TIME, "Total time spent doing removeAlls.", "nanoseconds", false), - f.createIntCounter("preferredReadRemote", "Number of reads satisfied from remote store", "operations", false), + f.createIntCounter("putOpsRetried", + "Number of put operations which had to be retried due to failures.", "operations", + false), + f.createIntCounter("putRetries", + "Total number of times put operations had to be retried.", "retry attempts", false), + f.createIntCounter("createsCompleted", "Number of creates completed.", "operations", + true), + f.createIntCounter("createOpsRetried", + "Number of create operations which had to be retried due to failures.", + "operations", false), + f.createIntCounter("createRetries", + "Total number of times put operations had to be retried.", "retry attempts", false), + f.createIntCounter("preferredReadLocal", "Number of reads satisfied from local store", + "operations", true), + f.createIntCounter(PUTALLS_COMPLETED, "Number of putAlls completed.", "operations", + true), + f.createIntCounter(PUTALL_MSGS_RETRIED, + "Number of putAll messages which had to be retried due to failures.", "operations", + false), + f.createIntCounter(PUTALL_RETRIES, + "Total number of times putAll messages had to be retried.", "retry attempts", + false), + f.createLongCounter(PUTALL_TIME, "Total time spent doing putAlls.", "nanoseconds", + false), + f.createIntCounter(REMOVE_ALLS_COMPLETED, "Number of removeAlls completed.", + "operations", true), + f.createIntCounter(REMOVE_ALL_MSGS_RETRIED, + "Number of removeAll messages which had to be retried due to failures.", + "operations", false), + f.createIntCounter(REMOVE_ALL_RETRIES, + "Total number of times removeAll messages had to be retried.", "retry attempts", + false), + f.createLongCounter(REMOVE_ALL_TIME, "Total time spent doing removeAlls.", + "nanoseconds", false), + f.createIntCounter("preferredReadRemote", "Number of reads satisfied from remote store", + "operations", false), f.createIntCounter("getsCompleted", "Number of gets completed.", "operations", true), - f.createIntCounter("getOpsRetried", "Number of get operations which had to be retried due to failures.", "operations", false), - f.createIntCounter("getRetries", "Total number of times get operations had to be retried.", "retry attempts", false), - f.createIntCounter("destroysCompleted", "Number of destroys completed.", "operations", true), - f.createIntCounter("destroyOpsRetried", "Number of destroy operations which had to be retried due to failures.", "operations", false), - f.createIntCounter("destroyRetries", "Total number of times destroy operations had to be retried.", "retry attempts", false), - f.createIntCounter("invalidatesCompleted", "Number of invalidates completed.", "operations", true), - f.createIntCounter("invalidateOpsRetried", "Number of invalidate operations which had to be retried due to failures.", "operations", false), - f.createIntCounter("invalidateRetries", "Total number of times invalidate operations had to be retried.", "retry attempts", false), - f.createIntCounter("containsKeyCompleted", "Number of containsKeys completed.", "operations", true), - f.createIntCounter("containsKeyOpsRetried", "Number of containsKey or containsValueForKey operations which had to be retried due to failures.", "operations", false), - f.createIntCounter("containsKeyRetries", "Total number of times containsKey or containsValueForKey operations had to be retried.", "operations", false), - f.createIntCounter("containsValueForKeyCompleted", "Number of containsValueForKeys completed.", "operations", true), - f.createIntCounter("PartitionMessagesSent", "Number of PartitionMessages Sent.", "operations", true), - f.createIntCounter("PartitionMessagesReceived", "Number of PartitionMessages Received.", "operations", true), - f.createIntCounter("PartitionMessagesProcessed", "Number of PartitionMessages Processed.", "operations", true), + f.createIntCounter("getOpsRetried", + "Number of get operations which had to be retried due to failures.", "operations", + false), + f.createIntCounter("getRetries", + "Total number of times get operations had to be retried.", "retry attempts", false), + f.createIntCounter("destroysCompleted", "Number of destroys completed.", "operations", + true), + f.createIntCounter("destroyOpsRetried", + "Number of destroy operations which had to be retried due to failures.", + "operations", false), + f.createIntCounter("destroyRetries", + "Total number of times destroy operations had to be retried.", "retry attempts", + false), + f.createIntCounter("invalidatesCompleted", "Number of invalidates completed.", + "operations", true), + f.createIntCounter("invalidateOpsRetried", + "Number of invalidate operations which had to be retried due to failures.", + "operations", false), + f.createIntCounter("invalidateRetries", + "Total number of times invalidate operations had to be retried.", "retry attempts", + false), + f.createIntCounter("containsKeyCompleted", "Number of containsKeys completed.", + "operations", true), + f.createIntCounter("containsKeyOpsRetried", + "Number of containsKey or containsValueForKey operations which had to be retried due to failures.", + "operations", false), + f.createIntCounter("containsKeyRetries", + "Total number of times containsKey or containsValueForKey operations had to be retried.", + "operations", false), + f.createIntCounter("containsValueForKeyCompleted", + "Number of containsValueForKeys completed.", "operations", true), + f.createIntCounter("PartitionMessagesSent", "Number of PartitionMessages Sent.", + "operations", true), + f.createIntCounter("PartitionMessagesReceived", "Number of PartitionMessages Received.", + "operations", true), + f.createIntCounter("PartitionMessagesProcessed", + "Number of PartitionMessages Processed.", "operations", true), f.createLongCounter("putTime", "Total time spent doing puts.", "nanoseconds", false), - f.createLongCounter("createTime", "Total time spent doing create operations.", "nanoseconds", false), - f.createLongCounter("getTime", "Total time spent performing get operations.", "nanoseconds", false), - f.createLongCounter("destroyTime", "Total time spent doing destroys.", "nanoseconds", false), - f.createLongCounter("invalidateTime", "Total time spent doing invalidates.", "nanoseconds", false), - f.createLongCounter("containsKeyTime", "Total time spent performing containsKey operations.", "nanoseconds", false), - f.createLongCounter("containsValueForKeyTime", "Total time spent performing containsValueForKey operations.", "nanoseconds", false), - f.createLongCounter("partitionMessagesProcessingTime", "Total time spent on PartitionMessages processing.", "nanoseconds", false), - f.createIntGauge("dataStoreEntryCount", "The number of entries stored in this Cache for the named Partitioned Region. This does not include entries which are tombstones. See CachePerfStats.tombstoneCount.", "entries"), - f.createLongGauge("dataStoreBytesInUse", "The current number of bytes stored in this Cache for the named Partitioned Region", "bytes"), - f.createIntGauge("volunteeringInProgress","Current number of attempts to volunteer for primary of a bucket.", "operations"), - f.createIntCounter("volunteeringBecamePrimary", "Total number of attempts to volunteer that ended when this member became primary.", "operations"), - f.createLongCounter("volunteeringBecamePrimaryTime", "Total time spent volunteering that ended when this member became primary.", "nanoseconds", false), - f.createIntCounter("volunteeringOtherPrimary", "Total number of attempts to volunteer that ended when this member discovered other primary.", "operations"), - f.createLongCounter("volunteeringOtherPrimaryTime", "Total time spent volunteering that ended when this member discovered other primary.", "nanoseconds", false), - f.createIntCounter("volunteeringClosed", "Total number of attempts to volunteer that ended when this member's bucket closed.", "operations"), - f.createLongCounter("volunteeringClosedTime", "Total time spent volunteering that ended when this member's bucket closed.", "nanoseconds", false), + f.createLongCounter("createTime", "Total time spent doing create operations.", + "nanoseconds", false), + f.createLongCounter("getTime", "Total time spent performing get operations.", + "nanoseconds", false), + f.createLongCounter("destroyTime", "Total time spent doing destroys.", "nanoseconds", + false), + f.createLongCounter("invalidateTime", "Total time spent doing invalidates.", + "nanoseconds", false), + f.createLongCounter("containsKeyTime", + "Total time spent performing containsKey operations.", "nanoseconds", false), + f.createLongCounter("containsValueForKeyTime", + "Total time spent performing containsValueForKey operations.", "nanoseconds", + false), + f.createLongCounter("partitionMessagesProcessingTime", + "Total time spent on PartitionMessages processing.", "nanoseconds", false), + f.createIntGauge("dataStoreEntryCount", + "The number of entries stored in this Cache for the named Partitioned Region. This does not include entries which are tombstones. See CachePerfStats.tombstoneCount.", + "entries"), + f.createLongGauge("dataStoreBytesInUse", + "The current number of bytes stored in this Cache for the named Partitioned Region", + "bytes"), + f.createIntGauge("volunteeringInProgress", + "Current number of attempts to volunteer for primary of a bucket.", "operations"), + f.createIntCounter("volunteeringBecamePrimary", + "Total number of attempts to volunteer that ended when this member became primary.", + "operations"), + f.createLongCounter("volunteeringBecamePrimaryTime", + "Total time spent volunteering that ended when this member became primary.", + "nanoseconds", false), + f.createIntCounter("volunteeringOtherPrimary", + "Total number of attempts to volunteer that ended when this member discovered other primary.", + "operations"), + f.createLongCounter("volunteeringOtherPrimaryTime", + "Total time spent volunteering that ended when this member discovered other primary.", + "nanoseconds", false), + f.createIntCounter("volunteeringClosed", + "Total number of attempts to volunteer that ended when this member's bucket closed.", + "operations"), + f.createLongCounter("volunteeringClosedTime", + "Total time spent volunteering that ended when this member's bucket closed.", + "nanoseconds", false), f.createIntGauge("totalNumBuckets", "The total number of buckets.", "buckets"), - f.createIntGauge("primaryBucketCount", "Current number of primary buckets hosted locally.", "buckets"), - f.createIntGauge("volunteeringThreads", "Current number of threads volunteering for primary.", "threads"), - f.createIntGauge("lowRedundancyBucketCount", "Current number of buckets without full redundancy.", "buckets"), - f.createIntGauge("noCopiesBucketCount", "Current number of buckets without any copies remaining.", "buckets"), - f.createIntGauge("configuredRedundantCopies", "Configured number of redundant copies for this partitioned region.", "copies"), - f.createIntGauge("actualRedundantCopies", "Actual number of redundant copies for this partitioned region.", "copies"), - f.createIntCounter("getEntryCompleted", "Number of getEntry operations completed.", "operations", true), - f.createLongCounter("getEntryTime", "Total time spent performing getEntry operations.", "nanoseconds", false), - f.createIntGauge("recoveriesInProgress", "Current number of redundancy recovery operations in progress for this region.", "operations"), - f.createIntCounter("recoveriesCompleted", "Total number of redundancy recovery operations performed on this region.", "operations"), - f.createLongCounter("recoveryTime", "Total number time spent recovering redundancy.", "operations"), - f.createIntGauge("bucketCreatesInProgress", "Current number of bucket create operations being performed for rebalancing.", "operations"), - f.createIntCounter("bucketCreatesCompleted", "Total number of bucket create operations performed for rebalancing.", "operations"), - f.createIntCounter("bucketCreatesFailed", "Total number of bucket create operations performed for rebalancing that failed.", "operations"), - f.createLongCounter("bucketCreateTime", "Total time spent performing bucket create operations for rebalancing.", "nanoseconds", false), - f.createIntGauge("primaryTransfersInProgress", "Current number of primary transfer operations being performed for rebalancing.", "operations"), - f.createIntCounter("primaryTransfersCompleted", "Total number of primary transfer operations performed for rebalancing.", "operations"), - f.createIntCounter("primaryTransfersFailed", "Total number of primary transfer operations performed for rebalancing that failed.", "operations"), - f.createLongCounter("primaryTransferTime", "Total time spent performing primary transfer operations for rebalancing.", "nanoseconds", false), - f.createIntCounter("applyReplicationCompleted", "Total number of replicated values sent from a primary to this redundant data store.", "operations", true), - f.createIntGauge("applyReplicationInProgress", "Current number of replication operations in progress on this redundant data store.", "operations", false), - f.createLongCounter("applyReplicationTime", "Total time spent storing replicated values on this redundant data store.", "nanoseconds", false), - f.createIntCounter("sendReplicationCompleted", "Total number of replicated values sent from this primary to a redundant data store.", "operations", true), - f.createIntGauge("sendReplicationInProgress", "Current number of replication operations in progress from this primary.", "operations", false), - f.createLongCounter("sendReplicationTime", "Total time spent replicating values from this primary to a redundant data store.", "nanoseconds", false), - f.createIntCounter("putRemoteCompleted", "Total number of completed puts that did not originate in the primary. These puts require an extra network hop to the primary.", "operations", true), - f.createIntGauge("putRemoteInProgress", "Current number of puts in progress that did not originate in the primary.", "operations", false), - f.createLongCounter("putRemoteTime", "Total time spent doing puts that did not originate in the primary.", "nanoseconds", false), - f.createIntCounter("putLocalCompleted", "Total number of completed puts that did originate in the primary. These puts are optimal.", "operations", true), - f.createIntGauge("putLocalInProgress", "Current number of puts in progress that did originate in the primary.", "operations", false), - f.createLongCounter("putLocalTime", "Total time spent doing puts that did originate in the primary.", "nanoseconds", false), - f.createIntGauge("rebalanceBucketCreatesInProgress", "Current number of bucket create operations being performed for rebalancing.", "operations"), - f.createIntCounter("rebalanceBucketCreatesCompleted", "Total number of bucket create operations performed for rebalancing.", "operations"), - f.createIntCounter("rebalanceBucketCreatesFailed", "Total number of bucket create operations performed for rebalancing that failed.", "operations"), - f.createLongCounter("rebalanceBucketCreateTime", "Total time spent performing bucket create operations for rebalancing.", "nanoseconds", false), - f.createIntGauge("rebalancePrimaryTransfersInProgress", "Current number of primary transfer operations being performed for rebalancing.", "operations"), - f.createIntCounter("rebalancePrimaryTransfersCompleted", "Total number of primary transfer operations performed for rebalancing.", "operations"), - f.createIntCounter("rebalancePrimaryTransfersFailed", "Total number of primary transfer operations performed for rebalancing that failed.", "operations"), - f.createLongCounter("rebalancePrimaryTransferTime", "Total time spent performing primary transfer operations for rebalancing.", "nanoseconds", false), - f.createLongCounter("prMetaDataSentCount", "total number of times meta data refreshed sent on client's request.", "operation", false), - f.createLongGauge("localMaxMemory", "local max memory in bytes for this region on this member", "bytes") - }); + f.createIntGauge("primaryBucketCount", + "Current number of primary buckets hosted locally.", "buckets"), + f.createIntGauge("volunteeringThreads", + "Current number of threads volunteering for primary.", "threads"), + f.createIntGauge("lowRedundancyBucketCount", + "Current number of buckets without full redundancy.", "buckets"), + f.createIntGauge("noCopiesBucketCount", + "Current number of buckets without any copies remaining.", "buckets"), + f.createIntGauge("configuredRedundantCopies", + "Configured number of redundant copies for this partitioned region.", "copies"), + f.createIntGauge("actualRedundantCopies", + "Actual number of redundant copies for this partitioned region.", "copies"), + f.createIntCounter("getEntryCompleted", "Number of getEntry operations completed.", + "operations", true), + f.createLongCounter("getEntryTime", "Total time spent performing getEntry operations.", + "nanoseconds", false), + f.createIntGauge("recoveriesInProgress", + "Current number of redundancy recovery operations in progress for this region.", + "operations"), + f.createIntCounter("recoveriesCompleted", + "Total number of redundancy recovery operations performed on this region.", + "operations"), + f.createLongCounter("recoveryTime", "Total number time spent recovering redundancy.", + "operations"), + f.createIntGauge("bucketCreatesInProgress", + "Current number of bucket create operations being performed for rebalancing.", + "operations"), + f.createIntCounter("bucketCreatesCompleted", + "Total number of bucket create operations performed for rebalancing.", + "operations"), + f.createIntCounter("bucketCreatesFailed", + "Total number of bucket create operations performed for rebalancing that failed.", + "operations"), + f.createLongCounter("bucketCreateTime", + "Total time spent performing bucket create operations for rebalancing.", + "nanoseconds", false), + f.createIntGauge("primaryTransfersInProgress", + "Current number of primary transfer operations being performed for rebalancing.", + "operations"), + f.createIntCounter("primaryTransfersCompleted", + "Total number of primary transfer operations performed for rebalancing.", + "operations"), + f.createIntCounter("primaryTransfersFailed", + "Total number of primary transfer operations performed for rebalancing that failed.", + "operations"), + f.createLongCounter("primaryTransferTime", + "Total time spent performing primary transfer operations for rebalancing.", + "nanoseconds", false), + f.createIntCounter("applyReplicationCompleted", + "Total number of replicated values sent from a primary to this redundant data store.", + "operations", true), + f.createIntGauge("applyReplicationInProgress", + "Current number of replication operations in progress on this redundant data store.", + "operations", false), + f.createLongCounter("applyReplicationTime", + "Total time spent storing replicated values on this redundant data store.", + "nanoseconds", false), + f.createIntCounter("sendReplicationCompleted", + "Total number of replicated values sent from this primary to a redundant data store.", + "operations", true), + f.createIntGauge("sendReplicationInProgress", + "Current number of replication operations in progress from this primary.", + "operations", false), + f.createLongCounter("sendReplicationTime", + "Total time spent replicating values from this primary to a redundant data store.", + "nanoseconds", false), + f.createIntCounter("putRemoteCompleted", + "Total number of completed puts that did not originate in the primary. These puts require an extra network hop to the primary.", + "operations", true), + f.createIntGauge("putRemoteInProgress", + "Current number of puts in progress that did not originate in the primary.", + "operations", false), + f.createLongCounter("putRemoteTime", + "Total time spent doing puts that did not originate in the primary.", "nanoseconds", + false), + f.createIntCounter("putLocalCompleted", + "Total number of completed puts that did originate in the primary. These puts are optimal.", + "operations", true), + f.createIntGauge("putLocalInProgress", + "Current number of puts in progress that did originate in the primary.", + "operations", false), + f.createLongCounter("putLocalTime", + "Total time spent doing puts that did originate in the primary.", "nanoseconds", + false), + f.createIntGauge("rebalanceBucketCreatesInProgress", + "Current number of bucket create operations being performed for rebalancing.", + "operations"), + f.createIntCounter("rebalanceBucketCreatesCompleted", + "Total number of bucket create operations performed for rebalancing.", + "operations"), + f.createIntCounter("rebalanceBucketCreatesFailed", + "Total number of bucket create operations performed for rebalancing that failed.", + "operations"), + f.createLongCounter("rebalanceBucketCreateTime", + "Total time spent performing bucket create operations for rebalancing.", + "nanoseconds", false), + f.createIntGauge("rebalancePrimaryTransfersInProgress", + "Current number of primary transfer operations being performed for rebalancing.", + "operations"), + f.createIntCounter("rebalancePrimaryTransfersCompleted", + "Total number of primary transfer operations performed for rebalancing.", + "operations"), + f.createIntCounter("rebalancePrimaryTransfersFailed", + "Total number of primary transfer operations performed for rebalancing that failed.", + "operations"), + f.createLongCounter("rebalancePrimaryTransferTime", + "Total time spent performing primary transfer operations for rebalancing.", + "nanoseconds", false), + f.createLongCounter("prMetaDataSentCount", + "total number of times meta data refreshed sent on client's request.", "operation", + false), + f.createLongGauge("localMaxMemory", + "local max memory in bytes for this region on this member", "bytes")}); bucketCountId = type.nameToId("bucketCount"); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TimedMicrometerPartitionedRegionStats.kt b/geode-core/src/main/java/org/apache/geode/internal/cache/TimedMicrometerPartitionedRegionStats.kt index b9f4a70..6cfe587 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TimedMicrometerPartitionedRegionStats.kt +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TimedMicrometerPartitionedRegionStats.kt @@ -67,6 +67,7 @@ class TimedMicrometerPartitionedRegionStats(meterRegistry: MeterRegistry, region } private fun updateTimer(startTimeInNanos: Long, timer: Timer) { - timer.record((System.nanoTime() - startTimeInNanos), TimeUnit.NANOSECONDS) + val diff = System.nanoTime() - startTimeInNanos + timer.record(diff, TimeUnit.NANOSECONDS) } } \ No newline at end of file diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java index db6264b..9248e39 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java @@ -143,7 +143,7 @@ public class CreateBucketMessage extends PartitionMessage { } r.checkReadiness(); InternalDistributedMember primary = r.getRedundancyProvider().createBucketAtomically(bucketId, - bucketSize, false, partitionName); + bucketSize, false, partitionName); r.getPrStats().endPartitionMessagesProcessing(startTime); CreateBucketReplyMessage.sendResponse(getSender(), getProcessorId(), dm, primary); return false; diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java index d2e4d13..f9b8446 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java @@ -756,9 +756,9 @@ public class MemberMBeanBridge { } /** - * All OS meterRegistry are not present in java.lang.management.OperatingSystemMXBean It has to be cast - * to com.sun.management.OperatingSystemMXBean. To avoid the cast using dynamic call so that Java - * platform will take care of the details in a native manner; + * All OS meterRegistry are not present in java.lang.management.OperatingSystemMXBean It has to be + * cast to com.sun.management.OperatingSystemMXBean. To avoid the cast using dynamic call so that + * Java platform will take care of the details in a native manner; * * @return Some basic OS meterRegistry at the particular instance */ diff --git a/geode-core/src/test/java/org/apache/geode/management/OffHeapManagementDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/OffHeapManagementDUnitTest.java index a4ef96d..823f29d 100644 --- a/geode-core/src/test/java/org/apache/geode/management/OffHeapManagementDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/OffHeapManagementDUnitTest.java @@ -199,8 +199,8 @@ public class OffHeapManagementDUnitTest extends CacheTestCase { assertOffHeapMetricsOnVm(vm, TOTAL_MEMORY, 0, 0, 0); /* - * Perform ops on the off-heap region and assert that the off-heap meterRegistry correctly reflect the - * ops + * Perform ops on the off-heap region and assert that the off-heap meterRegistry correctly + * reflect the ops */ doPutOnVm(vm, KEY, VALUE, OFF_HEAP_REGION_NAME, false); assertOffHeapMetricsOnVm(vm, (TOTAL_MEMORY - OBJECT_SIZE), OBJECT_SIZE, 1, 0); diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/MemberLevelStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/MemberLevelStatsJUnitTest.java index 98a75a1..545ed65 100644 --- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/MemberLevelStatsJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/MemberLevelStatsJUnitTest.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import org.apache.geode.internal.cache.*; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -31,6 +30,7 @@ import org.apache.geode.distributed.internal.DistributionStats; import org.apache.geode.distributed.internal.locks.DLockStats; import org.apache.geode.internal.NanoTimer; import org.apache.geode.internal.OSProcess; +import org.apache.geode.internal.cache.*; import org.apache.geode.internal.cache.execute.FunctionServiceStats; import org.apache.geode.internal.statistics.VMStatsContract; import org.apache.geode.internal.stats50.VMStats50; @@ -91,7 +91,8 @@ public class MemberLevelStatsJUnitTest extends MBeanStatsTestCase { } for (int i = 0; i < 4; i++) { - PartitionedRegionStats stats = new PartitionedRegionStatsImpl(system, name.getMethodName() + i); + PartitionedRegionStats stats = + new PartitionedRegionStatsImpl(system, name.getMethodName() + i); parRegionStatsList.add(stats); bridge.addPartionRegionStats(stats); } diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/RegionStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/RegionStatsJUnitTest.java index db7dce2..a29e334 100644 --- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/RegionStatsJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/RegionStatsJUnitTest.java @@ -17,13 +17,13 @@ package org.apache.geode.management.bean.stats; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import org.apache.geode.internal.cache.PartitionedRegionStatsImpl; import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.internal.cache.CachePerfStats; import org.apache.geode.internal.cache.DiskRegionStats; import org.apache.geode.internal.cache.PartitionedRegionStats; +import org.apache.geode.internal.cache.PartitionedRegionStatsImpl; import org.apache.geode.management.internal.beans.DiskRegionBridge; import org.apache.geode.management.internal.beans.PartitionedRegionBridge; import org.apache.geode.management.internal.beans.RegionMBeanBridge; diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ShowMetricsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ShowMetricsDUnitTest.java index 80c2bef..822a17b 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ShowMetricsDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ShowMetricsDUnitTest.java @@ -149,8 +149,8 @@ public class ShowMetricsDUnitTest { @Test public void testShowMetricsRegionFromMember() throws Exception { - gfsh.executeAndAssertThat("show meterRegistry --member=" + server.getName() + " --region=REGION1") - .statusIsSuccess(); + gfsh.executeAndAssertThat( + "show meterRegistry --member=" + server.getName() + " --region=REGION1").statusIsSuccess(); assertThat(gfsh.getGfshOutput()).contains("Metrics for region:/REGION1 On Member server-1"); } } diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ShowMetricsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ShowMetricsJUnitTest.java index b625b1d..fef4fe9 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ShowMetricsJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ShowMetricsJUnitTest.java @@ -45,7 +45,8 @@ public class ShowMetricsJUnitTest { @Test public void testPortOnly() throws Exception { ShowMetricsCommand command = spy(ShowMetricsCommand.class); - CommandResult result = parser.executeCommandWithInstance(command, "show meterRegistry --port=0"); + CommandResult result = + parser.executeCommandWithInstance(command, "show meterRegistry --port=0"); assertThat(result.getStatus()).isEqualTo(Result.Status.ERROR); assertThat(result.getContent().toString()).contains( "If the --port parameter is specified, then the --member parameter must also be specified."); @@ -54,7 +55,8 @@ public class ShowMetricsJUnitTest { @Test public void invalidPortNumber() throws Exception { ShowMetricsCommand command = spy(ShowMetricsCommand.class); - CommandResult result = parser.executeCommandWithInstance(command, "show meterRegistry --port=abc"); + CommandResult result = + parser.executeCommandWithInstance(command, "show meterRegistry --port=abc"); assertThat(result.getStatus()).isEqualTo(Result.Status.ERROR); // When relying on Spring's converters, any command that does not parse is "Invalid" assertThat(result.getContent().toString()).contains("Invalid command"); diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java index b35c12e..10f6138 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java @@ -48,6 +48,7 @@ import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.Scope; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.ConfigurationProperties; @@ -77,336 +78,344 @@ import org.apache.geode.util.test.TestUtil; */ @Category(IntegrationTest.class) public class CacheOperationsJUnitTest { - private final String TEST_KEY = "testKey"; - private final String TEST_REGION = "testRegion"; - - private final String DEFAULT_STORE = "default.keystore"; - private final String SSL_PROTOCOLS = "any"; - private final String SSL_CIPHERS = "any"; - - private final String TEST_MULTIOP_KEY1 = "multiopKey1"; - private final String TEST_MULTIOP_KEY2 = "multiopKey2"; - private final String TEST_MULTIOP_KEY3 = "multiopKey3"; - private final String TEST_MULTIOP_VALUE1 = "multiopValue1"; - private final String TEST_MULTIOP_VALUE2 = "multiopValue2"; - private final String TEST_MULTIOP_VALUE3 = "multiopValue3"; - - private Cache cache; - private int cacheServerPort; - private SerializationService serializationService; - private Socket socket; - private OutputStream outputStream; - - @Rule - public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); - - @Rule - public TestName testName = new TestName(); - - @Before - public void setup() throws Exception { - // Test names prefixed with useSSL_ will setup the cache and socket to use SSL transport - boolean useSSL = testName.getMethodName().startsWith("useSSL_"); - - Properties properties = new Properties(); - if (useSSL) { - updatePropertiesForSSLCache(properties); - } - - CacheFactory cacheFactory = new CacheFactory(properties); - cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0"); - cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false"); - cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false"); - cache = cacheFactory.create(); - - CacheServer cacheServer = cache.addCacheServer(); - cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); - cacheServer.setPort(cacheServerPort); - cacheServer.start(); - - RegionFactory<Object, Object> regionFactory = cache.createRegionFactory(); - regionFactory.create(TEST_REGION); - - System.setProperty("geode.feature-protobuf-protocol", "true"); - - if (useSSL) { - socket = getSSLSocket(); - } else { - socket = new Socket("localhost", cacheServerPort); - } - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); - outputStream = socket.getOutputStream(); - - MessageUtil.performAndVerifyHandshake(socket); - - serializationService = new ProtobufSerializationService(); + private final String TEST_KEY = "testKey"; + private final String TEST_REGION = "testRegion"; + private final String TEST_REGION2 = "testRegion2"; + + private final String DEFAULT_STORE = "default.keystore"; + private final String SSL_PROTOCOLS = "any"; + private final String SSL_CIPHERS = "any"; + + private final String TEST_MULTIOP_KEY1 = "multiopKey1"; + private final String TEST_MULTIOP_KEY2 = "multiopKey2"; + private final String TEST_MULTIOP_KEY3 = "multiopKey3"; + private final String TEST_MULTIOP_VALUE1 = "multiopValue1"; + private final String TEST_MULTIOP_VALUE2 = "multiopValue2"; + private final String TEST_MULTIOP_VALUE3 = "multiopValue3"; + + private Cache cache; + private int cacheServerPort; + private SerializationService serializationService; + private Socket socket; + private OutputStream outputStream; + + @Rule + public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + + @Rule + public TestName testName = new TestName(); + + @Before + public void setup() throws Exception { + // Test names prefixed with useSSL_ will setup the cache and socket to use SSL transport + boolean useSSL = testName.getMethodName().startsWith("useSSL_"); + + Properties properties = new Properties(); + if (useSSL) { + updatePropertiesForSSLCache(properties); } - @After - public void cleanup() throws IOException { - cache.close(); - socket.close(); - SocketCreatorFactory.close(); + CacheFactory cacheFactory = new CacheFactory(properties); + cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0"); + cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false"); + cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false"); + cacheFactory.set(ConfigurationProperties.ENABLE_TIME_STATISTICS, "true"); + cache = cacheFactory.create(); + + CacheServer cacheServer = cache.addCacheServer(); + cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); + cacheServer.setPort(cacheServerPort); + cacheServer.start(); + + RegionFactory<Object, Object> regionFactory = cache.createRegionFactory(RegionShortcut.PARTITION); + regionFactory.create(TEST_REGION); + regionFactory = cache.createRegionFactory(RegionShortcut.PARTITION); + regionFactory.create(TEST_REGION2); + + System.setProperty("geode.feature-protobuf-protocol", "true"); + + if (useSSL) { + socket = getSSLSocket(); + } else { + socket = new Socket("localhost", cacheServerPort); } - - private static String randomLengthString() { - Random random = new Random(); - StringBuffer stringBuffer = new StringBuffer(); - int length = (int) (random.nextInt(1024000) * (1.75 * random.nextInt(10))); - for (int i = 0; i < (length); i++) { - stringBuffer.append("a"); - } - return stringBuffer.toString(); - } - - @Test - public void testNewProtocolWithMultikeyOperations() throws Exception { - System.setProperty("geode.feature-protobuf-protocol", "true"); - for (int i = 0; i < 10000000; i++) { - - ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - Set<BasicTypes.Entry> putEntries = new HashSet<>(); - putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY1, - randomLengthString())); - putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY2, - randomLengthString())); - if (new Random().nextInt() % 2 == 0) { - putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY3, - randomLengthString())); - } - ClientProtocol.Message putAllMessage = ProtobufUtilities.createProtobufMessage( - ProtobufRequestUtilities.createPutAllRequest(TEST_REGION, putEntries)); - protobufProtocolSerializer.serialize(putAllMessage, outputStream); - validatePutAllResponse(socket, protobufProtocolSerializer, new HashSet<>()); - - Set<BasicTypes.EncodedValue> getEntries = new HashSet<>(); - getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY1)); - if(new Random().nextInt() % 5 == 0) { - getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY2)); - getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY3)); - } - - RegionAPI.GetAllRequest getAllRequest = - ProtobufRequestUtilities.createGetAllRequest(TEST_REGION, getEntries); - - ClientProtocol.Message getAllMessage = ProtobufUtilities.createProtobufMessage( - ProtobufUtilities.createProtobufRequestWithGetAllRequest(getAllRequest)); - Thread.sleep(100); - protobufProtocolSerializer.serialize(getAllMessage, outputStream); - validateGetAllResponse(socket, protobufProtocolSerializer); - } + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); + outputStream = socket.getOutputStream(); + + MessageUtil.performAndVerifyHandshake(socket); + + serializationService = new ProtobufSerializationService(); + } + + @After + public void cleanup() throws IOException { + cache.close(); + socket.close(); + SocketCreatorFactory.close(); + } + + private static String randomLengthString() { + Random random = new Random(); + StringBuffer stringBuffer = new StringBuffer(); + int length = (int) (random.nextInt(1024000) * (1.75 * random.nextInt(10))); + for (int i = 0; i < (length); i++) { + stringBuffer.append("a"); } - - @Test - public void multiKeyOperationErrorsWithClasscastException() throws Exception { - RegionFactory<Float, Object> regionFactory = cache.createRegionFactory(); - regionFactory.setKeyConstraint(Float.class); - String regionName = "constraintRegion"; - regionFactory.create(regionName); - System.setProperty("geode.feature-protobuf-protocol", "true"); - - ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - Set<BasicTypes.Entry> putEntries = new HashSet<>(); - putEntries.add(ProtobufUtilities.createEntry(serializationService, 2.2f, TEST_MULTIOP_VALUE1)); - putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY2, - TEST_MULTIOP_VALUE2)); + return stringBuffer.toString(); + } + + @Test + public void testNewProtocolWithMultikeyOperations() throws Exception { + System.setProperty("geode.feature-protobuf-protocol", "true"); + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + for (int i = 0; i < 10000000; i++) { + + + Set<BasicTypes.Entry> putEntries = new HashSet<>(); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY1, + randomLengthString())); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY2, + randomLengthString())); + if (new Random().nextInt() % 2 == 0) { putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY3, - TEST_MULTIOP_VALUE3)); - ClientProtocol.Message putAllMessage = ProtobufUtilities.createProtobufMessage( - ProtobufRequestUtilities.createPutAllRequest(regionName, putEntries)); - - protobufProtocolSerializer.serialize(putAllMessage, outputStream); - HashSet<BasicTypes.EncodedValue> expectedFailedKeys = new HashSet<BasicTypes.EncodedValue>(); - expectedFailedKeys - .add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY2)); - expectedFailedKeys - .add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY3)); - validatePutAllResponse(socket, protobufProtocolSerializer, expectedFailedKeys); - - ClientProtocol.Message getMessage = - MessageUtil.makeGetRequestMessage(serializationService, 2.2f, regionName); - protobufProtocolSerializer.serialize(getMessage, outputStream); - validateGetResponse(socket, protobufProtocolSerializer, TEST_MULTIOP_VALUE1); - - ClientProtocol.Message removeMessage = - ProtobufUtilities.createProtobufMessage(ProtobufRequestUtilities.createRemoveRequest( - TEST_REGION, ProtobufUtilities.createEncodedValue(serializationService, TEST_KEY))); - protobufProtocolSerializer.serialize(removeMessage, outputStream); - validateRemoveResponse(socket, protobufProtocolSerializer); - } - - @Test - public void testResponseToGetWithNoData() throws Exception { - // Get request without any data set must return a null - ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - ClientProtocol.Message getMessage = - MessageUtil.makeGetRequestMessage(serializationService, TEST_KEY, TEST_REGION); - protobufProtocolSerializer.serialize(getMessage, outputStream); - - ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); - assertEquals(ClientProtocol.Response.ResponseAPICase.GETRESPONSE, - response.getResponseAPICase()); - RegionAPI.GetResponse getResponse = response.getGetResponse(); - - assertFalse(getResponse.hasResult()); - } - - @Test - public void testNewProtocolGetRegionNamesCallSucceeds() throws Exception { - ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - RegionAPI.GetRegionNamesRequest getRegionNamesRequest = - ProtobufRequestUtilities.createGetRegionNamesRequest(); - - ClientProtocol.Message getRegionsMessage = - ProtobufUtilities.createProtobufMessage(ClientProtocol.Request.newBuilder() - .setGetRegionNamesRequest(getRegionNamesRequest).build()); - protobufProtocolSerializer.serialize(getRegionsMessage, outputStream); - validateGetRegionNamesResponse(socket, protobufProtocolSerializer); - } - - @Test - public void testNewProtocolGetRegionCall() throws Exception { - System.setProperty("geode.feature-protobuf-protocol", "true"); - - ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - ClientProtocol.Message getRegionMessage = MessageUtil.makeGetRegionRequestMessage(TEST_REGION); - protobufProtocolSerializer.serialize(getRegionMessage, outputStream); - ClientProtocol.Message message = - protobufProtocolSerializer.deserialize(socket.getInputStream()); - assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase()); - ClientProtocol.Response response = message.getResponse(); - assertEquals(ClientProtocol.Response.ResponseAPICase.GETREGIONRESPONSE, - response.getResponseAPICase()); - RegionAPI.GetRegionResponse getRegionResponse = response.getGetRegionResponse(); - BasicTypes.Region region = getRegionResponse.getRegion(); - - assertEquals(TEST_REGION, region.getName()); - assertEquals(0, region.getSize()); - assertEquals(false, region.getPersisted()); - assertEquals(DataPolicy.NORMAL.toString(), region.getDataPolicy()); - assertEquals("", region.getKeyConstraint()); - assertEquals("", region.getValueConstraint()); - assertEquals(Scope.DISTRIBUTED_NO_ACK, Scope.fromString(region.getScope())); - } - - private void validateGetResponse(Socket socket, - ProtobufProtocolSerializer protobufProtocolSerializer, Object expectedValue) - throws InvalidProtocolMessageException, IOException { - ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); - - assertEquals(ClientProtocol.Response.ResponseAPICase.GETRESPONSE, - response.getResponseAPICase()); - RegionAPI.GetResponse getResponse = response.getGetResponse(); - BasicTypes.EncodedValue result = getResponse.getResult(); - assertEquals(BasicTypes.EncodedValue.ValueCase.STRINGRESULT, result.getValueCase()); - assertEquals(expectedValue, result.getStringResult()); - } - - private ClientProtocol.Response deserializeResponse(Socket socket, - ProtobufProtocolSerializer protobufProtocolSerializer) - throws InvalidProtocolMessageException, IOException { - ClientProtocol.Message message = - protobufProtocolSerializer.deserialize(socket.getInputStream()); - assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase()); - return message.getResponse(); - } - - private void validateGetRegionNamesResponse(Socket socket, - ProtobufProtocolSerializer protobufProtocolSerializer) - throws InvalidProtocolMessageException, IOException { - ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); - - assertEquals(ClientProtocol.Response.ResponseAPICase.GETREGIONNAMESRESPONSE, - response.getResponseAPICase()); - RegionAPI.GetRegionNamesResponse getRegionsResponse = response.getGetRegionNamesResponse(); - assertEquals(1, getRegionsResponse.getRegionsCount()); - assertEquals(TEST_REGION, getRegionsResponse.getRegions(0)); - } - - private void validatePutAllResponse(Socket socket, - ProtobufProtocolSerializer protobufProtocolSerializer, - Collection<BasicTypes.EncodedValue> expectedFailedKeys) throws Exception { - ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); - -// assertEquals(ClientProtocol.Response.ResponseAPICase.PUTALLRESPONSE, -// response.getResponseAPICase()); -// assertEquals(expectedFailedKeys.size(), response.getPutAllResponse().getFailedKeysCount()); - -// Stream<BasicTypes.EncodedValue> failedKeyStream = response.getPutAllResponse() -// .getFailedKeysList().stream().map(BasicTypes.KeyedError::getKey); -// assertTrue(failedKeyStream.allMatch(expectedFailedKeys::contains)); - - } - - private void validateGetAllResponse(Socket socket, - ProtobufProtocolSerializer protobufProtocolSerializer) throws InvalidProtocolMessageException, - IOException, UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { - ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); -// assertEquals(ClientProtocol.Response.ResponseAPICase.GETALLRESPONSE, -// response.getResponseAPICase()); - RegionAPI.GetAllResponse getAllResponse = response.getGetAllResponse(); -// assertEquals(3, getAllResponse.getEntriesCount()); -// for (BasicTypes.Entry result : getAllResponse.getEntriesList()) { -// String key = (String) ProtobufUtilities.decodeValue(serializationService, result.getKey()); -// String value = -// (String) ProtobufUtilities.decodeValue(serializationService, result.getValue()); -// switch (key) { -// case TEST_MULTIOP_KEY1: -// assertEquals(TEST_MULTIOP_VALUE1, value); -// break; -// case TEST_MULTIOP_KEY2: -// assertEquals(TEST_MULTIOP_VALUE2, value); -// break; -// case TEST_MULTIOP_KEY3: -// assertEquals(TEST_MULTIOP_VALUE3, value); -// break; -// default: -// Assert.fail("Unexpected key found by getAll: " + key); -// } -// } - } - - private void validateRemoveResponse(Socket socket, - ProtobufProtocolSerializer protobufProtocolSerializer) throws Exception { - ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); - assertEquals(ClientProtocol.Response.ResponseAPICase.REMOVERESPONSE, - response.getResponseAPICase()); - } - - private void updatePropertiesForSSLCache(Properties properties) { - String keyStore = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); - String trustStore = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); - - properties.put(SSL_ENABLED_COMPONENTS, "server"); - properties.put(ConfigurationProperties.SSL_PROTOCOLS, SSL_PROTOCOLS); - properties.put(ConfigurationProperties.SSL_CIPHERS, SSL_CIPHERS); - properties.put(SSL_REQUIRE_AUTHENTICATION, String.valueOf(true)); - - properties.put(SSL_KEYSTORE_TYPE, "jks"); - properties.put(SSL_KEYSTORE, keyStore); - properties.put(SSL_KEYSTORE_PASSWORD, "password"); - properties.put(SSL_TRUSTSTORE, trustStore); - properties.put(SSL_TRUSTSTORE_PASSWORD, "password"); - } - - private Socket getSSLSocket() throws IOException { - String keyStorePath = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); - String trustStorePath = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); - - SSLConfig sslConfig = new SSLConfig(); - sslConfig.setEnabled(true); - sslConfig.setCiphers(SSL_CIPHERS); - sslConfig.setProtocols(SSL_PROTOCOLS); - sslConfig.setRequireAuth(true); - sslConfig.setKeystoreType("jks"); - sslConfig.setKeystore(keyStorePath); - sslConfig.setKeystorePassword("password"); - sslConfig.setTruststore(trustStorePath); - sslConfig.setKeystorePassword("password"); - - SocketCreator socketCreator = new SocketCreator(sslConfig); - return socketCreator.connectForClient("localhost", cacheServerPort, 5000); + randomLengthString())); + } + String regionName = new Random().nextBoolean() ? TEST_REGION : TEST_REGION2; + ClientProtocol.Message putAllMessage = ProtobufUtilities.createProtobufMessage( + ProtobufRequestUtilities.createPutAllRequest(regionName, putEntries)); + protobufProtocolSerializer.serialize(putAllMessage, outputStream); + validatePutAllResponse(socket, protobufProtocolSerializer, new HashSet<>()); + + Set<BasicTypes.EncodedValue> getEntries = new HashSet<>(); + getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY1)); + if (new Random().nextInt() % 5 == 0) { + getEntries + .add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY2)); + getEntries + .add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY3)); + } + + RegionAPI.GetAllRequest getAllRequest = + ProtobufRequestUtilities.createGetAllRequest(regionName, getEntries); + + ClientProtocol.Message getAllMessage = ProtobufUtilities.createProtobufMessage( + ProtobufUtilities.createProtobufRequestWithGetAllRequest(getAllRequest)); + Thread.sleep(100); + protobufProtocolSerializer.serialize(getAllMessage, outputStream); + validateGetAllResponse(socket, protobufProtocolSerializer); } + } + + @Test + public void multiKeyOperationErrorsWithClasscastException() throws Exception { + RegionFactory<Float, Object> regionFactory = cache.createRegionFactory(); + regionFactory.setKeyConstraint(Float.class); + String regionName = "constraintRegion"; + regionFactory.create(regionName); + System.setProperty("geode.feature-protobuf-protocol", "true"); + + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + Set<BasicTypes.Entry> putEntries = new HashSet<>(); + putEntries.add(ProtobufUtilities.createEntry(serializationService, 2.2f, TEST_MULTIOP_VALUE1)); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY2, + TEST_MULTIOP_VALUE2)); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY3, + TEST_MULTIOP_VALUE3)); + ClientProtocol.Message putAllMessage = ProtobufUtilities.createProtobufMessage( + ProtobufRequestUtilities.createPutAllRequest(regionName, putEntries)); + + protobufProtocolSerializer.serialize(putAllMessage, outputStream); + HashSet<BasicTypes.EncodedValue> expectedFailedKeys = new HashSet<BasicTypes.EncodedValue>(); + expectedFailedKeys + .add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY2)); + expectedFailedKeys + .add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY3)); + validatePutAllResponse(socket, protobufProtocolSerializer, expectedFailedKeys); + + ClientProtocol.Message getMessage = + MessageUtil.makeGetRequestMessage(serializationService, 2.2f, regionName); + protobufProtocolSerializer.serialize(getMessage, outputStream); + validateGetResponse(socket, protobufProtocolSerializer, TEST_MULTIOP_VALUE1); + + ClientProtocol.Message removeMessage = + ProtobufUtilities.createProtobufMessage(ProtobufRequestUtilities.createRemoveRequest( + TEST_REGION, ProtobufUtilities.createEncodedValue(serializationService, TEST_KEY))); + protobufProtocolSerializer.serialize(removeMessage, outputStream); + validateRemoveResponse(socket, protobufProtocolSerializer); + } + + @Test + public void testResponseToGetWithNoData() throws Exception { + // Get request without any data set must return a null + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + ClientProtocol.Message getMessage = + MessageUtil.makeGetRequestMessage(serializationService, TEST_KEY, TEST_REGION); + protobufProtocolSerializer.serialize(getMessage, outputStream); + + ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); + assertEquals(ClientProtocol.Response.ResponseAPICase.GETRESPONSE, + response.getResponseAPICase()); + RegionAPI.GetResponse getResponse = response.getGetResponse(); + + assertFalse(getResponse.hasResult()); + } + + @Test + public void testNewProtocolGetRegionNamesCallSucceeds() throws Exception { + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + RegionAPI.GetRegionNamesRequest getRegionNamesRequest = + ProtobufRequestUtilities.createGetRegionNamesRequest(); + + ClientProtocol.Message getRegionsMessage = + ProtobufUtilities.createProtobufMessage(ClientProtocol.Request.newBuilder() + .setGetRegionNamesRequest(getRegionNamesRequest).build()); + protobufProtocolSerializer.serialize(getRegionsMessage, outputStream); + validateGetRegionNamesResponse(socket, protobufProtocolSerializer); + } + + @Test + public void testNewProtocolGetRegionCall() throws Exception { + System.setProperty("geode.feature-protobuf-protocol", "true"); + + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + ClientProtocol.Message getRegionMessage = MessageUtil.makeGetRegionRequestMessage(TEST_REGION); + protobufProtocolSerializer.serialize(getRegionMessage, outputStream); + ClientProtocol.Message message = + protobufProtocolSerializer.deserialize(socket.getInputStream()); + assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase()); + ClientProtocol.Response response = message.getResponse(); + assertEquals(ClientProtocol.Response.ResponseAPICase.GETREGIONRESPONSE, + response.getResponseAPICase()); + RegionAPI.GetRegionResponse getRegionResponse = response.getGetRegionResponse(); + BasicTypes.Region region = getRegionResponse.getRegion(); + + assertEquals(TEST_REGION, region.getName()); + assertEquals(0, region.getSize()); + assertEquals(false, region.getPersisted()); + assertEquals(DataPolicy.NORMAL.toString(), region.getDataPolicy()); + assertEquals("", region.getKeyConstraint()); + assertEquals("", region.getValueConstraint()); + assertEquals(Scope.DISTRIBUTED_NO_ACK, Scope.fromString(region.getScope())); + } + + private void validateGetResponse(Socket socket, + ProtobufProtocolSerializer protobufProtocolSerializer, Object expectedValue) + throws InvalidProtocolMessageException, IOException { + ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); + + assertEquals(ClientProtocol.Response.ResponseAPICase.GETRESPONSE, + response.getResponseAPICase()); + RegionAPI.GetResponse getResponse = response.getGetResponse(); + BasicTypes.EncodedValue result = getResponse.getResult(); + assertEquals(BasicTypes.EncodedValue.ValueCase.STRINGRESULT, result.getValueCase()); + assertEquals(expectedValue, result.getStringResult()); + } + + private ClientProtocol.Response deserializeResponse(Socket socket, + ProtobufProtocolSerializer protobufProtocolSerializer) + throws InvalidProtocolMessageException, IOException { + ClientProtocol.Message message = + protobufProtocolSerializer.deserialize(socket.getInputStream()); + assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase()); + return message.getResponse(); + } + + private void validateGetRegionNamesResponse(Socket socket, + ProtobufProtocolSerializer protobufProtocolSerializer) + throws InvalidProtocolMessageException, IOException { + ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); + + assertEquals(ClientProtocol.Response.ResponseAPICase.GETREGIONNAMESRESPONSE, + response.getResponseAPICase()); + RegionAPI.GetRegionNamesResponse getRegionsResponse = response.getGetRegionNamesResponse(); + assertEquals(1, getRegionsResponse.getRegionsCount()); + assertEquals(TEST_REGION, getRegionsResponse.getRegions(0)); + } + + private void validatePutAllResponse(Socket socket, + ProtobufProtocolSerializer protobufProtocolSerializer, + Collection<BasicTypes.EncodedValue> expectedFailedKeys) throws Exception { + ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); + + // assertEquals(ClientProtocol.Response.ResponseAPICase.PUTALLRESPONSE, + // response.getResponseAPICase()); + // assertEquals(expectedFailedKeys.size(), response.getPutAllResponse().getFailedKeysCount()); + + // Stream<BasicTypes.EncodedValue> failedKeyStream = response.getPutAllResponse() + // .getFailedKeysList().stream().map(BasicTypes.KeyedError::getKey); + // assertTrue(failedKeyStream.allMatch(expectedFailedKeys::contains)); + + } + + private void validateGetAllResponse(Socket socket, + ProtobufProtocolSerializer protobufProtocolSerializer) throws InvalidProtocolMessageException, + IOException, UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { + ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); + // assertEquals(ClientProtocol.Response.ResponseAPICase.GETALLRESPONSE, + // response.getResponseAPICase()); + RegionAPI.GetAllResponse getAllResponse = response.getGetAllResponse(); + // assertEquals(3, getAllResponse.getEntriesCount()); + // for (BasicTypes.Entry result : getAllResponse.getEntriesList()) { + // String key = (String) ProtobufUtilities.decodeValue(serializationService, result.getKey()); + // String value = + // (String) ProtobufUtilities.decodeValue(serializationService, result.getValue()); + // switch (key) { + // case TEST_MULTIOP_KEY1: + // assertEquals(TEST_MULTIOP_VALUE1, value); + // break; + // case TEST_MULTIOP_KEY2: + // assertEquals(TEST_MULTIOP_VALUE2, value); + // break; + // case TEST_MULTIOP_KEY3: + // assertEquals(TEST_MULTIOP_VALUE3, value); + // break; + // default: + // Assert.fail("Unexpected key found by getAll: " + key); + // } + // } + } + + private void validateRemoveResponse(Socket socket, + ProtobufProtocolSerializer protobufProtocolSerializer) throws Exception { + ClientProtocol.Response response = deserializeResponse(socket, protobufProtocolSerializer); + assertEquals(ClientProtocol.Response.ResponseAPICase.REMOVERESPONSE, + response.getResponseAPICase()); + } + + private void updatePropertiesForSSLCache(Properties properties) { + String keyStore = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); + String trustStore = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); + + properties.put(SSL_ENABLED_COMPONENTS, "server"); + properties.put(ConfigurationProperties.SSL_PROTOCOLS, SSL_PROTOCOLS); + properties.put(ConfigurationProperties.SSL_CIPHERS, SSL_CIPHERS); + properties.put(SSL_REQUIRE_AUTHENTICATION, String.valueOf(true)); + + properties.put(SSL_KEYSTORE_TYPE, "jks"); + properties.put(SSL_KEYSTORE, keyStore); + properties.put(SSL_KEYSTORE_PASSWORD, "password"); + properties.put(SSL_TRUSTSTORE, trustStore); + properties.put(SSL_TRUSTSTORE_PASSWORD, "password"); + } + + private Socket getSSLSocket() throws IOException { + String keyStorePath = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); + String trustStorePath = TestUtil.getResourcePath(CacheOperationsJUnitTest.class, DEFAULT_STORE); + + SSLConfig sslConfig = new SSLConfig(); + sslConfig.setEnabled(true); + sslConfig.setCiphers(SSL_CIPHERS); + sslConfig.setProtocols(SSL_PROTOCOLS); + sslConfig.setRequireAuth(true); + sslConfig.setKeystoreType("jks"); + sslConfig.setKeystore(keyStorePath); + sslConfig.setKeystorePassword("password"); + sslConfig.setTruststore(trustStorePath); + sslConfig.setKeystorePassword("password"); + + SocketCreator socketCreator = new SocketCreator(sslConfig); + return socketCreator.connectForClient("localhost", cacheServerPort, 5000); + } } -- To stop receiving notification emails like this one, please contact u...@apache.org.