http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java new file mode 100644 index 0000000..607f92b --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java @@ -0,0 +1,627 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.cli.commands.tools.xml; + +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import java.io.File; +import java.io.OutputStream; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import io.airlift.airline.Command; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.commands.tools.OptionalLocking; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; +import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.paging.PagedMessage; +import org.apache.activemq.artemis.core.paging.PagingManager; +import 
org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.PagingStoreFactory; +import org.apache.activemq.artemis.core.paging.cursor.PagePosition; +import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl; +import org.apache.activemq.artemis.core.paging.impl.Page; +import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl; +import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; +import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; +import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe; +import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal; +import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.MessageDescribe; +import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import 
org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;

/**
 * CLI command that reads the bindings journal, the message journal and the page files of a broker
 * data directory and writes the bindings and the not-yet-acknowledged messages as an XML document.
 *
 * <p>The work is done by {@link #process(OutputStream, String, String, String, String)}: bindings are
 * loaded first, then the message journal is scanned into in-memory maps, and finally everything is
 * written through a pretty-printing {@link XMLStreamWriter} proxy.
 */
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
public final class XmlDataExporter extends OptionalLocking {

   /** Number of body bytes encoded into each CDATA section when exporting a large message. */
   private static final long LARGE_MESSAGE_CHUNK_SIZE = 1000L;

   private JournalStorageManager storageManager;

   private Configuration config;

   private XMLStreamWriter xmlWriter;

   // message refs keyed first by the journal record ID of the message and then by the ID of the queue they belong to
   private final Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs = new HashMap<>();

   // map of all message records keyed by their record ID (which matches the outer key of messageRefs);
   // insertion-ordered so journal messages are written in the order the journal load returned them
   private final HashMap<Long, Message> messages = new java.util.LinkedHashMap<>();

   // cursor acknowledgements found in the journal, keyed by queue ID
   private final Map<Long, Set<PagePosition>> cursorRecords = new HashMap<>();

   // IDs of the page transactions found in the journal
   private final Set<Long> pgTXs = new HashSet<>();

   private final HashMap<Long, PersistentQueueBindingEncoding> queueBindings = new HashMap<>();

   private final HashMap<Long, PersistentAddressBindingEncoding> addressBindings = new HashMap<>();

   long messagesPrinted = 0L;

   long bindingsPrinted = 0L;

   /**
    * Runs the export against the directories configured for this command, writing to {@code context.out}.
    * Any exception is handed to {@code treatError}.
    *
    * @return always {@code null}
    */
   @Override
   public Object execute(ActionContext context) throws Exception {
      super.execute(context);

      try {
         process(context.out, getBinding(), getJournal(), getPaging(), getLargeMessages());
      } catch (Exception e) {
         treatError(e, "data", "exp");
      }
      return null;
   }

   /**
    * Exports the data found in the given directories as XML.
    *
    * @param out              stream the UTF-8 encoded XML is written to
    * @param bindingsDir      bindings journal directory
    * @param journalDir       message journal directory
    * @param pagingDir        paging directory
    * @param largeMessagesDir large messages directory
    * @throws Exception if the journals cannot be read or the XML writer cannot be created
    */
   public void process(OutputStream out,
                       String bindingsDir,
                       String journalDir,
                       String pagingDir,
                       String largeMessagesDir) throws Exception {
      config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO);
      final ExecutorService executor = Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
      try {
         ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);

         storageManager = new JournalStorageManager(config, executorFactory, executorFactory);

