http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java deleted file mode 100644 index e5fd80c..0000000 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.cli.commands.tools; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.PrintStream; -import java.util.List; - -import io.airlift.airline.Command; -import io.airlift.airline.Option; -import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.core.io.SequentialFileFactory; -import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; -import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.impl.JournalFile; -import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback; -import org.apache.activemq.artemis.utils.Base64; - -@Command(name = "encode", description = "Encode a set of journal files into an internal encoded data format") -public class EncodeJournal extends LockAbstract { - - @Option(name = "--directory", description = "The journal folder (default the journal folder from broker.xml)") - public String directory; - - @Option(name = "--prefix", description = "The journal prefix (default activemq-data)") - public String prefix = "activemq-data"; - - @Option(name = "--suffix", description = "The journal suffix (default amq)") - public String suffix = "amq"; - - @Option(name = "--file-size", description = "The journal size (default 10485760)") - public int size = 10485760; - - @Override - public Object execute(ActionContext context) throws Exception { - super.execute(context); - try { - if (directory == null) { - directory = getFileConfiguration().getJournalDirectory(); - } - - exportJournal(directory, prefix, suffix, 2, size); - } catch (Exception e) { - treatError(e, "data", "encode"); - } - - return null; - } - - public static void exportJournal(final String directory, - final String journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize) throws 
Exception { - - exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, System.out); - } - - public static void exportJournal(final String directory, - final String journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize, - final String fileName) throws Exception { - try (FileOutputStream fileOutputStream = new FileOutputStream(fileName); - BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream); - PrintStream out = new PrintStream(bufferedOutputStream)) { - exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out); - } - } - - public static void exportJournal(final String directory, - final String journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize, - final PrintStream out) throws Exception { - NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1); - - JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); - - List<JournalFile> files = journal.orderFiles(); - - for (JournalFile file : files) { - out.println("#File," + file); - - exportJournalFile(out, nio, file); - } - } - - /** - * @param out - * @param fileFactory - * @param file - * @throws Exception - */ - public static void exportJournalFile(final PrintStream out, - final SequentialFileFactory fileFactory, - final JournalFile file) throws Exception { - JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() { - - @Override - public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception { - out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo)); - } - - @Override - public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception { - out.println("operation@Update," + describeRecord(recordInfo)); - } - - @Override - public void onReadRollbackRecord(final long transactionID) 
throws Exception { - out.println("operation@Rollback,txID@" + transactionID); - } - - @Override - public void onReadPrepareRecord(final long transactionID, - final byte[] extraData, - final int numberOfRecords) throws Exception { - out.println("operation@Prepare,txID@" + transactionID + - ",numberOfRecords@" + - numberOfRecords + - ",extraData@" + - encode(extraData)); - } - - @Override - public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception { - out.println("operation@DeleteRecordTX,txID@" + transactionID + - "," + - describeRecord(recordInfo)); - } - - @Override - public void onReadDeleteRecord(final long recordID) throws Exception { - out.println("operation@DeleteRecord,id@" + recordID); - } - - @Override - public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception { - out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords); - } - - @Override - public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception { - out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo)); - } - - @Override - public void onReadAddRecord(final RecordInfo recordInfo) throws Exception { - out.println("operation@AddRecord," + describeRecord(recordInfo)); - } - - @Override - public void markAsDataFile(final JournalFile file) { - } - }); - } - - private static String describeRecord(final RecordInfo recordInfo) { - return "id@" + recordInfo.id + - ",userRecordType@" + - recordInfo.userRecordType + - ",length@" + - recordInfo.data.length + - ",isUpdate@" + - recordInfo.isUpdate + - ",compactCount@" + - recordInfo.compactCount + - ",data@" + - encode(recordInfo.data); - } - - private static String encode(final byte[] data) { - return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); - } - - public void printUsage() { - for (int i = 0; i < 10; i++) { - 
System.err.println(); - } - System.err.println("This method will export the journal at low level record."); - System.err.println(); - System.err.println(); - for (int i = 0; i < 10; i++) { - System.err.println(); - } - } - -}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java index a7a27a6..86b9a60 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java @@ -25,7 +25,7 @@ import io.airlift.airline.Help; import org.apache.activemq.artemis.cli.commands.Action; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.InvalidOptionsError; -import org.apache.activemq.artemis.util.OptionsUtil; +import org.apache.activemq.artemis.cli.commands.OptionsUtil; public class HelpData extends Help implements Action { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/LockAbstract.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/LockAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/LockAbstract.java index 5bffb36..cbc5234 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/LockAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/LockAbstract.java @@ -32,15 +32,6 @@ public abstract class LockAbstract extends DataAbstract { private static RandomAccessFile serverLockFile = null; private static FileLock serverLockLock = null; - protected File getLockPlace() throws Exception { - String brokerInstance = getBrokerInstance(); - if (brokerInstance != 
null) { - return new File(new File(brokerInstance), "lock"); - } else { - return null; - } - } - public static void unlock() { try { if (serverLockFile != null) { @@ -70,7 +61,7 @@ public abstract class LockAbstract extends DataAbstract { return null; } - protected void lockCLI(File lockPlace) throws Exception { + void lockCLI(File lockPlace) throws Exception { if (lockPlace != null) { lockPlace.mkdirs(); if (serverLockFile == null) { @@ -89,4 +80,12 @@ public abstract class LockAbstract extends DataAbstract { } } + private File getLockPlace() throws Exception { + String brokerInstance = getBrokerInstance(); + if (brokerInstance != null) { + return new File(new File(brokerInstance), "lock"); + } else { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PerfJournal.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PerfJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PerfJournal.java deleted file mode 100644 index f7d89ec..0000000 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PerfJournal.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.cli.commands.tools; - -import java.text.DecimalFormat; - -import io.airlift.airline.Command; -import io.airlift.airline.Option; -import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; -import org.apache.activemq.artemis.core.config.impl.FileConfiguration; -import org.apache.activemq.artemis.core.server.JournalType; - -@Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder") -public class PerfJournal extends LockAbstract { - - - @Option(name = "--block-size", description = "The block size for each write (default 4096)") - public int size = 4 * 1024; - - - @Option(name = "--writes", description = "The number of writes to be performed (default 250)") - public int writes = 250; - - @Option(name = "--tries", description = "The number of tries for the test (default 5)") - public int tries = 5; - - @Option(name = "--no-sync", description = "Disable sync") - public boolean nosyncs = false; - - @Option(name = "--sync", description = "Enable syncs") - public boolean syncs = false; - - @Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)") - public String journalType = null; - - - @Override - public Object execute(ActionContext context) throws Exception { - super.execute(context); - - FileConfiguration fileConfiguration = getFileConfiguration(); - - if (nosyncs) { - fileConfiguration.setJournalDatasync(false); - } else if 
(syncs) { - fileConfiguration.setJournalDatasync(true); - } - - - if (journalType != null) { - fileConfiguration.setJournalType(JournalType.getType(journalType)); - } - - System.out.println(""); - System.out.println("Auto tuning journal ..."); - - System.out.println("Performing " + tries + " tests writing " + writes + " blocks of " + size + " on each test, sync=" + fileConfiguration.isJournalDatasync() + " with journalType = " + fileConfiguration.getJournalType()); - - fileConfiguration.getJournalLocation().mkdirs(); - - long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType()); - - long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose); - double writesPerMillisecond = (double) writes / (double) time; - - String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond); - - context.out.println("Your system can execute " + writesPerMillisecondStr + " syncs per millisecond"); - context.out.println("Your journal-buffer-timeout should be:" + nanosecondsWait); - context.out.println("You should use this following configuration:"); - context.out.println(); - context.out.println("<journal-buffer-timeout>" + nanosecondsWait + "</journal-buffer-timeout>"); - - return null; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java index 2816aaf..d5e895d 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java @@ -17,6 
+17,8 @@ package org.apache.activemq.artemis.cli.commands.tools; import java.io.File; +import java.io.InputStream; +import java.io.OutputStream; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -31,7 +33,6 @@ import io.airlift.airline.Command; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.cli.Artemis; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; @@ -81,7 +82,7 @@ public class PrintData extends OptionalLocking { public static void printData(File bindingsDirectory, File messagesDirectory, File pagingDirectory) throws Exception { // Having the version on the data report is an information very useful to understand what happened // When debugging stuff - Artemis.printBanner(); + printBanner(); File serverLockFile = new File(messagesDirectory, "server.lock"); @@ -135,6 +136,20 @@ public class PrintData extends OptionalLocking { } + public static void printBanner() throws Exception { + copy(PrintData.class.getResourceAsStream("banner.txt"), System.out); + } + + private static long copy(InputStream in, OutputStream out) throws Exception { + byte[] buffer = new byte[1024]; + int len = in.read(buffer); + while (len != -1) { + out.write(buffer, 0, len); + len = in.read(buffer); + } + return len; + } + private static void printPages(File pageDirectory, DescribeJournal describeJournal) { try { @@ -214,12 +229,9 @@ public class PrintData extends OptionalLocking { System.out.println(); msgID++; } - pgid++; - } } - } catch (Exception e) { e.printStackTrace(); } @@ -228,7 +240,7 @@ public class PrintData extends OptionalLocking { /** * Calculate the acks on the page system */ - protected static PageCursorsInfo 
calculateCursorsInfo(List<RecordInfo> records) throws Exception { + private static PageCursorsInfo calculateCursorsInfo(List<RecordInfo> records) throws Exception { PageCursorsInfo cursorInfo = new PageCursorsInfo(); @@ -293,25 +305,18 @@ public class PrintData extends OptionalLocking { /** * @return the pgTXs */ - public Set<Long> getPgTXs() { + Set<Long> getPgTXs() { return pgTXs; } /** * @return the cursorRecords */ - public Map<Long, Set<PagePosition>> getCursorRecords() { + Map<Long, Set<PagePosition>> getCursorRecords() { return cursorRecords; } - /** - * @return the completePages - */ - public Map<Long, Set<Long>> getCompletePages() { - return completePages; - } - - public Set<Long> getCompletePages(Long queueID) { + Set<Long> getCompletePages(Long queueID) { Set<Long> completePagesSet = completePages.get(queueID); if (completePagesSet == null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java deleted file mode 100644 index be7e84e..0000000 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.cli.commands.tools; - -/** - * The constants shared by <code>org.apache.activemq.tools.XmlDataImporter</code> and - * <code>org.apache.activemq.tools.XmlDataExporter</code>. - */ -public final class XmlDataConstants { - - private XmlDataConstants() { - // Utility - } - - public static final String XML_VERSION = "1.0"; - public static final String DOCUMENT_PARENT = "activemq-journal"; - public static final String BINDINGS_PARENT = "bindings"; - - public static final String QUEUE_BINDINGS_CHILD = "queue-binding"; - public static final String QUEUE_BINDING_ADDRESS = "address"; - public static final String QUEUE_BINDING_FILTER_STRING = "filter-string"; - public static final String QUEUE_BINDING_NAME = "name"; - public static final String QUEUE_BINDING_ID = "id"; - public static final String QUEUE_BINDING_ROUTING_TYPE = "routing-type"; - - public static final String ADDRESS_BINDINGS_CHILD = "address-binding"; - public static final String ADDRESS_BINDING_NAME = "name"; - public static final String ADDRESS_BINDING_ID = "id"; - public static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types"; - - public static final String MESSAGES_PARENT = "messages"; - public static final String MESSAGES_CHILD = "message"; - public static final String MESSAGE_ID = "id"; - public static final String MESSAGE_PRIORITY = "priority"; - public static final String MESSAGE_EXPIRATION = "expiration"; - public static final String MESSAGE_TIMESTAMP = "timestamp"; - public static final String DEFAULT_TYPE_PRETTY = "default"; - public 
static final String BYTES_TYPE_PRETTY = "bytes"; - public static final String MAP_TYPE_PRETTY = "map"; - public static final String OBJECT_TYPE_PRETTY = "object"; - public static final String STREAM_TYPE_PRETTY = "stream"; - public static final String TEXT_TYPE_PRETTY = "text"; - public static final String MESSAGE_TYPE = "type"; - public static final String MESSAGE_IS_LARGE = "isLarge"; - public static final String MESSAGE_USER_ID = "user-id"; - public static final String MESSAGE_BODY = "body"; - public static final String PROPERTIES_PARENT = "properties"; - public static final String PROPERTIES_CHILD = "property"; - public static final String PROPERTY_NAME = "name"; - public static final String PROPERTY_VALUE = "value"; - public static final String PROPERTY_TYPE = "type"; - public static final String QUEUES_PARENT = "queues"; - public static final String QUEUES_CHILD = "queue"; - public static final String QUEUE_NAME = "name"; - public static final String PROPERTY_TYPE_BOOLEAN = "boolean"; - public static final String PROPERTY_TYPE_BYTE = "byte"; - public static final String PROPERTY_TYPE_BYTES = "bytes"; - public static final String PROPERTY_TYPE_SHORT = "short"; - public static final String PROPERTY_TYPE_INTEGER = "integer"; - public static final String PROPERTY_TYPE_LONG = "long"; - public static final String PROPERTY_TYPE_FLOAT = "float"; - public static final String PROPERTY_TYPE_DOUBLE = "double"; - public static final String PROPERTY_TYPE_STRING = "string"; - public static final String PROPERTY_TYPE_SIMPLE_STRING = "simple-string"; - - static final String JMS_CONNECTION_FACTORY_NAME = "name"; - static final String JMS_CONNECTION_FACTORY_CLIENT_ID = "client-id"; - static final String JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT = "call-failover-timeout"; - static final String JMS_CONNECTION_FACTORY_CALL_TIMEOUT = "call-timeout"; - static final String JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD = "client-failure-check-period"; - static final String 
JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE = "confirmation-window-size"; - static final String JMS_CONNECTION_FACTORY_CONNECTION_TTL = "connection-ttl"; - static final String JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE = "consumer-max-rate"; - static final String JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE = "consumer-window-size"; - static final String JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME = "discovery-group-name"; - static final String JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE = "dups-ok-batch-size"; - static final String JMS_CONNECTION_FACTORY_TYPE = "type"; - static final String JMS_CONNECTION_FACTORY_GROUP_ID = "group-id"; - static final String JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME = "load-balancing-policy-class-name"; - static final String JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL = "max-retry-interval"; - static final String JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE = "min-large-message-size"; - static final String JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE = "producer-max-rate"; - static final String JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE = "producer-window-size"; - static final String JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS = "reconnect-attempts"; - static final String JMS_CONNECTION_FACTORY_RETRY_INTERVAL = "retry-interval"; - static final String JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER = "retry-interval-multiplier"; - static final String JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE = "scheduled-thread-pool-max-size"; - static final String JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE = "thread-pool-max-size"; - static final String JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE = "transaction-batch-size"; - static final String JMS_CONNECTION_FACTORY_CONNECTORS = "connectors"; - static final String JMS_CONNECTION_FACTORY_CONNECTOR = "connector"; - static final String JMS_CONNECTION_FACTORY_AUTO_GROUP = "auto-group"; - static final String JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE = "block-on-acknowledge"; - 
static final String JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND = "block-on-durable-send"; - static final String JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND = "block-on-non-durable-send"; - static final String JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT = "cache-large-messages-client"; - static final String JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES = "compress-large-messages"; - static final String JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION = "failover-on-initial-connection"; - static final String JMS_CONNECTION_FACTORY_HA = "ha"; - static final String JMS_CONNECTION_FACTORY_PREACKNOWLEDGE = "preacknowledge"; - static final String JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS = "use-global-pools"; - - static final String JMS_DESTINATIONS = "jms-destinations"; - static final String JMS_DESTINATION = "jms-destination"; - static final String JMS_DESTINATION_NAME = "name"; - static final String JMS_DESTINATION_SELECTOR = "selector"; - static final String JMS_DESTINATION_TYPE = "type"; - - static final String JMS_JNDI_ENTRIES = "entries"; - static final String JMS_JNDI_ENTRY = "entry"; - - public static final String JNDI_COMPATIBILITY_PREFIX = "java:jboss/exported/"; - - static final String NULL = "_AMQ_NULL"; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java deleted file mode 100644 index d2f6204..0000000 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java +++ /dev/null @@ -1,626 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.cli.commands.tools; - -import javax.xml.stream.XMLOutputFactory; -import javax.xml.stream.XMLStreamException; -import javax.xml.stream.XMLStreamWriter; -import java.io.File; -import java.io.OutputStream; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - -import io.airlift.airline.Command; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ICoreMessage; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.cli.commands.ActionContext; -import 
org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; -import org.apache.activemq.artemis.core.journal.Journal; -import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; -import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; -import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.message.LargeBodyEncoder; -import org.apache.activemq.artemis.core.paging.PagedMessage; -import org.apache.activemq.artemis.core.paging.PagingManager; -import org.apache.activemq.artemis.core.paging.PagingStore; -import org.apache.activemq.artemis.core.paging.PagingStoreFactory; -import org.apache.activemq.artemis.core.paging.cursor.PagePosition; -import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl; -import org.apache.activemq.artemis.core.paging.impl.Page; -import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl; -import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; -import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; -import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe; -import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal; -import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.MessageDescribe; -import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe; -import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; -import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; -import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding; -import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding; -import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding; -import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.JournalType; -import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.settings.HierarchicalRepository; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; -import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; -import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; -
/**
 * CLI command that reads the bindings journal, the message journal and the page files of a broker
 * and writes every binding and every still-unacknowledged message to an XML document.
 */
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
public final class XmlDataExporter extends OptionalLocking {

   /** Number of body bytes of a large message that are encoded into a single CDATA section. */
   private static final long LARGE_MESSAGE_CHUNK_SIZE = 1000L;

   private JournalStorageManager storageManager;

   private Configuration config;

   private XMLStreamWriter xmlWriter;

   // message references keyed first by the record ID of the message they belong to and then by queue ID
   private final Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs = new HashMap<>();

   // map of all message records hashed by their record ID (which will match the record ID of the message refs)
   private final HashMap<Long, Message> messages = new HashMap<>();

   // acknowledged page positions keyed by queue ID
   private final Map<Long, Set<PagePosition>> cursorRecords = new HashMap<>();

   // IDs of the page transactions found in the journal
   private final Set<Long> pgTXs = new HashSet<>();

   private final HashMap<Long, PersistentQueueBindingEncoding> queueBindings = new HashMap<>();

   private final HashMap<Long, PersistentAddressBindingEncoding> addressBindings = new HashMap<>();

   long messagesPrinted = 0L;

   long bindingsPrinted = 0L;

   /**
    * Runs the export against the directories configured for this command and writes the XML to
    * the output stream of the given context. Failures are handed to {@code treatError}.
    *
    * @param context the action context supplying the output stream
    * @return always {@code null}
    */
   @Override
   public Object execute(ActionContext context) throws Exception {
      super.execute(context);

      try {
         process(context.out, getBinding(), getJournal(), getPaging(), getLargeMessages());
      } catch (Exception e) {
         treatError(e, "data", "exp");
      }
      return null;
   }

   /**
    * Exports the data found in the given directories as XML.
    *
    * @param out              stream the UTF-8 encoded XML is written to
    * @param bindingsDir      directory of the bindings journal
    * @param journalDir       directory of the message journal
    * @param pagingDir        directory of the page files
    * @param largeMessagesDir directory of the large-message files
    * @throws Exception if the storage manager or the XML writer cannot be set up, or reading a journal fails
    */
   public void process(OutputStream out,
                       String bindingsDir,
                       String journalDir,
                       String pagingDir,
                       String largeMessagesDir) throws Exception {
      config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO);
      final ExecutorService executor = Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
      try {
         ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);

         storageManager = new JournalStorageManager(config, executorFactory, executorFactory);

         XMLOutputFactory factory = XMLOutputFactory.newInstance();
         XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
         PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
         xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class<?>[]{XMLStreamWriter.class}, handler);

         writeXMLData();
      } finally {
         // release the pool threads even when the export fails half way through
         executor.shutdown();
      }
   }

   /** Loads bindings and messages, prints them as XML and logs how long that took. */
   private void writeXMLData() throws Exception {
      long start = System.currentTimeMillis();
      getBindings();
      processMessageJournal();
      printDataAsXML();
      ActiveMQServerLogger.LOGGER.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms");
      ActiveMQServerLogger.LOGGER.debug("Output " + messagesPrinted + " messages and " + bindingsPrinted + " bindings.");
   }

   /**
    * Read through the message journal and stuff all the events/data we care about into local data structures. We'll
    * use this data later to print all the right information.
    *
    * @throws Exception will be thrown if anything goes wrong reading the journal
    */
   private void processMessageJournal() throws Exception {
      List<RecordInfo> acks = new ArrayList<>();

      List<RecordInfo> records = new LinkedList<>();

      // We load these, but don't use them.
      List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();

      Journal messageJournal = storageManager.getMessageJournal();

      ActiveMQServerLogger.LOGGER.debug("Reading journal from " + config.getJournalDirectory());

      messageJournal.start();

      try {
         // Just logging these, no action necessary
         TransactionFailureCallback transactionFailureCallback = new TransactionFailureCallback() {
            @Override
            public void failedTransaction(long transactionID,
                                          List<RecordInfo> records1,
                                          List<RecordInfo> recordsToDelete) {
               StringBuilder message = new StringBuilder();
               message.append("Encountered failed journal transaction: ").append(transactionID);
               appendRecords(message, "; Records: ", records1);
               appendRecords(message, "; RecordsToDelete: ", recordsToDelete);
               ActiveMQServerLogger.LOGGER.debug(message.toString());
            }
         };

         ((JournalImpl) messageJournal).load(records, preparedTransactions, transactionFailureCallback, false);

         for (RecordInfo info : records) {
            ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(info.data);
            Object o = DescribeJournal.newObjectEncoding(info, storageManager);
            final byte recordType = info.getUserRecordType();

            if (recordType == JournalRecordIds.ADD_MESSAGE || recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
               messages.put(info.id, ((MessageDescribe) o).getMsg().toCore());
            } else if (recordType == JournalRecordIds.ADD_LARGE_MESSAGE) {
               messages.put(info.id, ((MessageDescribe) o).getMsg());
            } else if (recordType == JournalRecordIds.ADD_REF) {
               ReferenceDescribe ref = (ReferenceDescribe) o;
               HashMap<Long, ReferenceDescribe> refsOfMessage = messageRefs.get(info.id);
               if (refsOfMessage == null) {
                  refsOfMessage = new HashMap<>();
                  messageRefs.put(info.id, refsOfMessage);
               }
               refsOfMessage.put(ref.refEncoding.queueID, ref);
            } else if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
               acks.add(info);
            } else if (recordType == JournalRecordIds.ACKNOWLEDGE_CURSOR) {
               recordCursorAck(buff);
            } else if (recordType == JournalRecordIds.PAGE_TRANSACTION) {
               recordPageTransaction(info, buff);
            }
         }
      } finally {
         // stop the journal even when loading or decoding failed
         messageJournal.stop();
      }

      removeAcked(acks);
   }

   /** Appends {@code label} followed by the comma separated records; appends nothing for an empty list. */
   private static void appendRecords(StringBuilder message, String label, List<RecordInfo> recordInfos) {
      for (int i = 0; i < recordInfos.size(); i++) {
         message.append(i == 0 ? label : ", ");
         message.append(recordInfos.get(i));
      }
   }

   /** Decodes a cursor-ack record and remembers the acknowledged page position for its queue. */
   private void recordCursorAck(ActiveMQBuffer buff) {
      CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
      encoding.decode(buff);

      Set<PagePosition> positions = cursorRecords.get(encoding.queueID);
      if (positions == null) {
         positions = new HashSet<>();
         cursorRecords.put(encoding.queueID, positions);
      }
      positions.add(encoding.position);
   }

   /** Decodes a page-transaction record (or an update of one) and remembers its transaction ID. */
   private void recordPageTransaction(RecordInfo info, ActiveMQBuffer buff) {
      if (info.isUpdate) {
         PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
         pageUpdate.decode(buff);
         pgTXs.add(pageUpdate.pageTX);
      } else {
         PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
         pageTransactionInfo.decode(buff);
         pageTransactionInfo.setRecordID(info.id);
         pgTXs.add(pageTransactionInfo.getTransactionID());
      }
   }

   /**
    * Go back through the messages and message refs we found in the journal and remove the ones that have been acked.
    *
    * @param acks the list of ack records we got from the journal
    */
   private void removeAcked(List<RecordInfo> acks) {
      for (RecordInfo info : acks) {
         AckDescribe ack = (AckDescribe) DescribeJournal.newObjectEncoding(info, null);
         HashMap<Long, ReferenceDescribe> refsOfMessage = messageRefs.get(info.id);
         if (refsOfMessage == null) {
            // an ack without any loaded reference for that message: nothing left to remove
            continue;
         }
         refsOfMessage.remove(ack.refEncoding.queueID);
         if (refsOfMessage.isEmpty()) {
            messages.remove(info.id);
            messageRefs.remove(info.id);
         }
      }
   }

   /**
    * Open the bindings journal and extract all bindings data.
    *
    * @throws Exception will be thrown if anything goes wrong reading the bindings journal
    */
   private void getBindings() throws Exception {
      List<RecordInfo> records = new LinkedList<>();

      Journal bindingsJournal = storageManager.getBindingsJournal();

      bindingsJournal.start();

      try {
         ActiveMQServerLogger.LOGGER.debug("Reading bindings journal from " + config.getBindingsDirectory());

         ((JournalImpl) bindingsJournal).load(records, null, null, false);

         for (RecordInfo info : records) {
            if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) {
               PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
               queueBindings.put(bindingEncoding.getId(), bindingEncoding);
            } else if (info.getUserRecordType() == JournalRecordIds.ADDRESS_BINDING_RECORD) {
               PersistentAddressBindingEncoding bindingEncoding = (PersistentAddressBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
               addressBindings.put(bindingEncoding.getId(), bindingEncoding);
            }
         }
      } finally {
         // stop the journal even when loading or decoding failed
         bindingsJournal.stop();
      }
   }

   /**
    * Writes the whole XML document (bindings first, then messages), then flushes and closes the writer.
    * Best effort: a failure is printed to standard error and not propagated to the caller.
    */
   private void printDataAsXML() {
      try {
         xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
         xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
         printBindingsAsXML();
         printAllMessagesAsXML();
         xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
         xmlWriter.writeEndDocument();
         xmlWriter.flush();
         xmlWriter.close();
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

   /** Writes one empty element per address binding and per queue binding and counts them. */
   private void printBindingsAsXML() throws XMLStreamException {
      xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
      for (PersistentAddressBindingEncoding bindingEncoding : addressBindings.values()) {
         xmlWriter.writeEmptyElement(XmlDataConstants.ADDRESS_BINDINGS_CHILD);
         // Join with a leading separator instead of trimming a trailing one, so an address without
         // any routing type yields an empty attribute rather than a StringIndexOutOfBoundsException.
         StringBuilder routingTypes = new StringBuilder();
         for (RoutingType routingType : bindingEncoding.getRoutingTypes()) {
            if (routingTypes.length() > 0) {
               routingTypes.append(", ");
            }
            routingTypes.append(routingType.toString());
         }
         xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE, routingTypes.toString());
         xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_NAME, bindingEncoding.getName().toString());
         xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ID, Long.toString(bindingEncoding.getId()));
         bindingsPrinted++;
      }
      for (PersistentQueueBindingEncoding bindingEncoding : queueBindings.values()) {
         xmlWriter.writeEmptyElement(XmlDataConstants.QUEUE_BINDINGS_CHILD);
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ADDRESS, bindingEncoding.getAddress().toString());
         String filter = "";
         if (bindingEncoding.getFilterString() != null) {
            filter = bindingEncoding.getFilterString().toString();
         }
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_FILTER_STRING, filter);
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_NAME, bindingEncoding.getQueueName().toString());
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ID, Long.toString(bindingEncoding.getId()));
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE, RoutingType.getType(bindingEncoding.getRoutingType()).toString());
         bindingsPrinted++;
      }
      xmlWriter.writeEndElement(); // end BINDINGS_PARENT
   }

   /** Writes the messages found in the journal followed by the messages found in the page files. */
   private void printAllMessagesAsXML() throws Exception {
      xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);

      // Order here is important. We must process the messages from the journal before we process those from the page
      // files in order to get the messages in the right order.
      for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) {
         List<String> queueNames = extractQueueNames(messageRefs.get(messageMapEntry.getKey()));
         if (queueNames.isEmpty()) {
            // same rule as for paged messages: a message that belongs to no known queue is not exported
            ActiveMQServerLogger.LOGGER.debug("Skipping message " + messageMapEntry.getKey() + " as it is not referenced by any known queue");
            continue;
         }
         printSingleMessageAsXML(messageMapEntry.getValue().toCore(), queueNames);
      }

      printPagedMessagesAsXML();

      xmlWriter.writeEndElement(); // end "messages"
   }

   /**
    * Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
    * from the journal). Best effort: a failure is printed to standard error and not propagated.
    */
   private void printPagedMessagesAsXML() {
      try {
         final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
         final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
         try {
            ExecutorFactory executorFactory = new ExecutorFactory() {
               @Override
               public Executor getExecutor() {
                  return executor;
               }
            };
            PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduled, executorFactory, true, null);
            HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<>();
            addressSettingsRepository.setDefault(new AddressSettings());
            PagingManager manager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);

            manager.start();

            for (SimpleString store : manager.getStoreNames()) {
               PagingStore pageStore = manager.getPageStore(store);

               if (pageStore == null) {
                  ActiveMQServerLogger.LOGGER.debug("Page store was null");
                  continue;
               }

               File folder = pageStore.getFolder();
               ActiveMQServerLogger.LOGGER.debug("Reading page store " + store + " folder = " + folder);

               int pageId = (int) pageStore.getFirstPage();
               for (int i = 0; i < pageStore.getNumberOfPages(); i++) {
                  ActiveMQServerLogger.LOGGER.debug("Reading page " + pageId);
                  Page page = pageStore.createPage(pageId);
                  page.open();
                  List<PagedMessage> pagedMessages = page.read(storageManager);
                  page.close();

                  int messageId = 0;

                  for (PagedMessage message : pagedMessages) {
                     message.initMessage(storageManager);
                     List<String> queueNames = unackedQueueNames(message, pageId, messageId);

                     if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID()))) {
                        printSingleMessageAsXML(message.getMessage().toCore(), queueNames);
                     }

                     messageId++;
                  }

                  pageId++;
               }
            }
         } finally {
            // the pools only exist for this read; without this their threads outlive the export
            executor.shutdownNow();
            scheduled.shutdownNow();
         }
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

   /** Names of the known queues of a paged message for which its page position has not been acknowledged. */
   private List<String> unackedQueueNames(PagedMessage message, int pageId, int messageId) {
      List<String> queueNames = new ArrayList<>();
      for (long queueID : message.getQueueIDs()) {
         PagePosition posCheck = new PagePositionImpl(pageId, messageId);

         Set<PagePosition> positions = cursorRecords.get(queueID);
         boolean acked = positions != null && positions.contains(posCheck);

         if (!acked) {
            PersistentQueueBindingEncoding queueBinding = queueBindings.get(queueID);
            if (queueBinding != null) {
               queueNames.add(queueBinding.getQueueName().toString());
            }
         }
      }
      return queueNames;
   }

   /** Writes one message element with its attributes, properties, queues and body, and counts it. */
   private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
      xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
      printMessageAttributes(message);
      printMessageProperties(message);
      printMessageQueues(queues);
      printMessageBody(message.toCore());
      xmlWriter.writeEndElement(); // end MESSAGES_CHILD
      messagesPrinted++;
   }

   /** Writes the body element: chunked for a large message, a single CDATA section otherwise. */
   private void printMessageBody(Message message) throws Exception {
      xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);

      if (message.toCore().isLargeMessage()) {
         printLargeMessageBody((LargeServerMessage) message);
      } else {
         xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message));
      }
      xmlWriter.writeEndElement(); // end MESSAGE_BODY
   }

   /**
    * Marks the body element as large and writes the body as a sequence of CDATA sections of at most
    * {@link #LARGE_MESSAGE_CHUNK_SIZE} bytes each. An {@code ActiveMQException} raised while reading
    * the body is printed to standard error and not propagated.
    */
   private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
      LargeBodyEncoder encoder = null;

      try {
         encoder = message.toCore().getBodyEncoder();
         encoder.open();
         long bodySize = encoder.getLargeBodySize();
         long totalBytesWritten = 0;
         while (totalBytesWritten < bodySize) {
            // the last chunk is whatever is left over, every other chunk is full sized
            int chunkSize = (int) Math.min(LARGE_MESSAGE_CHUNK_SIZE, bodySize - totalBytesWritten);
            ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(chunkSize);
            encoder.encode(buffer, chunkSize);
            xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
            totalBytesWritten += chunkSize;
         }
      } catch (ActiveMQException e) {
         e.printStackTrace();
      } finally {
         // single close point for both the normal and the failure path
         if (encoder != null) {
            try {
               encoder.close();
            } catch (ActiveMQException e) {
               e.printStackTrace();
            }
         }
      }
   }

   /** Writes the queues element with one empty child per queue name. */
   private void printMessageQueues(List<String> queues) throws XMLStreamException {
      xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
      for (String queueName : queues) {
         xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
      }
      xmlWriter.writeEndElement(); // end QUEUES_PARENT
   }

   /** Writes the properties element with one empty child (name, value, optional type) per property. */
   private void printMessageProperties(Message message) throws XMLStreamException {
      xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
      for (SimpleString key : message.getPropertyNames()) {
         Object value = message.getObjectProperty(key);
         xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
         xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
         xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value));

         // Write the property type as an attribute
         String propertyType = XmlDataExporterUtil.getPropertyType(value);
         if (propertyType != null) {
            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType);
         }
      }
      xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
   }

   /** Writes id, priority, expiration, timestamp, type and (when present) user id as attributes. */
   private void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
      String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType());
      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
      if (message.getUserID() != null) {
         xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
      }
   }

   /**
    * Resolves the queue names of the given references.
    *
    * @param refMap references of one message keyed by queue ID; may be {@code null} when the message has none
    * @return the names of the referenced queues that have a binding; empty when there are none
    */
   private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap) {
      List<String> queues = new ArrayList<>();
      if (refMap == null) {
         return queues;
      }
      for (ReferenceDescribe ref : refMap.values()) {
         PersistentQueueBindingEncoding queueBinding = queueBindings.get(ref.refEncoding.queueID);
         if (queueBinding != null) {
            queues.add(queueBinding.getQueueName().toString());
         }
      }
      return queues;
   }

   // Inner classes -------------------------------------------------

   /**
    * Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
    */
   static class PrettyPrintHandler implements InvocationHandler {

      private final XMLStreamWriter target;

      private int depth = 0;

      private static final char INDENT_CHAR = ' ';

      private static final String LINE_SEPARATOR = System.getProperty("line.separator");

      // false after character data was written, so the next end tag stays on the same line
      boolean wrap = true;

      PrettyPrintHandler(XMLStreamWriter target) {
         this.target = target;
      }

      /**
       * Writes the line break and indentation that belong in front of the intercepted call and then
       * forwards the call to the wrapped writer.
       *
       * @return whatever the wrapped writer returned ({@code null} for void methods)
       * @throws Throwable the exception thrown by the wrapped writer, unwrapped so that callers see
       *                   the declared {@code XMLStreamException} instead of an
       *                   {@code UndeclaredThrowableException}
       */
      @Override
      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
         String m = method.getName();

         switch (m) {
            case "writeStartElement":
               target.writeCharacters(LINE_SEPARATOR);
               target.writeCharacters(indent(depth));

               depth++;
               break;
            case "writeEndElement":
               depth--;
               if (wrap) {
                  target.writeCharacters(LINE_SEPARATOR);
                  target.writeCharacters(indent(depth));
               }
               wrap = true;
               break;
            case "writeEmptyElement":
            case "writeCData":
               target.writeCharacters(LINE_SEPARATOR);
               target.writeCharacters(indent(depth));
               break;
            case "writeCharacters":
               wrap = false;
               break;
         }

         try {
            return method.invoke(target, args);
         } catch (java.lang.reflect.InvocationTargetException e) {
            Throwable cause = e.getCause();
            throw cause != null ? cause : e;
         }
      }

      /** Returns the indentation for the given nesting depth: three indent characters per level. */
      private String indent(int depth) {
         depth *= 3; // level of indentation
         char[] output = new char[depth];
         while (depth-- > 0) {
            output[depth] = INDENT_CHAR;
         }
         return new String(output);
      }
   }
}
 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java deleted file mode 100644 index 7711648..0000000 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.cli.commands.tools; - -import com.google.common.base.Preconditions; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.utils.Base64; -
