This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new f8bbeeee2 AMQ-9166: Add destination to scheduler job
new 3a080e37e This closes #937
f8bbeeee2 is described below
commit f8bbeeee22aac7ffae1be67240214788a3bdcf29
Author: Shikhar Gupta <[email protected]>
AuthorDate: Sun Dec 11 03:14:05 2022 -0500
AMQ-9166: Add destination to scheduler job
---
.../activemq/broker/jmx/JobSchedulerView.java | 31 ++++++++++++++++++++++
.../activemq/broker/jmx/JobSchedulerViewMBean.java | 1 +
.../activemq/broker/jmx/OpenTypeSupport.java | 2 ++
.../org/apache/activemq/broker/scheduler/Job.java | 7 +++++
.../broker/scheduler/memory/InMemoryJob.java | 11 ++++++++
.../activemq/store/kahadb/scheduler/JobImpl.java | 12 +++++++++
6 files changed, 64 insertions(+)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
index 92dfe9277..b0695ada2 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
@@ -30,6 +30,7 @@ import
org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSupport;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteSequence;
@@ -54,12 +55,22 @@ public class JobSchedulerView implements
JobSchedulerViewMBean {
@Override
public TabularData getAllJobs() throws Exception {
+ return getAllJobs(false);
+ }
+
+ public TabularData getAllJobs(boolean includeDestinationName) throws
Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs",
ct, new String[] { "jobId" });
TabularDataSupport rc = new TabularDataSupport(tt);
List<Job> jobs = this.jobScheduler.getAllJobs();
+ OpenWireFormat wireFormat = new OpenWireFormat();
for (Job job : jobs) {
+ if (includeDestinationName) {
+ Message msg = (Message) wireFormat.unmarshal(new
ByteSequence(job.getPayload()));
+ ActiveMQDestination destination = (ActiveMQDestination)
msg.getJMSDestination();
+ job.setDestinationName(destination.getPhysicalName());
+ }
rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
}
return rc;
@@ -67,6 +78,10 @@ public class JobSchedulerView implements
JobSchedulerViewMBean {
@Override
public TabularData getAllJobs(String startTime, String finishTime) throws
Exception {
+ return getAllJobs(startTime, finishTime, false);
+ }
+
+ public TabularData getAllJobs(String startTime, String finishTime, boolean
includeDestinationName) throws Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs",
ct, new String[] { "jobId" });
@@ -74,7 +89,13 @@ public class JobSchedulerView implements
JobSchedulerViewMBean {
long start = JobSupport.getDataTime(startTime);
long finish = JobSupport.getDataTime(finishTime);
List<Job> jobs = this.jobScheduler.getAllJobs(start, finish);
+ OpenWireFormat wireFormat = new OpenWireFormat();
for (Job job : jobs) {
+ if (includeDestinationName) {
+ Message msg = (Message) wireFormat.unmarshal(new
ByteSequence(job.getPayload()));
+ ActiveMQDestination destination = (ActiveMQDestination)
msg.getJMSDestination();
+ job.setDestinationName(destination.getPhysicalName());
+ }
rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
}
return rc;
@@ -100,12 +121,22 @@ public class JobSchedulerView implements
JobSchedulerViewMBean {
@Override
public TabularData getNextScheduleJobs() throws Exception {
+ return getNextScheduleJobs(false);
+ }
+
+ public TabularData getNextScheduleJobs(boolean includeDestinationName)
throws Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs",
ct, new String[] { "jobId" });
TabularDataSupport rc = new TabularDataSupport(tt);
List<Job> jobs = this.jobScheduler.getNextScheduleJobs();
+ OpenWireFormat wireFormat = new OpenWireFormat();
for (Job job : jobs) {
+ if (includeDestinationName) {
+ Message msg = (Message) wireFormat.unmarshal(new
ByteSequence(job.getPayload()));
+ ActiveMQDestination destination = (ActiveMQDestination)
msg.getJMSDestination();
+ job.setDestinationName(destination.getPhysicalName());
+ }
rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
}
return rc;
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
index 9aedbefc7..8649ca5b2 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
@@ -67,6 +67,7 @@ public interface JobSchedulerViewMBean {
@MBeanInfo("remove all scheduled jobs between time ranges ")
public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd
hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)
throws Exception;
+
/**
* Get the next time jobs will be fired from this scheduler store.
*
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
index dafbdf148..cf0dc4d91 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
@@ -420,6 +420,7 @@ public final class OpenTypeSupport {
addItem("next", "next time", SimpleType.STRING);
addItem("period", "period between jobs", SimpleType.LONG);
addItem("repeat", "number of times to repeat", SimpleType.INTEGER);
+ addItem("destinationName", "destination name", SimpleType.STRING);
}
@Override
@@ -433,6 +434,7 @@ public final class OpenTypeSupport {
rc.put("next", job.getNextExecutionTime());
rc.put("period", job.getPeriod());
rc.put("repeat", job.getRepeat());
+ rc.put("destinationName", job.getDestinationName());
return rc;
}
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
index 047fe239e..184521395 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
@@ -78,4 +78,11 @@ public interface Job {
*/
public int getExecutionCount();
+ /**
+ *
+ * @return name of destination
+ */
+ public String getDestinationName();
+
+ public void setDestinationName(String destinationName);
}
\ No newline at end of file
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java
index 9a4b01266..7d470664f 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java
@@ -35,6 +35,7 @@ public class InMemoryJob implements Job {
private int executionCount;
private byte[] payload;
+ private String destinationName;
public InMemoryJob(String jobId) {
this.jobId = jobId;
@@ -122,6 +123,16 @@ public class InMemoryJob implements Job {
return executionCount;
}
+ @Override
+ public String getDestinationName() {
+ return destinationName;
+ }
+
+ @Override
+ public void setDestinationName(String destinationName) {
+ this.destinationName = destinationName;
+ }
+
public void incrementExecutionCount() {
this.executionCount++;
}
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
index 217bc1fb2..56b49fa1f 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
@@ -25,6 +25,8 @@ public class JobImpl implements Job {
private final JobLocation jobLocation;
private final byte[] payload;
+ private String destinationName;
+
protected JobImpl(JobLocation location, ByteSequence bs) {
this.jobLocation = location;
this.payload = new byte[bs.getLength()];
@@ -85,4 +87,14 @@ public class JobImpl implements Job {
public String toString() {
return "Job: " + getJobId();
}
+
+ @Override
+ public String getDestinationName() {
+ return destinationName;
+ }
+
+ @Override
+ public void setDestinationName(String destinationName) {
+ this.destinationName = destinationName;
+ }
}