This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
new 1c50838 AMQ-7159 - Adding a new attribute on
PersistenceAdapterViewMBean to show information about Storage write/read latency
1c50838 is described below
commit 1c5083880a2610ebdc806b09ab5107a3957b0873
Author: Alan Protasio <[email protected]>
AuthorDate: Thu Feb 28 16:22:00 2019 -0800
AMQ-7159 - Adding a new attribute on PersistenceAdapterViewMBean to show
information about Storage write/read latency
(cherry picked from commit 87467dc61e747670bce9c27424a9d93aad8b8499)
---
.../broker/jmx/PersistenceAdapterView.java | 61 +++++++++++++++++++++-
.../broker/jmx/PersistenceAdapterViewMBean.java | 6 +++
.../store/PersistenceAdapterStatistics.java | 54 +++++++++++++++++++
.../store/kahadb/KahaDBPersistenceAdapter.java | 30 +++++++----
.../activemq/store/kahadb/MessageDatabase.java | 12 +++++
5 files changed, 153 insertions(+), 10 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java
index 06b50d0..beebd10 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterView.java
@@ -16,16 +16,25 @@
*/
package org.apache.activemq.broker.jmx;
-import java.util.concurrent.Callable;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.activemq.management.TimeStatisticImpl;
import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.PersistenceAdapterStatistics;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
public class PersistenceAdapterView implements PersistenceAdapterViewMBean {
+ private final static ObjectMapper mapper = new ObjectMapper();
private final String name;
private final PersistenceAdapter persistenceAdapter;
private Callable<String> inflightTransactionViewCallable;
private Callable<String> dataViewCallable;
+ private PersistenceAdapterStatistics persistenceAdapterStatistics;
public PersistenceAdapterView(PersistenceAdapter adapter) {
this.name = adapter.toString();
@@ -52,6 +61,22 @@ public class PersistenceAdapterView implements
PersistenceAdapterViewMBean {
return persistenceAdapter.size();
}
+    /**
+     * Returns the write/read time statistics of the persistence adapter in serialized form.
+     *
+     * @return whatever {@code serializePersistenceAdapterStatistics()} produces for the
+     *         statistics currently set on this view
+     */
+    @Override
+    public String getStatistics() {
+        final String serialized = serializePersistenceAdapterStatistics();
+        return serialized;
+    }
+
+    /**
+     * Asks the statistics to reset and returns the values they held just before that.
+     *
+     * @return the serialized statistics captured before the reset request, or
+     *         {@code null} when no statistics have been set on this view
+     */
+    @Override
+    public String resetStatistics() {
+        // Take the snapshot first so the caller still sees the pre-reset values.
+        final String snapshot = serializePersistenceAdapterStatistics();
+
+        if (persistenceAdapterStatistics == null) {
+            return snapshot;
+        }
+
+        persistenceAdapterStatistics.reset();
+        return snapshot;
+    }
+
private String invoke(Callable<String> callable) {
String result = null;
if (callable != null) {
@@ -64,6 +89,36 @@ public class PersistenceAdapterView implements
PersistenceAdapterViewMBean {
return result;
}
+ private String serializePersistenceAdapterStatistics() {
+ if (persistenceAdapterStatistics != null) {
+ try {
+ Map<String, Object> result = new HashMap<String, Object>();
+ result.put("writeTime",
getTimeStatisticAsMap(persistenceAdapterStatistics.getWriteTime()));
+ result.put("readTime",
getTimeStatisticAsMap(persistenceAdapterStatistics.getReadTime()));
+ return mapper.writeValueAsString(result);
+ } catch (IOException e) {
+ return e.toString();
+ }
+ }
+
+ return null;
+ }
+
+ private Map<String, Object> getTimeStatisticAsMap(final TimeStatisticImpl
timeStatistic) {
+ Map<String, Object> result = new HashMap<String, Object>();
+
+ result.put("count", timeStatistic.getCount());
+ result.put("maxTime", timeStatistic.getMaxTime());
+ result.put("minTime", timeStatistic.getMinTime());
+ result.put("totalTime", timeStatistic.getTotalTime());
+ result.put("averageTime", timeStatistic.getAverageTime());
+ result.put("averageTimeExMinMax",
timeStatistic.getAverageTimeExcludingMinMax());
+ result.put("averagePerSecond", timeStatistic.getAveragePerSecond());
+ result.put("averagePerSecondExMinMax",
timeStatistic.getAveragePerSecondExcludingMinMax());
+
+ return result;
+ }
+
public void setDataViewCallable(Callable<String> dataViewCallable) {
this.dataViewCallable = dataViewCallable;
}
@@ -71,4 +126,8 @@ public class PersistenceAdapterView implements
PersistenceAdapterViewMBean {
public void setInflightTransactionViewCallable(Callable<String>
inflightTransactionViewCallable) {
this.inflightTransactionViewCallable = inflightTransactionViewCallable;
}
+
+    /**
+     * Sets the statistics that this view reports through {@code getStatistics()} and
+     * {@code resetStatistics()}.
+     *
+     * @param persistenceAdapterStatistics the statistics to expose; the view checks for
+     *        {@code null} before using them
+     */
+    public void setPersistenceAdapterStatistics(PersistenceAdapterStatistics persistenceAdapterStatistics) {
+        this.persistenceAdapterStatistics = persistenceAdapterStatistics;
+    }
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java
index b860e9c..3eee6ea 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java
@@ -29,4 +29,10 @@ public interface PersistenceAdapterViewMBean {
@MBeanInfo("Current size.")
long getSize();
+
+    /**
+     * Returns the statistics of the persistence adapter rendered as a string.
+     *
+     * @return the statistics as text; {@code PersistenceAdapterView} returns
+     *         {@code null} when no statistics have been set on it
+     */
+    @MBeanInfo("Statistics related to the PersistenceAdapter.")
+    String getStatistics();
+
+    /**
+     * Resets the statistics of the persistence adapter.
+     *
+     * @return a string result; {@code PersistenceAdapterView} returns the statistics
+     *         as they were immediately before the reset request
+     */
+    @MBeanInfo("Resets statistics.")
+    String resetStatistics();
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterStatistics.java
b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterStatistics.java
new file mode 100644
index 0000000..0a21469
--- /dev/null
+++
b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterStatistics.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.management.TimeStatisticImpl;
+
+/**
+ * Container for the write and read time statistics of a persistence adapter.
+ * Both timers are registered with this container under the names
+ * {@code writeTime} and {@code readTime}.
+ */
+public class PersistenceAdapterStatistics extends StatsImpl {
+    protected TimeStatisticImpl writeTime;
+    protected TimeStatisticImpl readTime;
+
+    /**
+     * Creates the two timers and registers them with this container.
+     */
+    public PersistenceAdapterStatistics() {
+        writeTime = new TimeStatisticImpl("writeTime", "Time to write data to the PersistenceAdapter.");
+        readTime = new TimeStatisticImpl("readTime", "Time to read data from the PersistenceAdapter.");
+        addStatistic("writeTime", writeTime);
+        addStatistic("readTime", readTime);
+    }
+
+    /**
+     * Records one write duration.
+     *
+     * @param time the elapsed time of the write; KahaDB's MessageDatabase passes a
+     *        difference of millisecond timestamps
+     */
+    public void addWriteTime(final long time) {
+        writeTime.addTime(time);
+    }
+
+    /**
+     * Records one read duration.
+     *
+     * @param time the elapsed time of the read; KahaDB's MessageDatabase passes a
+     *        difference of millisecond timestamps
+     */
+    public void addReadTime(final long time) {
+        readTime.addTime(time);
+    }
+
+    /**
+     * Enables or disables this container together with both timers.
+     *
+     * @param enabled the new enabled state
+     */
+    @Override
+    public void setEnabled(boolean enabled) {
+        super.setEnabled(enabled);
+        writeTime.setEnabled(enabled);
+        readTime.setEnabled(enabled);
+    }
+
+    /**
+     * @return the timer that accumulates write durations
+     */
+    public TimeStatisticImpl getWriteTime() {
+        return writeTime;
+    }
+
+    /**
+     * @return the timer that accumulates read durations
+     */
+    public TimeStatisticImpl getReadTime() {
+        return readTime;
+    }
+
+    /**
+     * Resets both timers, but only when {@code isDoReset()} reports that resets are allowed.
+     */
+    @Override
+    public void reset() {
+        if (isDoReset()) {
+            writeTime.reset();
+            readTime.reset();
+        }
+    }
+
+    /**
+     * Links both timers to the matching timers of a parent, or detaches them.
+     *
+     * @param parent the parent statistics, or {@code null} to clear the parent of both timers
+     */
+    public void setParent(PersistenceAdapterStatistics parent) {
+        if (parent != null) {
+            writeTime.setParent(parent.writeTime);
+            readTime.setParent(parent.readTime);
+        } else {
+            writeTime.setParent(null);
+            readTime.setParent(null);
+        }
+    }
+}
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index fbeda4c..e8122d9 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -16,15 +16,6 @@
*/
package org.apache.activemq.store.kahadb;
-import static
org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
-import javax.management.ObjectName;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.LockableServiceSupport;
@@ -44,6 +35,7 @@ import org.apache.activemq.store.JournaledStore;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.NoLocalSubscriptionAware;
import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.PersistenceAdapterStatistics;
import org.apache.activemq.store.SharedFileLocker;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionIdTransformer;
@@ -56,6 +48,14 @@ import
org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStra
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import static
org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
+
/**
* An implementation of {@link PersistenceAdapter} designed for use with
* KahaDB - Embedded Lightweight Non-Relational Database
@@ -245,6 +245,9 @@ public class KahaDBPersistenceAdapter extends
LockableServiceSupport implements
return
letter.getJournal().getFileMap().keySet().toString();
}
});
+
+
view.setPersistenceAdapterStatistics(letter.persistenceAdapterStatistics);
+
AnnotatedMBean.registerMBean(brokerService.getManagementContext(),
view,
createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(),
toString()));
}
@@ -405,6 +408,15 @@ public class KahaDBPersistenceAdapter extends
LockableServiceSupport implements
}
/**
+     * Returns the statistics object held by the wrapped {@code letter}.
+     *
+     * @return the {@link PersistenceAdapterStatistics} obtained from {@code letter}
+     */
+    public PersistenceAdapterStatistics getPersistenceAdapterStatistics() {
+        return letter.getPersistenceAdapterStatistics();
+    }
+
+ /**
* Get the directory
*
* @return the directory
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 83d3fff..2ef3023 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -71,6 +71,7 @@ import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
+import org.apache.activemq.store.PersistenceAdapterStatistics;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
@@ -249,6 +250,7 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
protected PageFile pageFile;
protected Journal journal;
protected Metadata metadata = new Metadata();
+ protected final PersistenceAdapterStatistics persistenceAdapterStatistics
= new PersistenceAdapterStatistics();
protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
@@ -1141,6 +1143,9 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
LOG.info("Slow KahaDB access: Journal append took:
"+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
}
}
+
+ persistenceAdapterStatistics.addWriteTime(end - start);
+
} finally {
checkpointLock.readLock().unlock();
}
@@ -1176,6 +1181,9 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
LOG.info("Slow KahaDB access: Journal read took:
"+(end-start)+" ms");
}
}
+
+ persistenceAdapterStatistics.addReadTime(end - start);
+
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
byte readByte = is.readByte();
KahaEntryType type = KahaEntryType.valueOf(readByte);
@@ -3525,6 +3533,10 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
return enableIndexPageCaching;
}
+    /**
+     * Returns the statistics object into which this database records its write and read times.
+     *
+     * @return the statistics instance owned by this database
+     */
+    public PersistenceAdapterStatistics getPersistenceAdapterStatistics() {
+        return persistenceAdapterStatistics;
+    }
+
// /////////////////////////////////////////////////////////////////
// Internal conversion methods.
// /////////////////////////////////////////////////////////////////