/**
 * Common utility methods to help with XML message conversion.
 */
public class XmlDataExporterUtil {

   /**
    * Converts a message property value to the text written into the XML.
    *
    * @param value the property value, may be {@code null}
    * @return the encoded bytes for a {@code byte[]}, {@code XmlDataConstants.NULL} for {@code null},
    *         otherwise {@code value.toString()}
    */
   public static String convertProperty(final Object value) {
      if (value instanceof byte[]) {
         return encode((byte[]) value);
      }
      if (value == null) {
         return XmlDataConstants.NULL;
      }
      return value.toString();
   }

   /**
    * Determines the type name written next to a property value.
    *
    * @param value the property value, may be {@code null}
    * @return the matching {@code XmlDataConstants.PROPERTY_TYPE_*} constant, or {@code null} when the
    *         value is of none of the handled types
    */
   public static String getPropertyType(final Object value) {
      // if the value is null then we can't really know what it is so just set
      // the type to the most generic thing
      if (value == null || value instanceof byte[]) {
         return XmlDataConstants.PROPERTY_TYPE_BYTES;
      }
      if (value instanceof Boolean) {
         return XmlDataConstants.PROPERTY_TYPE_BOOLEAN;
      }
      if (value instanceof Byte) {
         return XmlDataConstants.PROPERTY_TYPE_BYTE;
      }
      if (value instanceof Short) {
         return XmlDataConstants.PROPERTY_TYPE_SHORT;
      }
      if (value instanceof Integer) {
         return XmlDataConstants.PROPERTY_TYPE_INTEGER;
      }
      if (value instanceof Long) {
         return XmlDataConstants.PROPERTY_TYPE_LONG;
      }
      if (value instanceof Float) {
         return XmlDataConstants.PROPERTY_TYPE_FLOAT;
      }
      if (value instanceof Double) {
         return XmlDataConstants.PROPERTY_TYPE_DOUBLE;
      }
      if (value instanceof String) {
         return XmlDataConstants.PROPERTY_TYPE_STRING;
      }
      if (value instanceof SimpleString) {
         return XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING;
      }
      return null;
   }

