Author: burn
Date: Thu Jun 2 15:18:04 2016
New Revision: 1746587
URL: http://svn.apache.org/viewvc?rev=1746587&view=rev
Log:
UIMA-4953 When the failover: protocol is specified, disable collection of JMX statistics
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java?rev=1746587&r1=1746586&r2=1746587&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java Thu Jun 2 15:18:04 2016
@@ -19,12 +19,9 @@
package org.apache.uima.ducc.cli;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.PrintStream;
import java.io.StringReader;
-import java.io.StringWriter;
-import java.net.MalformedURLException;
-import java.net.URL;
+import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -47,30 +44,20 @@ import org.apache.activemq.command.Activ
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.ducc.common.IServiceStatistics;
-import org.apache.uima.ducc.common.TcpStreamHandler;
import org.apache.uima.ducc.common.utils.DuccProperties;
-public class UimaAsPing
- extends AServicePing
-{
- String ep;
-
+public class UimaAsPing extends AServicePing {
- String endpoint;
- String broker;
- int meta_timeout;
-
- String broker_host;
- int broker_jmx_port;
- boolean connected;
- UimaAsServiceMonitor monitor;
-
- int[] queueSizeWindow;
- int queueCursor = 0;
-
- String nodeIp;
- String pid;
- boolean gmfail = false;
+ private String endpoint;
+ private String brokerURI;
+ private int meta_timeout;
+
+ private String broker_host;
+ private int broker_jmx_port;
+ private UimaAsServiceMonitor monitor;
+
+ private String nodeIp;
+ private String pid;
private Connection connection;
@@ -82,76 +69,62 @@ public class UimaAsPing
private TemporaryQueue consumerDestination;
- private String brokerURI;
-
private MessageConsumer consumer;
- public UimaAsPing()
- {
+ private boolean failover = false;
+
+ public UimaAsPing() {
}
+ // Construct a monitor unless collection of JMX statistics is disabled
+ // - jmx port is specified as 'none', or
+ // - using the failover protocol
public void init(String args, String ep)
throws Exception
{
- this.ep = ep;
-
- // Ep is of the form UIMA-AS:queuename:broker
- int ndx = ep.indexOf(":");
- ep = ep.substring(ndx+1);
- ndx = ep.indexOf(":");
-
- this.endpoint = ep.substring(0, ndx).trim();
- this.broker = ep.substring(ndx+1).trim();
-
- // broker is a URL that we need to parse in order to get the actual host and port
- // for jmx
- URL url = null;
- try {
- url = new URL(null, broker, new TcpStreamHandler());
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("Invalid broker URL: " +
broker);
- }
- broker_host = url.getHost();
- // not needed here fyi broker_port = url.getPort();
-
+ monitor = null;
if ( args == null ) {
meta_timeout = 5000;
broker_jmx_port = 1099;
} else {
- // 'q_thresh=nn,window=mm,broker_jmx_port=1100,meta_timeout=10000'
- // turn the argument string into properties
- String[] as = args.split(",");
- StringWriter sw = new StringWriter();
- for ( String s : as ) sw.write(s + "\n");
- StringReader sr = new StringReader(sw.toString());
+ // turn the argument string into properties and load 'broker_jmx_port' & 'meta_timeout'
+ String lines = args.replace(',', '\n');
DuccProperties props = new DuccProperties();
- try {
- props.load(sr);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- meta_timeout = props.getIntProperty ("meta-timeout"
, 5000);
- String broker_tmp_jmx = props.getProperty
("broker-jmx-port");
-
-
-
- if ( broker_tmp_jmx.equals("none") ) {
- broker_jmx_port = -1;
- this.monitor = null;
+ props.load(new StringReader(lines));
+ meta_timeout = props.getIntProperty("meta-timeout", 5000);
+ if ("none".equals(props.getProperty("broker-jmx-port"))) {
+ return;
} else {
broker_jmx_port = props.getIntProperty("broker-jmx-port",
1099);
- doLog("init","Initializing UimaAsServiceMonitor: endpoint:"+endpoint+" broker_host:"+broker_host+" broker_jmx_port:"+broker_jmx_port);
- this.monitor = new UimaAsServiceMonitor(endpoint, broker_host,
broker_jmx_port);
}
}
+
+ // Ep is of the form UIMA-AS:queuename:broker
+ String tokens[] = ep.split(":", 3);
+ endpoint = tokens[1];
+ brokerURI = tokens[2];
+
+ // Parse broker URL to get the host for jmx.
+ // First check for the failover protocol, which is not supported for JMX statistics
+ if (brokerURI.startsWith("failover:")) {
+ failover = true;
+ return;
+ }
+ try {
+ URI uri = new URI(brokerURI);
+ broker_host = uri.getHost();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid broker URL '" +
brokerURI + "'");
+ }
+ doLog("init", "Initializing UimaAsServiceMonitor: endpoint:" +
endpoint + " broker_host:" + broker_host + " broker_jmx_port:"
+ + broker_jmx_port);
+ monitor = new UimaAsServiceMonitor(endpoint, broker_host,
broker_jmx_port);
}
- private void initJMS(String brokerURI ) throws JMSException {
+ private void initJMS() throws JMSException {
String methodName = "initJMS";
ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerURI);
- this.brokerURI = brokerURI;
connection = factory.createConnection();
connection.start();
doLog(methodName, "Connection started");
@@ -238,10 +211,10 @@ public class UimaAsPing
evaluateService(statistics); // if we get here, the get-meta worked well enough
ExecutorService executor = null;
Exception excp = null;
- gmfail = false;
+ boolean gmfail = false;
Future<Boolean> future = null;
try {
- initJMS(broker);
+ initJMS();
TextMessage msg = producerSession.createTextMessage();
msg.setStringProperty(AsynchAEMessage.MessageFrom,
consumerDestination.getQueueName());
@@ -324,7 +297,7 @@ public class UimaAsPing
}
}
if ( gmfail || excp != null ) {
- failure_reason = "Cannot issue getMeta to: " + endpoint + ":" +
broker;
+ failure_reason = "Cannot issue getMeta to: " + endpoint + ":" +
brokerURI;
if ( excp != null ) {
if (excp.getCause() == null ) {
failure_reason = failure_reason + ": " + excp.toString();
@@ -339,7 +312,11 @@ public class UimaAsPing
if ( failure_reason != null ) {
statistics.setInfo(failure_reason);
} else {
- statistics.setInfo("Ping to " + nodeIp + ": " + pid + " ok. (JMX disabled.)");
+ if (failover) {
+ statistics.setInfo("(JMX statistics not available for failover protocol)");
+ } else {
+ statistics.setInfo("Ping to " + nodeIp + ": " + pid + " ok. (JMX disabled.)");
+ }
}
} else {
monitor.setSource(nodeIp, pid, gmfail, failure_reason);