Github user pmouawad commented on a diff in the pull request: https://github.com/apache/jmeter/pull/325#discussion_r150328441 --- Diff: src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/JMSSampler.java --- @@ -142,48 +173,244 @@ public SampleResult sample(Entry entry) { res.sampleStart(); try { - TextMessage msg = createMessage(); - if (isOneway()) { - int deliveryMode = isNonPersistent() ? - DeliveryMode.NON_PERSISTENT:DeliveryMode.PERSISTENT; - producer.send(msg, deliveryMode, Integer.parseInt(getPriority()), - Long.parseLong(getExpiration())); - res.setRequestHeaders(Utils.messageProperties(msg)); - res.setResponseOK(); - res.setResponseData("Oneway request has no response data", null); + LOGGER.debug("Point-to-point mode: " + getCommunicationstyle()); + if (isBrowse()) { + handleBrowse(res); + } else if (isClearQueue()) { + handleClearQueue(res); + } else if (isOneway()) { + handleOneWay(res); + } else if (isRead()) { + handleRead(context, res); } else { - if (!useTemporyQueue()) { - msg.setJMSReplyTo(receiveQueue); - } - Message replyMsg = executor.sendAndReceive(msg, - isNonPersistent() ? DeliveryMode.NON_PERSISTENT : DeliveryMode.PERSISTENT, - Integer.parseInt(getPriority()), - Long.parseLong(getExpiration())); - res.setRequestHeaders(Utils.messageProperties(msg)); - if (replyMsg == null) { - res.setResponseMessage("No reply message received"); - } else { - if (replyMsg instanceof TextMessage) { - res.setResponseData(((TextMessage) replyMsg).getText(), null); - } else { - res.setResponseData(replyMsg.toString(), null); - } - res.setResponseHeaders(Utils.messageProperties(replyMsg)); - res.setResponseOK(); - } + handleRequestResponse(res); } } catch (Exception e) { LOGGER.warn(e.getLocalizedMessage(), e); - if (thrown != null){ + if (thrown != null) { res.setResponseMessage(thrown.toString()); - } else { + } else { res.setResponseMessage(e.getLocalizedMessage()); } } res.sampleEnd(); return res; } + private void handleBrowse(SampleResult res) throws JMSException { + LOGGER.debug("isBrowseOnly"); + StringBuffer sb = new StringBuffer(""); + res.setSuccessful(true); + sb.append("\n \n Browse message on Send Queue " + sendQueue.getQueueName()); + sb.append(browseQueueDetails(sendQueue, res)); + res.setResponseData(sb.toString().getBytes()); + } + + private void handleClearQueue(SampleResult res) throws JMSException { + LOGGER.debug("isClearQueue"); + StringBuffer sb = new StringBuffer(""); + res.setSuccessful(true); + sb.append("\n \n Clear messages on Send Queue " + sendQueue.getQueueName()); + sb.append(clearQueue(sendQueue, res)); + res.setResponseData(sb.toString().getBytes()); + } + + private void handleOneWay(SampleResult res) throws JMSException { + LOGGER.debug("isOneWay"); + TextMessage msg = createMessage(); + int deliveryMode = isNonPersistent() ? DeliveryMode.NON_PERSISTENT : DeliveryMode.PERSISTENT; + producer.send(msg, deliveryMode, Integer.parseInt(getPriority()), Long.parseLong(getExpiration())); + res.setRequestHeaders(Utils.messageProperties(msg)); + res.setResponseOK(); + res.setResponseData("Oneway request has no response data", null); + } + + private void handleRead(JMeterContext context, SampleResult res) { + LOGGER.debug("isRead"); + StringBuffer sb = new StringBuffer(""); + res.setSuccessful(true); + Sampler sampler = context.getPreviousSampler(); + SampleResult sr = context.getPreviousResult(); + String jmsSelector = getJMSSelector(); + if (jmsSelector.equals("_PREV_SAMPLER_")) { + if (sampler instanceof JMSSampler) { + jmsSelector = sr.getResponseMessage(); + } + } + int sampleCounter = 0; + int sampleTries = 0; + String result = null; + + StringBuilder buffer = new StringBuilder(); + StringBuilder propBuffer = new StringBuilder(); + + do { + result = browseQueueForConsumption(sendQueue, jmsSelector, res, buffer, propBuffer); + if (result != null) { + sb.append(result); + sb.append('\n'); + sampleCounter++; + } + sampleTries++; + } while ((result != null) && (sampleTries < getNumberOfSamplesToAggregateAsInt())); + + res.setResponseMessage(sampleCounter + " samples messages received"); + res.setResponseData(buffer.toString().getBytes()); // TODO - charset? + res.setResponseHeaders(propBuffer.toString()); + if (sampleCounter == 0) { + res.setResponseCode("404"); + res.setSuccessful(false); + } else { + res.setResponseCodeOK(); + res.setSuccessful(true); + } + res.setResponseMessage(sampleCounter + " message(s) received successfully"); + res.setSamplerData(getNumberOfSamplesToAggregateAsInt() + " messages expected"); + res.setSampleCount(sampleCounter); + } + + private void handleRequestResponse(SampleResult res) throws JMSException { + TextMessage msg = createMessage(); + if (!useTemporyQueue()) { + LOGGER.debug("NO TEMP QUEUE"); + msg.setJMSReplyTo(receiveQueue); + } + LOGGER.debug("Create temp message"); + Message replyMsg = executor.sendAndReceive(msg, + isNonPersistent() ? DeliveryMode.NON_PERSISTENT : DeliveryMode.PERSISTENT, + Integer.parseInt(getPriority()), Long.parseLong(getExpiration())); + res.setRequestHeaders(Utils.messageProperties(msg)); + if (replyMsg == null) { + res.setResponseMessage("No reply message received"); + } else { + if (replyMsg instanceof TextMessage) { + res.setResponseData(((TextMessage) replyMsg).getText(), null); + } else { + res.setResponseData(replyMsg.toString(), null); + } + res.setResponseHeaders(Utils.messageProperties(replyMsg)); + res.setResponseOK(); + } + } + + private String browseQueueForConsumption(Queue queue, String jmsSelector, SampleResult res, StringBuilder buffer, + StringBuilder propBuffer) { + String retVal = null; + try { + QueueReceiver consumer = session.createReceiver(queue, jmsSelector); + Message reply = consumer.receive(Long.valueOf(getTimeout())); + LOGGER.debug("Message: " + reply); + consumer.close(); + if (reply != null) { + res.setResponseMessage("1 message(s) received successfully"); + res.setResponseHeaders(reply.toString()); + TextMessage msg = (TextMessage) reply; + retVal = msg.getText(); + extractContent(buffer, propBuffer, msg); + } else { + res.setResponseMessage("No message received"); + } + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.error(ex.getMessage()); + } + return retVal; + } + + private void extractContent(StringBuilder buffer, StringBuilder propBuffer, Message msg) { + if (msg != null) { + try { + if (msg instanceof TextMessage) { + buffer.append(((TextMessage) msg).getText()); + } else if (msg instanceof ObjectMessage) { + ObjectMessage objectMessage = (ObjectMessage) msg; + if (objectMessage.getObject() != null) { + buffer.append(objectMessage.getObject().getClass()); + } else { + buffer.append("object is null"); + } + } else if (msg instanceof BytesMessage) { + BytesMessage bytesMessage = (BytesMessage) msg; + buffer.append(bytesMessage.getBodyLength() + " bytes received in BytesMessage"); + } else if (msg instanceof MapMessage) { + MapMessage mapm = (MapMessage) msg; + @SuppressWarnings("unchecked") // MapNames are Strings + Enumeration<String> enumb = mapm.getMapNames(); + while (enumb.hasMoreElements()) { + String name = enumb.nextElement(); + Object obj = mapm.getObject(name); + buffer.append(name); + buffer.append(","); + buffer.append(obj.getClass().getCanonicalName()); + buffer.append(","); + buffer.append(obj); + buffer.append("\n"); + } + } + Utils.messageProperties(propBuffer, msg); + } catch (JMSException e) { + LOGGER.error(e.getMessage()); + } + } + } + + private String browseQueueDetails(Queue queue, SampleResult res) { + try { + String messageBodies = new String("\n==== Browsing Messages === \n"); + // get some queue details + QueueBrowser qBrowser = session.createBrowser(queue); + // browse the messages + Enumeration<?> e = qBrowser.getEnumeration(); + int numMsgs = 0; + // count number of messages + String corrID = ""; + while (e.hasMoreElements()) { + TextMessage message = (TextMessage) e.nextElement(); + corrID = message.getJMSCorrelationID(); + if (corrID == null) { + corrID = message.getJMSMessageID(); + messageBodies = messageBodies + numMsgs + " - MessageID: " + corrID + ": " + message.getText() + + "\n"; + } else { + messageBodies = messageBodies + numMsgs + " - CorrelationID: " + corrID + ": " + message.getText() + + "\n"; + } + numMsgs++; + } + res.setResponseMessage(numMsgs + " messages available on the queue"); + res.setResponseHeaders(qBrowser.toString()); + return (messageBodies + queue.getQueueName() + " has " + numMsgs + " messages"); + } catch (Exception e) { + res.setResponseMessage("Error counting message on the queue"); + e.printStackTrace(); --- End diff -- Same remarks as above
---