   /**
    * Maps a raw message type to the name used in the XML.
    *
    * @param rawType the message type byte
    * @return the name for bytes, map, object, stream and text messages, otherwise
    *         {@code XmlDataConstants.DEFAULT_TYPE_PRETTY}
    */
   public static String getMessagePrettyType(byte rawType) {
      if (rawType == Message.BYTES_TYPE) {
         return XmlDataConstants.BYTES_TYPE_PRETTY;
      }
      if (rawType == Message.MAP_TYPE) {
         return XmlDataConstants.MAP_TYPE_PRETTY;
      }
      if (rawType == Message.OBJECT_TYPE) {
         return XmlDataConstants.OBJECT_TYPE_PRETTY;
      }
      if (rawType == Message.STREAM_TYPE) {
         return XmlDataConstants.STREAM_TYPE_PRETTY;
      }
      if (rawType == Message.TEXT_TYPE) {
         return XmlDataConstants.TEXT_TYPE_PRETTY;
      }
      return XmlDataConstants.DEFAULT_TYPE_PRETTY;
   }

   /**
    * Base64 encode a ServerMessage body into the proper XML format
    *
    * @param message the message whose body is encoded, must not be {@code null}
    * @return the encoded body
    * @throws Exception if the message cannot be converted to a core message or its body cannot be read
    */
   public static String encodeMessageBody(final Message message) throws Exception {
      Preconditions.checkNotNull(message, "ServerMessage can not be null");

      ActiveMQBuffer byteBuffer = message.toCore().getReadOnlyBodyBuffer();
      // readBytes consumes from the reader index, so size the array by what is actually readable;
      // sizing it by writerIndex() overruns the buffer whenever the reader index is not 0
      byte[] buffer = new byte[byteBuffer.readableBytes()];
      byteBuffer.readBytes(buffer);

      return XmlDataExporterUtil.encode(buffer);
   }

   /**
    * Encodes the bytes as a single-line, URL-safe Base64 string.
    *
    * @param data the bytes to encode, must not be {@code null}
    * @return the encoded text
    */
   protected static String encode(final byte[] data) {
      return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
   }
}
