Add stream session progress to JMX. Patch by Greg DeAngelis; reviewed by yukim for CASSANDRA-4757.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cf1de311 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cf1de311 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cf1de311 Branch: refs/heads/cassandra-2.0.0 Commit: cf1de311224dce9c3db51db9dcbeae30bb22cfd0 Parents: c284786 Author: Greg DeAngelis <gdean...@gmail.com> Authored: Mon Aug 26 15:22:55 2013 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Aug 27 07:36:29 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../management/StreamStateCompositeData.java | 38 ++++++++++++++++++-- 2 files changed, 36 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf1de311/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cfc4845..b910f14 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,7 @@ * Fix periodic memtable flushing behavior with clean memtables (CASSANDRA-5931) * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928) * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938) + * Add stream session progress to JMX (CASSANDRA-4757) Merged from 1.2: * Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900) * Don't announce schema version until we've loaded the changes locally http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf1de311/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java index 820a71a..3752d39 100644 --- 
a/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java +++ b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java @@ -33,10 +33,18 @@ import org.apache.cassandra.streaming.StreamState; */ public class StreamStateCompositeData { - private static final String[] ITEM_NAMES = new String[]{"planId", "description", "sessions"}; + private static final String[] ITEM_NAMES = new String[]{"planId", "description", "sessions", + "currentRxBytes", "totalRxBytes", "rxPercentage", + "currentTxBytes", "totalTxBytes", "txPercentage"}; private static final String[] ITEM_DESCS = new String[]{"Plan ID of this stream", "Stream plan description", - "Active stream sessions"}; + "Active stream sessions", + "Number of bytes received across all streams", + "Total bytes available to receive across all streams", + "Percentage received across all streams", + "Number of bytes sent across all streams", + "Total bytes available to send across all streams", + "Percentage sent across all streams"}; private static final OpenType<?>[] ITEM_TYPES; public static final CompositeType COMPOSITE_TYPE; @@ -45,7 +53,9 @@ public class StreamStateCompositeData { ITEM_TYPES = new OpenType[]{SimpleType.STRING, SimpleType.STRING, - ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE)}; + ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE), + SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE, + SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE}; COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(), "StreamState", ITEM_NAMES, @@ -73,6 +83,28 @@ public class StreamStateCompositeData } })).toArray(sessions); valueMap.put(ITEM_NAMES[2], sessions); + + long currentRxBytes = 0; + long totalRxBytes = 0; + long currentTxBytes = 0; + long totalTxBytes = 0; + for (SessionInfo sessInfo : streamState.sessions) + { + currentRxBytes += sessInfo.getTotalSizeReceived(); + totalRxBytes += sessInfo.getTotalSizeToReceive(); + currentTxBytes 
+= sessInfo.getTotalSizeSent(); + totalTxBytes += sessInfo.getTotalSizeToSend(); + } + double rxPercentage = (totalRxBytes == 0 ? 100L : currentRxBytes * 100L / totalRxBytes); + double txPercentage = (totalTxBytes == 0 ? 100L : currentTxBytes * 100L / totalTxBytes); + + valueMap.put(ITEM_NAMES[3], currentRxBytes); + valueMap.put(ITEM_NAMES[4], totalRxBytes); + valueMap.put(ITEM_NAMES[5], rxPercentage); + valueMap.put(ITEM_NAMES[6], currentTxBytes); + valueMap.put(ITEM_NAMES[7], totalTxBytes); + valueMap.put(ITEM_NAMES[8], txPercentage); + try { return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);