         XMLOutputFactory factory = XMLOutputFactory.newInstance();
         XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
         PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
         xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class<?>[]{XMLStreamWriter.class}, handler);

         writeXMLData();
      } finally {
         // release the pool even when reading or writing fails
         executor.shutdown();
      }
   }

   /** Loads bindings and journal data, writes the XML and logs timing and counters at debug level. */
   private void writeXMLData() throws Exception {
      long start = System.currentTimeMillis();
      getBindings();
      processMessageJournal();
      printDataAsXML();
      ActiveMQServerLogger.LOGGER.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms");
      ActiveMQServerLogger.LOGGER.debug("Output " + messagesPrinted + " messages and " + bindingsPrinted + " bindings.");
   }

   /**
    * Read through the message journal and stuff all the events/data we care about into local data structures. We'll
    * use this data later to print all the right information.
    *
    * @throws Exception will be thrown if anything goes wrong reading the journal
    */
   private void processMessageJournal() throws Exception {
      ArrayList<RecordInfo> acks = new ArrayList<>();

      List<RecordInfo> records = new LinkedList<>();

      // The load call fills this list, but the export does not use it.
      List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();

      Journal messageJournal = storageManager.getMessageJournal();

      ActiveMQServerLogger.LOGGER.debug("Reading journal from " + config.getJournalDirectory());

      messageJournal.start();
      try {
         // Just logging these, no action necessary
         TransactionFailureCallback transactionFailureCallback = new TransactionFailureCallback() {
            @Override
            public void failedTransaction(long transactionID,
                                          List<RecordInfo> records1,
                                          List<RecordInfo> recordsToDelete) {
               StringBuilder message = new StringBuilder();
               message.append("Encountered failed journal transaction: ").append(transactionID);
               appendRecords(message, "; Records: ", records1);
               appendRecords(message, "; RecordsToDelete: ", recordsToDelete);
               ActiveMQServerLogger.LOGGER.debug(message.toString());
            }
         };

         ((JournalImpl) messageJournal).load(records, preparedTransactions, transactionFailureCallback, false);

         for (RecordInfo info : records) {
            collectRecord(info, acks);
         }
      } finally {
         // stop the journal even if loading or decoding failed
         messageJournal.stop();
      }

      removeAcked(acks);
   }

   /** Appends {@code label} followed by the comma separated records; appends nothing for an empty list. */
   private static void appendRecords(StringBuilder message, String label, List<RecordInfo> recordInfos) {
      for (int i = 0; i < recordInfos.size(); i++) {
         message.append(i == 0 ? label : ", ");
         message.append(recordInfos.get(i));
      }
   }

   /**
    * Sorts one journal record into the matching in-memory structure (messages, refs, acks, cursor acks or
    * page transactions). Records of any other type are ignored.
    */
   private void collectRecord(RecordInfo info, List<RecordInfo> acks) {
      Object o = DescribeJournal.newObjectEncoding(info, storageManager);
      byte recordType = info.getUserRecordType();

      if (recordType == JournalRecordIds.ADD_MESSAGE || recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
         messages.put(info.id, ((MessageDescribe) o).getMsg().toCore());
      } else if (recordType == JournalRecordIds.ADD_LARGE_MESSAGE) {
         messages.put(info.id, ((MessageDescribe) o).getMsg());
      } else if (recordType == JournalRecordIds.ADD_REF) {
         ReferenceDescribe ref = (ReferenceDescribe) o;
         HashMap<Long, ReferenceDescribe> refsOfMessage = messageRefs.get(info.id);
         if (refsOfMessage == null) {
            refsOfMessage = new HashMap<>();
            messageRefs.put(info.id, refsOfMessage);
         }
         refsOfMessage.put(ref.refEncoding.queueID, ref);
      } else if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
         acks.add(info);
      } else if (info.userRecordType == JournalRecordIds.ACKNOWLEDGE_CURSOR) {
         // NOTE(review): this branch and the next read the raw userRecordType field rather than
         // getUserRecordType(), exactly as before; the two may differ for update records, so
         // confirm before unifying them.
         CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
         encoding.decode(ActiveMQBuffers.wrappedBuffer(info.data));

         Set<PagePosition> positions = cursorRecords.get(encoding.queueID);
         if (positions == null) {
            positions = new HashSet<>();
            cursorRecords.put(encoding.queueID, positions);
         }
         positions.add(encoding.position);
      } else if (info.userRecordType == JournalRecordIds.PAGE_TRANSACTION) {
         ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(info.data);
         if (info.isUpdate) {
            PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();

            pageUpdate.decode(buff);
            pgTXs.add(pageUpdate.pageTX);
         } else {
            PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();

            pageTransactionInfo.decode(buff);

            pageTransactionInfo.setRecordID(info.id);
            pgTXs.add(pageTransactionInfo.getTransactionID());
         }
      }
   }

   /**
    * Go back through the messages and message refs we found in the journal and remove the ones that have been acked.
    * An ack whose message has no recorded references is ignored.
    *
    * @param acks the list of ack records we got from the journal
    */
   private void removeAcked(ArrayList<RecordInfo> acks) {
      for (RecordInfo info : acks) {
         HashMap<Long, ReferenceDescribe> refsOfMessage = messageRefs.get(info.id);
         if (refsOfMessage == null) {
            // nothing to remove; previously this case ended in a NullPointerException
            continue;
         }
         AckDescribe ack = (AckDescribe) DescribeJournal.newObjectEncoding(info, null);
         refsOfMessage.remove(ack.refEncoding.queueID);
         if (refsOfMessage.isEmpty()) {
            messages.remove(info.id);
            messageRefs.remove(info.id);
         }
      }
   }

   /**
    * Open the bindings journal and extract all bindings data.
    *
    * @throws Exception will be thrown if anything goes wrong reading the bindings journal
    */
   private void getBindings() throws Exception {
      List<RecordInfo> records = new LinkedList<>();

      Journal bindingsJournal = storageManager.getBindingsJournal();

      bindingsJournal.start();
      try {
         ActiveMQServerLogger.LOGGER.debug("Reading bindings journal from " + config.getBindingsDirectory());

         ((JournalImpl) bindingsJournal).load(records, null, null, false);

         for (RecordInfo info : records) {
            if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) {
               PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
               queueBindings.put(bindingEncoding.getId(), bindingEncoding);
            } else if (info.getUserRecordType() == JournalRecordIds.ADDRESS_BINDING_RECORD) {
               PersistentAddressBindingEncoding bindingEncoding = (PersistentAddressBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
               addressBindings.put(bindingEncoding.getId(), bindingEncoding);
            }
         }
      } finally {
         // stop the journal even if loading or decoding failed
         bindingsJournal.stop();
      }
   }

   /**
    * Writes the complete XML document (bindings, then messages) and closes the writer.
    * Failures are printed to standard error and not propagated, as before.
    */
   private void printDataAsXML() {
      try {
         xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
         xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
         printBindingsAsXML();
         printAllMessagesAsXML();
         xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
         xmlWriter.writeEndDocument();
         xmlWriter.flush();
         xmlWriter.close();
      } catch (Exception e) {
         // NOTE(review): the export is reported as successful even though the document is truncated
         e.printStackTrace();
      }
   }

   /** Writes one empty element per address binding followed by one per queue binding. */
   private void printBindingsAsXML() throws XMLStreamException {
      xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
      for (PersistentAddressBindingEncoding bindingEncoding : addressBindings.values()) {
         xmlWriter.writeEmptyElement(XmlDataConstants.ADDRESS_BINDINGS_CHILD);
         // join with ", " without a trailing separator; an empty set yields an empty attribute
         StringBuilder routingTypes = new StringBuilder();
         for (RoutingType routingType : bindingEncoding.getRoutingTypes()) {
            if (routingTypes.length() > 0) {
               routingTypes.append(", ");
            }
            routingTypes.append(routingType.toString());
         }
         xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE, routingTypes.toString());
         xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_NAME, bindingEncoding.getName().toString());
         xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ID, Long.toString(bindingEncoding.getId()));
         bindingsPrinted++;
      }
      for (PersistentQueueBindingEncoding bindingEncoding : queueBindings.values()) {
         xmlWriter.writeEmptyElement(XmlDataConstants.QUEUE_BINDINGS_CHILD);
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ADDRESS, bindingEncoding.getAddress().toString());
         String filter = "";
         if (bindingEncoding.getFilterString() != null) {
            filter = bindingEncoding.getFilterString().toString();
         }
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_FILTER_STRING, filter);
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_NAME, bindingEncoding.getQueueName().toString());
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ID, Long.toString(bindingEncoding.getId()));
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE, RoutingType.getType(bindingEncoding.getRoutingType()).toString());
         bindingsPrinted++;
      }
      xmlWriter.writeEndElement(); // end BINDINGS_PARENT
   }

   /**
    * Writes the messages element: first the messages collected from the journal, then the paged ones.
    * A journal message for which no destination queue can be resolved is skipped, matching the paged path.
    */
   private void printAllMessagesAsXML() throws Exception {
      xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);

      // Order here is important. We must process the messages from the journal before we process those from the page
      // files in order to get the messages in the right order.
      for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) {
         List<String> queueNames = extractQueueNames(messageRefs.get(messageMapEntry.getKey()));
         if (queueNames.isEmpty()) {
            ActiveMQServerLogger.LOGGER.debug("Skipping message " + messageMapEntry.getKey() + ": no queue reference found");
            continue;
         }
         printSingleMessageAsXML(messageMapEntry.getValue().toCore(), queueNames);
      }

      printPagedMessagesAsXML();

      xmlWriter.writeEndElement(); // end "messages"
   }

   /**
    * Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
    * from the journal). Failures are printed to standard error and not propagated, as before.
    */
   private void printPagedMessagesAsXML() {
      try {
         final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
         final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
         try {
            ExecutorFactory executorFactory = new ExecutorFactory() {
               @Override
               public Executor getExecutor() {
                  return executor;
               }
            };
            PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduled, executorFactory, true, null);
            HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<>();
            addressSettingsRepository.setDefault(new AddressSettings());
            PagingManager manager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);

            manager.start();

            for (SimpleString store : manager.getStoreNames()) {
               PagingStore pageStore = manager.getPageStore(store);

               if (pageStore == null) {
                  ActiveMQServerLogger.LOGGER.debug("Page store was null");
                  continue;
               }

               File folder = pageStore.getFolder();
               ActiveMQServerLogger.LOGGER.debug("Reading page store " + store + " folder = " + folder);

               int pageId = (int) pageStore.getFirstPage();
               for (int i = 0; i < pageStore.getNumberOfPages(); i++) {
                  printPage(pageStore, pageId);
                  pageId++;
               }
            }
         } finally {
            // same clean-up process() performs for its own pool
            executor.shutdown();
            scheduled.shutdown();
         }
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

   /** Reads one page and prints every message that still has an un-acked queue and a usable transaction. */
   private void printPage(PagingStore pageStore, int pageId) throws Exception {
      ActiveMQServerLogger.LOGGER.debug("Reading page " + pageId);
      Page page = pageStore.createPage(pageId);
      List<PagedMessage> pagedMessages;
      page.open();
      try {
         pagedMessages = page.read(storageManager);
      } finally {
         // close the page even when reading it fails
         page.close();
      }

      int messageId = 0;

      for (PagedMessage pagedMessage : pagedMessages) {
         pagedMessage.initMessage(storageManager);
         PagePosition posCheck = new PagePositionImpl(pageId, messageId);
         List<String> queueNames = new ArrayList<>();

         for (long queueID : pagedMessage.getQueueIDs()) {
            Set<PagePosition> positions = cursorRecords.get(queueID);
            boolean acked = positions != null && positions.contains(posCheck);

            if (!acked) {
               PersistentQueueBindingEncoding queueBinding = queueBindings.get(queueID);
               if (queueBinding != null) {
                  queueNames.add(queueBinding.getQueueName().toString());
               }
            }
         }

         // non-transactional messages carry -1; transactional ones need a page transaction record in the journal
         boolean transactionOk = pagedMessage.getTransactionID() == -1 || pgTXs.contains(pagedMessage.getTransactionID());
         if (!queueNames.isEmpty() && transactionOk) {
            printSingleMessageAsXML(pagedMessage.getMessage().toCore(), queueNames);
         }

         messageId++;
      }
   }

   /** Writes one message element (attributes, properties, queues, body) and counts it. */
   private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
      xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
      printMessageAttributes(message);
      printMessageProperties(message);
      printMessageQueues(queues);
      printMessageBody(message.toCore());
      xmlWriter.writeEndElement(); // end MESSAGES_CHILD
      messagesPrinted++;
   }

   /** Writes the body element, delegating to the chunked writer for large messages. */
   private void printMessageBody(Message message) throws Exception {
      xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);

      if (message.toCore().isLargeMessage()) {
         printLargeMessageBody((LargeServerMessage) message);
      } else {
         xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message));
      }
      xmlWriter.writeEndElement(); // end MESSAGE_BODY
   }

   /**
    * Marks the body element as large and writes the body as a sequence of CDATA sections, each holding
    * the encoded form of at most {@link #LARGE_MESSAGE_CHUNK_SIZE} bytes. An {@link ActiveMQException}
    * while reading the body is printed to standard error and not propagated, as before.
    */
   private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
      LargeBodyEncoder encoder = null;

      try {
         encoder = message.toCore().getBodyEncoder();
         encoder.open();
         long bodySize = encoder.getLargeBodySize();
         long totalBytesWritten = 0;
         while (totalBytesWritten < bodySize) {
            int chunkSize = (int) Math.min(LARGE_MESSAGE_CHUNK_SIZE, bodySize - totalBytesWritten);
            ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(chunkSize);
            encoder.encode(buffer, chunkSize);
            xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
            totalBytesWritten += chunkSize;
         }
      } catch (ActiveMQException e) {
         e.printStackTrace();
      } finally {
         // single close point; the encoder used to be closed a second time on the success path
         if (encoder != null) {
            try {
               encoder.close();
            } catch (ActiveMQException e) {
               e.printStackTrace();
            }
         }
      }
   }

   /** Writes the queues element with one empty child per queue name. */
   private void printMessageQueues(List<String> queues) throws XMLStreamException {
      xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
      for (String queueName : queues) {
         xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
      }
      xmlWriter.writeEndElement(); // end QUEUES_PARENT
   }

   /** Writes the properties element with name, converted value and (when known) type of every property. */
   private void printMessageProperties(Message message) throws XMLStreamException {
      xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
      for (SimpleString key : message.getPropertyNames()) {
         Object value = message.getObjectProperty(key);
         xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
         xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
         xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value));

         // Write the property type as an attribute
         String propertyType = XmlDataExporterUtil.getPropertyType(value);
         if (propertyType != null) {
            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType);
         }
      }
      xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
   }

   /** Writes id, priority, expiration, timestamp, type and (if set) user id as attributes of the current element. */
   private void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
      String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType());
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
      if (message.getUserID() != null) {
         xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
      }
   }

   /**
    * Resolves the queue names for the given references.
    *
    * @param refMap references of one message keyed by queue ID; may be {@code null}
    * @return the names of the referenced queues that have a known binding; empty when {@code refMap}
    *         is {@code null} or no binding is known
    */
   private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap) {
      List<String> queues = new ArrayList<>();
      if (refMap == null) {
         return queues;
      }
      for (ReferenceDescribe ref : refMap.values()) {
         PersistentQueueBindingEncoding queueBinding = queueBindings.get(ref.refEncoding.queueID);
         if (queueBinding != null) {
            queues.add(queueBinding.getQueueName().toString());
         }
      }
      return queues;
   }

   // Inner classes -------------------------------------------------

   /**
    * Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
    */
   static class PrettyPrintHandler implements InvocationHandler {

      private final XMLStreamWriter target;

      private int depth = 0;

      private static final char INDENT_CHAR = ' ';

      private static final String LINE_SEPARATOR = System.getProperty("line.separator");

      // false after writeCharacters so the following end element stays on the same line as the text
      boolean wrap = true;

      PrettyPrintHandler(XMLStreamWriter target) {
         this.target = target;
      }

      /**
       * Emits line break and indentation as required by the intercepted method and then forwards the call
       * to the wrapped writer.
       *
       * @return whatever the wrapped writer returned ({@code null} for void methods)
       * @throws Throwable the exception thrown by the wrapped writer, unwrapped from the reflective call
       */
      @Override
      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
         String m = method.getName();

         switch (m) {
            case "writeStartElement":
               target.writeCharacters(LINE_SEPARATOR);
               target.writeCharacters(indent(depth));

               depth++;
               break;
            case "writeEndElement":
               depth--;
               if (wrap) {
                  target.writeCharacters(LINE_SEPARATOR);
                  target.writeCharacters(indent(depth));
               }
               wrap = true;
               break;
            case "writeEmptyElement":
            case "writeCData":
               target.writeCharacters(LINE_SEPARATOR);
               target.writeCharacters(indent(depth));
               break;
            case "writeCharacters":
               wrap = false;
               break;
         }

         try {
            return method.invoke(target, args);
         } catch (java.lang.reflect.InvocationTargetException e) {
            // rethrow the writer's own exception so proxy callers do not get UndeclaredThrowableException
            Throwable cause = e.getTargetException();
            throw cause != null ? cause : e;
         }
      }

      /** Returns three indent characters per level; a negative depth is treated as zero. */
      private String indent(int depth) {
         int length = Math.max(0, depth) * 3; // level of indentation
         char[] output = new char[length];
         while (length-- > 0) {
            output[length] = INDENT_CHAR;
         }
         return new String(output);
      }
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java new file mode 100644 index 0000000..df48dcf --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.cli.commands.tools.xml; + +import com.google.common.base.Preconditions; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.utils.Base64; + +/** + * Common utility methods to help with XML message conversion + */ +public class XmlDataExporterUtil { + + static String convertProperty(final Object value) { + if (value instanceof byte[]) { + return encode((byte[]) value); + } else { + return value == null ? XmlDataConstants.NULL : value.toString(); + } + } + + static String getPropertyType(final Object value) { + String stringValue = null; + + // if the value is null then we can't really know what it is so just set + // the type to the most generic thing + if (value == null) { + stringValue = XmlDataConstants.PROPERTY_TYPE_BYTES; + } else if (value instanceof Boolean) { + stringValue = XmlDataConstants.PROPERTY_TYPE_BOOLEAN; + } else if (value instanceof Byte) { + stringValue = XmlDataConstants.PROPERTY_TYPE_BYTE; + } else if (value instanceof Short) { + stringValue = XmlDataConstants.PROPERTY_TYPE_SHORT; + } else if (value instanceof Integer) { + stringValue = XmlDataConstants.PROPERTY_TYPE_INTEGER; + } else if (value instanceof Long) { + stringValue = XmlDataConstants.PROPERTY_TYPE_LONG; + } else if (value instanceof Float) { + stringValue = XmlDataConstants.PROPERTY_TYPE_FLOAT; + } else if (value instanceof Double) { + stringValue = XmlDataConstants.PROPERTY_TYPE_DOUBLE; + } else if (value instanceof String) { + stringValue = XmlDataConstants.PROPERTY_TYPE_STRING; + } else if (value instanceof SimpleString) { + stringValue = XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING; + } else if (value instanceof byte[]) { + stringValue = XmlDataConstants.PROPERTY_TYPE_BYTES; + } + + return stringValue; + } + + public static String getMessagePrettyType(byte rawType) { + String 
prettyType = XmlDataConstants.DEFAULT_TYPE_PRETTY; + + if (rawType == Message.BYTES_TYPE) { + prettyType = XmlDataConstants.BYTES_TYPE_PRETTY; + } else if (rawType == Message.MAP_TYPE) { + prettyType = XmlDataConstants.MAP_TYPE_PRETTY; + } else if (rawType == Message.OBJECT_TYPE) { + prettyType = XmlDataConstants.OBJECT_TYPE_PRETTY; + } else if (rawType == Message.STREAM_TYPE) { + prettyType = XmlDataConstants.STREAM_TYPE_PRETTY; + } else if (rawType == Message.TEXT_TYPE) { + prettyType = XmlDataConstants.TEXT_TYPE_PRETTY; + } + + return prettyType; + } + + /** + * Base64 encode a ServerMessage body into the proper XML format + */ + static String encodeMessageBody(final Message message) throws Exception { + Preconditions.checkNotNull(message, "ServerMessage can not be null"); + + ActiveMQBuffer byteBuffer = message.toCore().getReadOnlyBodyBuffer(); + byte[] buffer = new byte[byteBuffer.writerIndex()]; + byteBuffer.readBytes(buffer); + + return XmlDataExporterUtil.encode(buffer); + } + + protected static String encode(final byte[] data) { + return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java new file mode 100644 index 0000000..a824177 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.cli.commands.tools.xml; + +import javax.xml.XMLConstants; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamConstants; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.transform.stax.StAXSource; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; +import javax.xml.validation.Validator; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import 
org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientRequestor; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.cli.commands.ActionAbstract; +import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.utils.Base64; +import org.apache.activemq.artemis.utils.ClassloadingUtil; +import org.apache.activemq.artemis.utils.ListUtil; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.jboss.logging.Logger; + +/** + * Read XML output from <code>org.apache.activemq.artemis.core.persistence.impl.journal.XmlDataExporter</code>, create a core session, and + * send the messages to a running instance of ActiveMQ Artemis. It uses the StAX <code>javax.xml.stream.XMLStreamReader</code> + * for speed and simplicity. 
+ */ +@Command(name = "imp", description = "Import all message-data using an XML that could be interpreted by any system.") +public final class XmlDataImporter extends ActionAbstract { + + private static final Logger logger = Logger.getLogger(XmlDataImporter.class); + + private XMLStreamReader reader; + + // this session is really only needed if the "session" variable does not auto-commit sends + ClientSession managementSession; + + boolean localSession = false; + + final Map<String, String> addressMap = new HashMap<>(); + + final Map<String, Long> queueIDs = new HashMap<>(); + + String tempFileName = ""; + + private ClientSession session; + + @Option(name = "--host", description = "The host used to import the data (default localhost)") + public String host = "localhost"; + + @Option(name = "--port", description = "The port used to import the data (default 61616)") + public int port = 61616; + + @Option(name = "--transaction", description = "If this is set to true you will need a whole transaction to commit at the end. (default false)") + public boolean transactional; + + @Option(name = "--user", description = "User name used to import the data. (default null)") + public String user = null; + + @Option(name = "--password", description = "User name used to import the data. 
(default null)") + public String password = null; + + @Option(name = "--input", description = "The input file name (default=exp.dmp)", required = true) + public String input = "exp.dmp"; + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + @Override + public Object execute(ActionContext context) throws Exception { + process(input, host, port, transactional); + return null; + } + + public void process(String inputFile, String host, int port, boolean transactional) throws Exception { + this.process(new FileInputStream(inputFile), host, port, transactional); + } + + /** + * This is the normal constructor for programmatic access to the + * <code>org.apache.activemq.artemis.core.persistence.impl.journal.XmlDataImporter</code> if the session passed + * in uses auto-commit for sends. + * <br> + * If the session needs to be transactional then use the constructor which takes 2 sessions. + * + * @param inputStream the stream from which to read the XML for import + * @param session used for sending messages, must use auto-commit for sends + */ + public void process(InputStream inputStream, ClientSession session) throws Exception { + this.process(inputStream, session, null); + } + + /** + * This is the constructor to use if you wish to import all messages transactionally. + * <br> + * Pass in a session which doesn't use auto-commit for sends, and one that does (for management + * operations necessary during import). 
+ * + * @param inputStream the stream from which to read the XML for import + * @param session used for sending messages, doesn't need to auto-commit sends + * @param managementSession used for management queries, must use auto-commit for sends + */ + public void process(InputStream inputStream, + ClientSession session, + ClientSession managementSession) throws Exception { + reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream); + this.session = session; + if (managementSession != null) { + this.managementSession = managementSession; + } else { + this.managementSession = session; + } + + processXml(); + + } + + public void process(InputStream inputStream, String host, int port, boolean transactional) throws Exception { + HashMap<String, Object> connectionParams = new HashMap<>(); + connectionParams.put(TransportConstants.HOST_PROP_NAME, host); + connectionParams.put(TransportConstants.PORT_PROP_NAME, Integer.toString(port)); + ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams)); + ClientSessionFactory sf = serverLocator.createSessionFactory(); + + ClientSession session; + ClientSession managementSession; + + if (user != null || password != null) { + session = sf.createSession(user, password, false, !transactional, true, false, 0); + managementSession = sf.createSession(user, password, false, true, true, false, 0); + } else { + session = sf.createSession(false, !transactional, true); + managementSession = sf.createSession(false, true, true); + } + localSession = true; + + process(inputStream, session, managementSession); + } + + public void validate(String file) throws Exception { + validate(new FileInputStream(file)); + } + + public void validate(InputStream inputStream) throws Exception { + XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream); + SchemaFactory factory = 
SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + Schema schema = factory.newSchema(XmlDataImporter.findResource("schema/artemis-import-export.xsd")); + + Validator validator = schema.newValidator(); + validator.validate(new StAXSource(reader)); + reader.close(); + } + + private static URL findResource(final String resourceName) { + return AccessController.doPrivileged(new PrivilegedAction<URL>() { + @Override + public URL run() { + return ClassloadingUtil.findResource(resourceName); + } + }); + } + + private void processXml() throws Exception { + try { + while (reader.hasNext()) { + if (logger.isDebugEnabled()) { + logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] "); + } + if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) { + if (XmlDataConstants.QUEUE_BINDINGS_CHILD.equals(reader.getLocalName())) { + bindQueue(); + } else if (XmlDataConstants.ADDRESS_BINDINGS_CHILD.equals(reader.getLocalName())) { + bindAddress(); + } else if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) { + processMessage(); + } + } + reader.next(); + } + + if (!session.isAutoCommitSends()) { + session.commit(); + } + } finally { + // if the session was created in our constructor then close it (otherwise the caller will close it) + if (localSession) { + session.close(); + managementSession.close(); + } + } + } + + private void processMessage() throws Exception { + Byte type = 0; + Byte priority = 0; + Long expiration = 0L; + Long timestamp = 0L; + org.apache.activemq.artemis.utils.UUID userId = null; + ArrayList<String> queues = new ArrayList<>(); + + // get message's attributes + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + switch (attributeName) { + case XmlDataConstants.MESSAGE_TYPE: + type = getMessageType(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_PRIORITY: + priority = 
Byte.parseByte(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_EXPIRATION: + expiration = Long.parseLong(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_TIMESTAMP: + timestamp = Long.parseLong(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_USER_ID: + userId = UUIDGenerator.getInstance().generateUUID(); + break; + } + } + + Message message = session.createMessage(type, true, expiration, timestamp, priority); + message.setUserID(userId); + + boolean endLoop = false; + + // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.) + while (reader.hasNext()) { + int eventType = reader.getEventType(); + switch (eventType) { + case XMLStreamConstants.START_ELEMENT: + if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) { + processMessageBody(message.toCore()); + } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) { + processMessageProperties(message); + } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) { + processMessageQueues(queues); + } + break; + case XMLStreamConstants.END_ELEMENT: + if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) { + endLoop = true; + } + break; + } + if (endLoop) { + break; + } + reader.next(); + } + + sendMessage(queues, message); + } + + private Byte getMessageType(String value) { + Byte type = Message.DEFAULT_TYPE; + switch (value) { + case XmlDataConstants.DEFAULT_TYPE_PRETTY: + type = Message.DEFAULT_TYPE; + break; + case XmlDataConstants.BYTES_TYPE_PRETTY: + type = Message.BYTES_TYPE; + break; + case XmlDataConstants.MAP_TYPE_PRETTY: + type = Message.MAP_TYPE; + break; + case XmlDataConstants.OBJECT_TYPE_PRETTY: + type = Message.OBJECT_TYPE; + break; + case XmlDataConstants.STREAM_TYPE_PRETTY: + type = Message.STREAM_TYPE; + break; + case XmlDataConstants.TEXT_TYPE_PRETTY: + type = Message.TEXT_TYPE; + break; + } + return type; + } + + private void 
sendMessage(ArrayList<String> queues, Message message) throws Exception { + StringBuilder logMessage = new StringBuilder(); + String destination = addressMap.get(queues.get(0)); + + logMessage.append("Sending ").append(message).append(" to address: ").append(destination).append("; routed to queues: "); + ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8); + + for (String queue : queues) { + long queueID; + + if (queueIDs.containsKey(queue)) { + queueID = queueIDs.get(queue); + } else { + // Get the ID of the queues involved so the message can be routed properly. This is done because we cannot + // send directly to a queue, we have to send to an address instead but not all the queues related to the + // address may need the message + try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) { + ClientMessage managementMessage = managementSession.createMessage(false); + ManagementHelper.putAttribute(managementMessage, ResourceNames.QUEUE + queue, "ID"); + managementSession.start(); + if (logger.isDebugEnabled()) { + logger.debug("Requesting ID for: " + queue); + } + ClientMessage reply = requestor.request(managementMessage); + Number idObject = (Number) ManagementHelper.getResult(reply); + queueID = idObject.longValue(); + } + if (logger.isDebugEnabled()) { + logger.debug("ID for " + queue + " is: " + queueID); + } + queueIDs.put(queue, queueID); // store it so we don't have to look it up every time + } + + logMessage.append(queue).append(", "); + buffer.putLong(queueID); + } + + logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma + if (logger.isDebugEnabled()) { + logger.debug(logMessage); + } + + message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array()); + try (ClientProducer producer = session.createProducer(destination)) { + producer.send(message); + } + + if (tempFileName.length() > 0) { + File tempFile = new File(tempFileName); + if (!tempFile.delete()) { + 
ActiveMQServerLogger.LOGGER.couldNotDeleteTempFile(tempFileName); + } + tempFileName = ""; + } + } + + private void processMessageQueues(ArrayList<String> queues) { + for (int i = 0; i < reader.getAttributeCount(); i++) { + if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) { + queues.add(reader.getAttributeValue(i)); + } + } + } + + private void processMessageProperties(Message message) { + String key = ""; + String value = ""; + String propertyType = ""; + + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + switch (attributeName) { + case XmlDataConstants.PROPERTY_NAME: + key = reader.getAttributeValue(i); + break; + case XmlDataConstants.PROPERTY_VALUE: + value = reader.getAttributeValue(i); + break; + case XmlDataConstants.PROPERTY_TYPE: + propertyType = reader.getAttributeValue(i); + break; + } + } + + if (value.equals(XmlDataConstants.NULL)) { + value = null; + } + + switch (propertyType) { + case XmlDataConstants.PROPERTY_TYPE_SHORT: + message.putShortProperty(key, Short.parseShort(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_BOOLEAN: + message.putBooleanProperty(key, Boolean.parseBoolean(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_BYTE: + message.putByteProperty(key, Byte.parseByte(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_BYTES: + message.putBytesProperty(key, value == null ? 
null : decode(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_DOUBLE: + message.putDoubleProperty(key, Double.parseDouble(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_FLOAT: + message.putFloatProperty(key, Float.parseFloat(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_INTEGER: + message.putIntProperty(key, Integer.parseInt(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_LONG: + message.putLongProperty(key, Long.parseLong(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING: + message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_STRING: + message.putStringProperty(key, value); + break; + } + } + + private void processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException { + boolean isLarge = false; + + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) { + isLarge = Boolean.parseBoolean(reader.getAttributeValue(i)); + } + } + reader.next(); + if (logger.isDebugEnabled()) { + logger.debug("XMLStreamReader impl: " + reader); + } + if (isLarge) { + tempFileName = UUID.randomUUID().toString() + ".tmp"; + if (logger.isDebugEnabled()) { + logger.debug("Creating temp file " + tempFileName + " for large message."); + } + try (OutputStream out = new FileOutputStream(tempFileName)) { + getMessageBodyBytes(new MessageBodyBytesProcessor() { + @Override + public void processBodyBytes(byte[] bytes) throws IOException { + out.write(bytes); + } + }); + } + FileInputStream fileInputStream = new FileInputStream(tempFileName); + BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); + ((ClientMessage) message).setBodyInputStream(bufferedInput); + } else { + getMessageBodyBytes(new MessageBodyBytesProcessor() { + @Override + public void processBodyBytes(byte[] 
bytes) throws IOException { + message.getBodyBuffer().writeBytes(bytes); + } + }); + } + } + + /** + * Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't + * read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need + * to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each + * CDATA has to be decoded in its entirety. + * + * @param processor used to deal with the decoded CDATA elements + */ + private void getMessageBodyBytes(MessageBodyBytesProcessor processor) throws IOException, XMLStreamException { + int currentEventType; + StringBuilder cdata = new StringBuilder(); + while (reader.hasNext()) { + currentEventType = reader.getEventType(); + if (currentEventType == XMLStreamConstants.END_ELEMENT) { + break; + } else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) { + /* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to + * the processor, and reset the cdata for the next event(s) + */ + processor.processBodyBytes(decode(cdata.toString())); + cdata.setLength(0); + } else { + cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim()); + } + reader.next(); + } + } + + private void bindQueue() throws Exception { + String queueName = ""; + String address = ""; + String filter = ""; + String routingType = ""; + + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + switch (attributeName) { + case XmlDataConstants.QUEUE_BINDING_ADDRESS: + address = reader.getAttributeValue(i); + break; + case XmlDataConstants.QUEUE_BINDING_NAME: + queueName = reader.getAttributeValue(i); + break; + case XmlDataConstants.QUEUE_BINDING_FILTER_STRING: + filter = reader.getAttributeValue(i); + break; + 
case XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE: + routingType = reader.getAttributeValue(i); + break; + } + } + + ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName)); + + if (!queueQuery.isExists()) { + session.createQueue(address, RoutingType.valueOf(routingType), queueName, filter, true); + if (logger.isDebugEnabled()) { + logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")"); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("Binding " + queueName + " already exists so won't re-bind."); + } + } + + addressMap.put(queueName, address); + } + + private void bindAddress() throws Exception { + String addressName = ""; + String routingTypes = ""; + + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + switch (attributeName) { + case XmlDataConstants.ADDRESS_BINDING_NAME: + addressName = reader.getAttributeValue(i); + break; + case XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE: + routingTypes = reader.getAttributeValue(i); + break; + } + } + + ClientSession.AddressQuery addressQuery = session.addressQuery(new SimpleString(addressName)); + + if (!addressQuery.isExists()) { + Set<RoutingType> set = new HashSet<>(); + for (String routingType : ListUtil.toList(routingTypes)) { + set.add(RoutingType.valueOf(routingType)); + } + session.createAddress(SimpleString.toSimpleString(addressName), set, false); + if (logger.isDebugEnabled()) { + logger.debug("Binding address(name=" + addressName + ", routingTypes=" + routingTypes + ")"); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("Binding " + addressName + " already exists so won't re-bind."); + } + } + } + + private static byte[] decode(String data) { + return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } + + private interface MessageBodyBytesProcessor { + void processBodyBytes(byte[] bytes) throws IOException; + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/AddUser.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/AddUser.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/AddUser.java index caa32a7..37bd676 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/AddUser.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/AddUser.java @@ -20,7 +20,6 @@ import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.util.HashUtil; -import org.apache.activemq.artemis.util.FileBasedSecStoreConfig; import org.apache.commons.lang3.StringUtils; /** @@ -53,7 +52,7 @@ public class AddUser extends PasswordAction { * @param role the role * @throws IllegalArgumentException if user exists */ - protected void add(String hash, String... role) throws Exception { + private void add(String hash, String... 
role) throws Exception { FileBasedSecStoreConfig config = getConfiguration(); config.addNewUser(username, hash, role); config.save(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/FileBasedSecStoreConfig.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/FileBasedSecStoreConfig.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/FileBasedSecStoreConfig.java new file mode 100644 index 0000000..1f8e297 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/FileBasedSecStoreConfig.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.cli.commands.user; + +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.utils.StringUtil; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; +import org.apache.commons.configuration2.builder.fluent.Configurations; + +class FileBasedSecStoreConfig { + + private static final String LICENSE_HEADER = + "## ---------------------------------------------------------------------------\n" + + "## Licensed to the Apache Software Foundation (ASF) under one or more\n" + + "## contributor license agreements. See the NOTICE file distributed with\n" + + "## this work for additional information regarding copyright ownership.\n" + + "## The ASF licenses this file to You under the Apache License, Version 2.0\n" + + "## (the \"License\"); you may not use this file except in compliance with\n" + + "## the License. 
You may obtain a copy of the License at\n" + + "##\n" + + "## http://www.apache.org/licenses/LICENSE-2.0\n" + + "##\n" + + "## Unless required by applicable law or agreed to in writing, software\n" + + "## distributed under the License is distributed on an \"AS IS\" BASIS,\n" + + "## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" + + "## See the License for the specific language governing permissions and\n" + + "## limitations under the License.\n" + + "## ---------------------------------------------------------------------------\n"; + private FileBasedConfigurationBuilder<PropertiesConfiguration> userBuilder; + private FileBasedConfigurationBuilder<PropertiesConfiguration> roleBuilder; + private PropertiesConfiguration userConfig; + private PropertiesConfiguration roleConfig; + + FileBasedSecStoreConfig(File userFile, File roleFile) throws Exception { + Configurations configs = new Configurations(); + userBuilder = configs.propertiesBuilder(userFile); + roleBuilder = configs.propertiesBuilder(roleFile); + userConfig = userBuilder.getConfiguration(); + roleConfig = roleBuilder.getConfiguration(); + + String roleHeader = roleConfig.getLayout().getHeaderComment(); + String userHeader = userConfig.getLayout().getHeaderComment(); + + if (userHeader == null) { + if (userConfig.isEmpty()) { + //clean and reset header + userConfig.clear(); + userConfig.setHeader(LICENSE_HEADER); + } + } + + if (roleHeader == null) { + if (roleConfig.isEmpty()) { + //clean and reset header + roleConfig.clear(); + roleConfig.setHeader(LICENSE_HEADER); + } + } + } + + void addNewUser(String username, String hash, String... 
roles) throws Exception { + if (userConfig.getString(username) != null) { + throw new IllegalArgumentException("User already exist: " + username); + } + userConfig.addProperty(username, hash); + addRoles(username, roles); + } + + void save() throws Exception { + userBuilder.save(); + roleBuilder.save(); + } + + void removeUser(String username) throws Exception { + if (userConfig.getProperty(username) == null) { + throw new IllegalArgumentException("user " + username + " doesn't exist."); + } + userConfig.clearProperty(username); + removeRoles(username); + } + + List<String> listUser(String username) { + List<String> result = new ArrayList<>(); + result.add("--- \"user\"(roles) ---\n"); + + int totalUsers = 0; + if (username != null) { + String roles = findRoles(username); + result.add("\"" + username + "\"(" + roles + ")"); + totalUsers++; + } else { + Iterator<String> iter = userConfig.getKeys(); + while (iter.hasNext()) { + String keyUser = iter.next(); + String roles = findRoles(keyUser); + result.add("\"" + keyUser + "\"(" + roles + ")"); + totalUsers++; + } + } + result.add("\n Total: " + totalUsers); + return result; + } + + void updateUser(String username, String password, String[] roles) { + String oldPassword = (String) userConfig.getProperty(username); + if (oldPassword == null) { + throw new IllegalArgumentException("user " + username + " doesn't exist."); + } + + if (password != null) { + userConfig.setProperty(username, password); + } + + if (roles != null && roles.length > 0) { + + removeRoles(username); + addRoles(username, roles); + } + } + + private String findRoles(String uname) { + Iterator<String> iter = roleConfig.getKeys(); + StringBuilder builder = new StringBuilder(); + boolean first = true; + while (iter.hasNext()) { + String role = iter.next(); + List<String> names = roleConfig.getList(String.class, role); + for (String value : names) { + //each value may be a comma separated list + String[] items = value.split(","); + for (String item : 
items) { + if (item.equals(uname)) { + if (!first) { + builder.append(","); + } + builder.append(role); + first = false; + } + } + } + } + + return builder.toString(); + } + + private void addRoles(String username, String[] roles) { + for (String role : roles) { + List<String> users = roleConfig.getList(String.class, role); + if (users == null) { + users = new ArrayList<>(); + } + users.add(username); + roleConfig.setProperty(role, StringUtil.joinStringList(users, ",")); + } + } + + private void removeRoles(String username) { + + Iterator<String> iterKeys = roleConfig.getKeys(); + + List<Pair<String, List<String>>> updateMap = new ArrayList<>(); + while (iterKeys.hasNext()) { + String theRole = iterKeys.next(); + + List<String> userList = roleConfig.getList(String.class, theRole); + List<String> newList = new ArrayList<>(); + + boolean roleChaned = false; + for (String value : userList) { + //each value may be comma separated. + List<String> update = new ArrayList<>(); + String[] items = value.split(","); + boolean found = false; + for (String item : items) { + if (!item.equals(username)) { + update.add(item); + } else { + found = true; + roleChaned = true; + } + } + if (found) { + if (update.size() > 0) { + newList.add(StringUtil.joinStringList(update, ",")); + } + } + } + if (roleChaned) { + updateMap.add(new Pair(theRole, newList)); + } + } + //do update + Iterator<Pair<String, List<String>>> iterUpdate = updateMap.iterator(); + while (iterUpdate.hasNext()) { + Pair<String, List<String>> entry = iterUpdate.next(); + roleConfig.clearProperty(entry.getA()); + if (entry.getB().size() > 0) { + roleConfig.addProperty(entry.getA(), entry.getB()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/HelpUser.java ---------------------------------------------------------------------- diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/HelpUser.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/HelpUser.java index 36c0348..2e0ce2b 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/HelpUser.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/HelpUser.java @@ -16,15 +16,15 @@ */ package org.apache.activemq.artemis.cli.commands.user; +import java.io.File; +import java.util.ArrayList; +import java.util.List; + import io.airlift.airline.Help; import org.apache.activemq.artemis.cli.commands.Action; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.InvalidOptionsError; -import org.apache.activemq.artemis.util.OptionsUtil; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; +import org.apache.activemq.artemis.cli.commands.OptionsUtil; public class HelpUser extends Help implements Action { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ListUser.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ListUser.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ListUser.java index 136a417..c0fb979 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ListUser.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ListUser.java @@ -20,7 +20,6 @@ import java.util.List; import io.airlift.airline.Command; import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.util.FileBasedSecStoreConfig; /** * list existing users, example: @@ -42,7 +41,7 @@ public class ListUser extends UserAction { * list a single user or all users * if username is not 
specified */ - protected void list() throws Exception { + private void list() throws Exception { FileBasedSecStoreConfig config = getConfiguration(); List<String> result = config.listUser(username); for (String str : result) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/PasswordAction.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/PasswordAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/PasswordAction.java index 2260488..aeba55a 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/PasswordAction.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/PasswordAction.java @@ -24,7 +24,7 @@ public class PasswordAction extends UserAction { @Option(name = "--password", description = "the password (Default: input)") String password; - protected void checkInputPassword() { + void checkInputPassword() { if (password == null) { password = inputPassword("--password", "Please provide the password:", null); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/RemoveUser.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/RemoveUser.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/RemoveUser.java index 70167da..a9dce8d 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/RemoveUser.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/RemoveUser.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.cli.commands.user; import io.airlift.airline.Command; import 
org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.util.FileBasedSecStoreConfig; /** * Remove a user, example: @@ -35,7 +34,7 @@ public class RemoveUser extends UserAction { return null; } - protected void remove() throws Exception { + private void remove() throws Exception { FileBasedSecStoreConfig config = getConfiguration(); config.removeUser(username); config.save(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ResetUser.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ResetUser.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ResetUser.java index c219ef5..2e3e725 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ResetUser.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/ResetUser.java @@ -20,7 +20,6 @@ import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.util.HashUtil; -import org.apache.activemq.artemis.util.FileBasedSecStoreConfig; import org.apache.commons.lang3.StringUtils; /** @@ -53,7 +52,7 @@ public class ResetUser extends PasswordAction { return null; } - protected void reset(String password, String[] roles) throws Exception { + private void reset(String password, String[] roles) throws Exception { if (password == null && roles == null) { context.err.println("Nothing to update."); return; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/UserAction.java ---------------------------------------------------------------------- diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/UserAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/UserAction.java index 2f7c77f..2a23fa6 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/UserAction.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/user/UserAction.java @@ -16,15 +16,14 @@ */ package org.apache.activemq.artemis.cli.commands.user; -import io.airlift.airline.Option; -import org.apache.activemq.artemis.cli.commands.InputAbstract; -import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule; -import org.apache.activemq.artemis.util.FileBasedSecStoreConfig; - import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.Configuration; import java.io.File; +import io.airlift.airline.Option; +import org.apache.activemq.artemis.cli.commands.InputAbstract; +import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule; + import static org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule.ROLE_FILE_PROP_NAME; import static org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule.USER_FILE_PROP_NAME; @@ -39,23 +38,19 @@ public abstract class UserAction extends InputAbstract { @Option(name = "--entry", description = "The appConfigurationEntry (default: activemq)") String entry = "activemq"; - protected void checkInputUser() { + void checkInputUser() { if (username == null) { username = input("--user", "Please provider the userName:", null); } } - public void setRole(String role) { - this.role = role; - } - - public void checkInputRole() { + void checkInputRole() { if (role == null) { role = input("--role", "type a comma separated list of roles", null); } } - protected FileBasedSecStoreConfig getConfiguration() throws Exception { + FileBasedSecStoreConfig getConfiguration() throws Exception { Configuration securityConfig = 
Configuration.getConfiguration(); AppConfigurationEntry[] entries = securityConfig.getAppConfigurationEntry(entry); @@ -82,4 +77,8 @@ public abstract class UserAction extends InputAbstract { public void setUsername(String username) { this.username = username; } + + public void setRole(String role) { + this.role = role; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java deleted file mode 100644 index fc63518..0000000 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.cli.commands.util; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import java.util.Enumeration; -import java.util.concurrent.CountDownLatch; - -public class ConsumerThread extends Thread { - - int messageCount = 1000; - int receiveTimeOut = 3000; - Destination destination; - Session session; - boolean durable; - boolean breakOnNull = true; - int sleep; - int batchSize; - boolean verbose; - boolean browse; - - String filter; - - int received = 0; - int transactions = 0; - boolean running = false; - CountDownLatch finished; - boolean bytesAsText; - - public ConsumerThread(Session session, Destination destination, int threadNr) { - super("Consumer " + destination.toString() + ", thread=" + threadNr); - this.destination = destination; - this.session = session; - } - - @Override - public void run() { - if (browse) { - browse(); - } else { - consume(); - } - } - - public void browse() { - running = true; - QueueBrowser consumer = null; - String threadName = Thread.currentThread().getName(); - System.out.println(threadName + " wait until " + messageCount + " messages are consumed"); - try { - if (filter != null) { - consumer = session.createBrowser((Queue) destination, filter); - } else { - consumer = session.createBrowser((Queue) destination); - } - Enumeration<Message> enumBrowse = consumer.getEnumeration(); - - while (enumBrowse.hasMoreElements()) { - Message msg = enumBrowse.nextElement(); - if (msg != null) { - System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); - - if (verbose) { - System.out.println("..." 
+ msg); - } - if (bytesAsText && (msg instanceof BytesMessage)) { - long length = ((BytesMessage) msg).getBodyLength(); - byte[] bytes = new byte[(int) length]; - ((BytesMessage) msg).readBytes(bytes); - System.out.println("Message:" + msg); - } - received++; - - if (received >= messageCount) { - break; - } - } else { - break; - } - - if (sleep > 0) { - Thread.sleep(sleep); - } - - } - - consumer.close(); - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (finished != null) { - finished.countDown(); - } - if (consumer != null) { - System.out.println(threadName + " Consumed: " + this.getReceived() + " messages"); - try { - consumer.close(); - } catch (JMSException e) { - e.printStackTrace(); - } - } - } - - System.out.println(threadName + " Consumer thread finished"); - } - - public void consume() { - running = true; - MessageConsumer consumer = null; - String threadName = Thread.currentThread().getName(); - System.out.println(threadName + " wait until " + messageCount + " messages are consumed"); - try { - if (durable && destination instanceof Topic) { - if (filter != null) { - consumer = session.createDurableSubscriber((Topic) destination, getName(), filter, false); - } else { - consumer = session.createDurableSubscriber((Topic) destination, getName()); - } - } else { - if (filter != null) { - consumer = session.createConsumer(destination, filter); - } else { - consumer = session.createConsumer(destination); - } - } - while (running && received < messageCount) { - Message msg = consumer.receive(receiveTimeOut); - if (msg != null) { - System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); - if (verbose) { - System.out.println("..." 
+ msg); - } - if (bytesAsText && (msg instanceof BytesMessage)) { - long length = ((BytesMessage) msg).getBodyLength(); - byte[] bytes = new byte[(int) length]; - ((BytesMessage) msg).readBytes(bytes); - System.out.println("Message:" + msg); - } - received++; - } else { - if (breakOnNull) { - break; - } - } - - if (session.getTransacted()) { - if (batchSize > 0 && received > 0 && received % batchSize == 0) { - System.out.println(threadName + " Committing transaction: " + transactions++); - session.commit(); - } - } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { - if (batchSize > 0 && received > 0 && received % batchSize == 0) { - System.out.println("Acknowledging last " + batchSize + " messages; messages so far = " + received); - msg.acknowledge(); - } - } - if (sleep > 0) { - Thread.sleep(sleep); - } - - } - - try { - session.commit(); - } catch (Throwable ignored) { - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (finished != null) { - finished.countDown(); - } - if (consumer != null) { - System.out.println(threadName + " Consumed: " + this.getReceived() + " messages"); - try { - consumer.close(); - } catch (JMSException e) { - e.printStackTrace(); - } - } - } - - System.out.println(threadName + " Consumer thread finished"); - } - - public int getReceived() { - return received; - } - - public boolean isDurable() { - return durable; - } - - public ConsumerThread setDurable(boolean durable) { - this.durable = durable; - return this; - } - - public ConsumerThread setMessageCount(int messageCount) { - this.messageCount = messageCount; - return this; - } - - public ConsumerThread setBreakOnNull(boolean breakOnNull) { - this.breakOnNull = breakOnNull; - return this; - } - - public int getBatchSize() { - return batchSize; - } - - public ConsumerThread setBatchSize(int batchSize) { - this.batchSize = batchSize; - return this; - } - - public int getMessageCount() { - return messageCount; - } - - public boolean 
isBreakOnNull() { - return breakOnNull; - } - - public int getReceiveTimeOut() { - return receiveTimeOut; - } - - public ConsumerThread setReceiveTimeOut(int receiveTimeOut) { - this.receiveTimeOut = receiveTimeOut; - return this; - } - - public boolean isRunning() { - return running; - } - - public ConsumerThread setRunning(boolean running) { - this.running = running; - return this; - } - - public int getSleep() { - return sleep; - } - - public ConsumerThread setSleep(int sleep) { - this.sleep = sleep; - return this; - } - - public CountDownLatch getFinished() { - return finished; - } - - public ConsumerThread setFinished(CountDownLatch finished) { - this.finished = finished; - return this; - } - - public boolean isBytesAsText() { - return bytesAsText; - } - - public boolean isVerbose() { - return verbose; - } - - public ConsumerThread setVerbose(boolean verbose) { - this.verbose = verbose; - return this; - } - - public ConsumerThread setBytesAsText(boolean bytesAsText) { - this.bytesAsText = bytesAsText; - return this; - } - - public String getFilter() { - return filter; - } - - public ConsumerThread setFilter(String filter) { - this.filter = filter; - return this; - } - - public boolean isBrowse() { - return browse; - } - - public ConsumerThread setBrowse(boolean browse) { - this.browse = browse; - return this; - } -}
