http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-journal/src/main/java/org/apache/activemq/journal/HornetQJournalLogger.java ---------------------------------------------------------------------- diff --git a/activemq-journal/src/main/java/org/apache/activemq/journal/HornetQJournalLogger.java b/activemq-journal/src/main/java/org/apache/activemq/journal/HornetQJournalLogger.java deleted file mode 100644 index 1f6cf60..0000000 --- a/activemq-journal/src/main/java/org/apache/activemq/journal/HornetQJournalLogger.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.journal; - -import org.apache.activemq.core.journal.impl.JournalFile; -import org.jboss.logging.BasicLogger; -import org.jboss.logging.Logger; -import org.jboss.logging.annotations.Cause; -import org.jboss.logging.annotations.LogMessage; -import org.jboss.logging.annotations.Message; -import org.jboss.logging.annotations.MessageLogger; - -/** - * @author <a href="mailto:[email protected]">Andy Taylor</a> - * 3/15/12 - * - * Logger Code 14 - * - * each message id must be 6 digits long starting with 14, the 3rd digit donates the level so - * - * INF0 1 - * WARN 2 - * DEBUG 3 - * ERROR 4 - * TRACE 5 - * FATAL 6 - * - * so an INFO message would be 141000 to 141999 - */ -@MessageLogger(projectCode = "HQ") -public interface HornetQJournalLogger extends BasicLogger -{ - /** - * The journal logger. - */ - HornetQJournalLogger LOGGER = Logger.getMessageLogger(HornetQJournalLogger.class, HornetQJournalLogger.class.getPackage().getName()); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 141000, value = "*** running direct journal blast: {0}", format = Message.Format.MESSAGE_FORMAT) - void runningJournalBlast(Integer numIts); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 141002, value = "starting thread for sync speed test", format = Message.Format.MESSAGE_FORMAT) - void startingThread(); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 141003, value = "Write rate = {0} bytes / sec or {1} MiB / sec", format = Message.Format.MESSAGE_FORMAT) - void writeRate(Double rate, Long l); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 141004, value = "Flush rate = {0} flushes / sec", format = Message.Format.MESSAGE_FORMAT) - void flushRate(Double rate); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 141005, value = "Check Data Files:", format = Message.Format.MESSAGE_FORMAT) - void checkFiles(); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 141006, value = "Sequence out of order on journal", format = Message.Format.MESSAGE_FORMAT) - void seqOutOfOrder(); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 141007, value = "Current File on the journal is <= the sequence file.getFileID={0} on the dataFiles" + - "\nCurrentfile.getFileId={1} while the file.getFileID()={2}" + - "\nIs same = ({3})", - format = Message.Format.MESSAGE_FORMAT) - void currentFile(Long fileID, Long id, Long fileFileID, Boolean b); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 141008, value = "Free File ID out of order", format = Message.Format.MESSAGE_FORMAT) - void fileIdOutOfOrder(); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 141009, value = "A Free File is less than the maximum data", format = Message.Format.MESSAGE_FORMAT) - void fileTooSmall(); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142000, value = "You have a native library with a different version than expected", format = Message.Format.MESSAGE_FORMAT) - void incompatibleNativeLibrary(); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142001, value = "Could not get lock after 60 seconds on closing Asynchronous File: {0}", - format = Message.Format.MESSAGE_FORMAT) - void couldNotGetLock(String fileName); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142002, value = "Asynchronous File: {0} being finalized with opened state", format = Message.Format.MESSAGE_FORMAT) - void fileFinalizedWhileOpen(String fileName); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142003, value = "AIO Callback Error: {0}", format = Message.Format.MESSAGE_FORMAT) - void callbackError(String error); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142004, value = "Inconsistency during compacting: CommitRecord ID = {0} for an already committed transaction during compacting", - format = Message.Format.MESSAGE_FORMAT) - void inconsistencyDuringCompacting(Long transactionID); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142005, value = "Inconsistency during compacting: Delete record being read on an existent record (id={0})", - format = Message.Format.MESSAGE_FORMAT) - void inconsistencyDuringCompactingDelete(Long recordID); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142006, value = "Could not find add Record information for record {0} during compacting", - format = Message.Format.MESSAGE_FORMAT) - void compactingWithNoAddRecord(Long id); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142007, value = "Can not find record {0} during compact replay", - format = Message.Format.MESSAGE_FORMAT) - void noRecordDuringCompactReplay(Long id); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142008, value = "Could not remove file {0} from the list of data files", - format = Message.Format.MESSAGE_FORMAT) - void couldNotRemoveFile(JournalFile file); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142009, value = "Deleting {0} as it does not have the configured size", - format = Message.Format.MESSAGE_FORMAT) - void deletingFile(JournalFile file); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142010, value = "Failed to add file to opened files queue: {0}. This should NOT happen!", - format = Message.Format.MESSAGE_FORMAT) - void failedToAddFile(JournalFile nextOpenedFile); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142011, value = "Error on reading compacting for {0}", - format = Message.Format.MESSAGE_FORMAT) - void compactReadError(JournalFile file); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142012, value = "Couldn''t find tx={0} to merge after compacting", - format = Message.Format.MESSAGE_FORMAT) - void compactMergeError(Long id); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142013, value = "Prepared transaction {0} was not considered completed, it will be ignored", - format = Message.Format.MESSAGE_FORMAT) - void preparedTXIncomplete(Long id); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142014, value = "Transaction {0} is missing elements so the transaction is being ignored", - format = Message.Format.MESSAGE_FORMAT) - void txMissingElements(Long id); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142015, value = "Uncommitted transaction with id {0} found and discarded", - format = Message.Format.MESSAGE_FORMAT) - void uncomittedTxFound(Long id); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142016, value = "Couldn''t stop compactor executor after 120 seconds", - format = Message.Format.MESSAGE_FORMAT) - void couldNotStopCompactor(); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142017, value = "Couldn''t stop journal executor after 60 seconds", - format = Message.Format.MESSAGE_FORMAT) - void couldNotStopJournalExecutor(); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142018, value = "Temporary files were left unnatended after a crash on journal directory, deleting invalid files now", - format = Message.Format.MESSAGE_FORMAT) - void tempFilesLeftOpen(); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142019, value = "Deleting orphaned file {0}", format = Message.Format.MESSAGE_FORMAT) - void deletingOrphanedFile(String fileToDelete); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142020, value = "Couldn''t get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT) - void errorClosingFile(String fileToDelete); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142021, value = "Error on IO callback, {0}", format = Message.Format.MESSAGE_FORMAT) - void errorOnIOCallback(String errorMessage); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142022, value = "Timed out on AIO poller shutdown", format = Message.Format.MESSAGE_FORMAT) - void timeoutOnPollerShutdown(@Cause Exception e); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142023, value = "Executor on file {0} couldn''t complete its tasks in 60 seconds.", format = Message.Format.MESSAGE_FORMAT) - void couldNotCompleteTask(@Cause Exception e, String name); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142024, value = "Error completing callback", format = Message.Format.MESSAGE_FORMAT) - void errorCompletingCallback(@Cause Throwable e); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142025, value = "Error calling onError callback", format = Message.Format.MESSAGE_FORMAT) - void errorCallingErrorCallback(@Cause Throwable e); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142026, value = "Timed out on AIO writer shutdown", format = Message.Format.MESSAGE_FORMAT) - void timeoutOnWriterShutdown(@Cause Throwable e); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142027, value = "Error on writing data! {0} code - {1}", format = Message.Format.MESSAGE_FORMAT) - void errorWritingData(@Cause Throwable e, String errorMessage, Integer errorCode); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142028, value = "Error replaying pending commands after compacting", format = Message.Format.MESSAGE_FORMAT) - void errorReplayingCommands(@Cause Throwable e); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142029, value = "Error closing file", format = Message.Format.MESSAGE_FORMAT) - void errorClosingFile(@Cause Throwable e); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142030, value = "Could not open a file in 60 Seconds", format = Message.Format.MESSAGE_FORMAT) - void errorOpeningFile(@Cause Throwable e); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142031, value = "Error retrieving ID part of the file name {0}", format = Message.Format.MESSAGE_FORMAT) - void errorRetrievingID(@Cause Throwable e, String fileName); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142032, value = "Error reading journal file", format = Message.Format.MESSAGE_FORMAT) - void errorReadingFile(@Cause Throwable e); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142033, value = "Error reinitializing file {0}", format = Message.Format.MESSAGE_FORMAT) - void errorReinitializingFile(@Cause Throwable e, JournalFile file); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 142034, value = "Exception on submitting write", format = Message.Format.MESSAGE_FORMAT) - void errorSubmittingWrite(@Cause Throwable e); - - @LogMessage(level = Logger.Level.ERROR) - @Message(id = 144000, value = "Failed to delete file {0}", format = Message.Format.MESSAGE_FORMAT) - void errorDeletingFile(Object e); - - @LogMessage(level = Logger.Level.ERROR) - @Message(id = 144001, value = "Error starting poller", format = Message.Format.MESSAGE_FORMAT) - void errorStartingPoller(@Cause Exception e); - - @LogMessage(level = Logger.Level.ERROR) - @Message(id = 144002, value = "Error pushing opened file", format = Message.Format.MESSAGE_FORMAT) - void errorPushingFile(@Cause Exception e); - - @LogMessage(level = Logger.Level.ERROR) - @Message(id = 144003, value = "Error compacting", format = Message.Format.MESSAGE_FORMAT) - void errorCompacting(@Cause Throwable e); - - @LogMessage(level = Logger.Level.ERROR) - @Message(id = 144004, value = "Error scheduling compacting", format = Message.Format.MESSAGE_FORMAT) - void errorSchedulingCompacting(@Cause Throwable e); - - @LogMessage(level = Logger.Level.ERROR) - @Message(id = 144005, value = "Failed to performance blast", format = Message.Format.MESSAGE_FORMAT) - void failedToPerfBlast(@Cause Throwable e); - - @LogMessage(level = Logger.Level.ERROR) - @Message(id = 144006, value = "IOError code {0}, {1}", format = Message.Format.MESSAGE_FORMAT) - void ioError(final int errorCode, final String errorMessage); - -}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ActiveMQProtonRemotingConnection.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ActiveMQProtonRemotingConnection.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ActiveMQProtonRemotingConnection.java new file mode 100644 index 0000000..6fbd6e4 --- /dev/null +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ActiveMQProtonRemotingConnection.java @@ -0,0 +1,146 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.core.protocol.proton; + +import java.util.concurrent.Executor; + +import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.core.client.ActiveMQClientLogger; +import org.apache.activemq.spi.core.protocol.AbstractRemotingConnection; +import org.apache.activemq.spi.core.remoting.Connection; +import org.proton.plug.AMQPConnectionContext; + +/** + * + * This is a Server's Connection representation used by ActiveMQ. + * @author Clebert Suconic + */ + +public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection +{ + private final AMQPConnectionContext amqpConnection; + + private boolean destroyed = false; + + private final ProtonProtocolManager manager; + + + public ActiveMQProtonRemotingConnection(ProtonProtocolManager manager, AMQPConnectionContext amqpConnection, Connection transportConnection, Executor executor) + { + super(transportConnection, executor); + this.manager = manager; + this.amqpConnection = amqpConnection; + } + + public ProtonProtocolManager getManager() + { + return manager; + } + + /* + * This can be called concurrently by more than one thread so needs to be locked + */ + public void fail(final ActiveMQException me, String scaleDownTargetNodeID) + { + if (destroyed) + { + return; + } + + destroyed = true; + + ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); + + // Then call the listeners + callFailureListeners(me, scaleDownTargetNodeID); + + callClosingListeners(); + + internalClose(); + } + + + @Override + public void destroy() + { + synchronized (this) + { + if (destroyed) + { + return; + } + + destroyed = true; + } + + + callClosingListeners(); + + internalClose(); + + } + + @Override + public boolean isClient() + { + return false; + } + + @Override + public boolean isDestroyed() + { + return destroyed; + } + + @Override + public void disconnect(boolean criticalError) + { + getTransportConnection().close(); + } + + /** + * Disconnect the connection, closing all channels + */ + @Override + public void disconnect(String scaleDownNodeID, boolean criticalError) + { + getTransportConnection().close(); + } + + @Override + public boolean checkDataReceived() + { + return amqpConnection.checkDataReceived(); + } + + @Override + public void flush() + { + amqpConnection.flush(); + } + + @Override + public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) + { + amqpConnection.inputBuffer(buffer.byteBuf()); + super.bufferReceived(connectionID, buffer); + } + + private void internalClose() + { + // We close the underlying transport connection + getTransportConnection().close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/HornetQProtonRemotingConnection.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/HornetQProtonRemotingConnection.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/HornetQProtonRemotingConnection.java deleted file mode 100644 index fec1eac..0000000 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/HornetQProtonRemotingConnection.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.apache.activemq.core.protocol.proton; - -import java.util.concurrent.Executor; - -import org.apache.activemq.api.core.ActiveMQBuffer; -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.core.client.HornetQClientLogger; -import org.apache.activemq.spi.core.protocol.AbstractRemotingConnection; -import org.apache.activemq.spi.core.remoting.Connection; -import org.proton.plug.AMQPConnectionContext; - -/** - * - * This is a Server's Connection representation used by HornetQ. - * @author Clebert Suconic - */ - -public class HornetQProtonRemotingConnection extends AbstractRemotingConnection -{ - private final AMQPConnectionContext amqpConnection; - - private boolean destroyed = false; - - private final ProtonProtocolManager manager; - - - public HornetQProtonRemotingConnection(ProtonProtocolManager manager, AMQPConnectionContext amqpConnection, Connection transportConnection, Executor executor) - { - super(transportConnection, executor); - this.manager = manager; - this.amqpConnection = amqpConnection; - } - - public ProtonProtocolManager getManager() - { - return manager; - } - - /* - * This can be called concurrently by more than one thread so needs to be locked - */ - public void fail(final ActiveMQException me, String scaleDownTargetNodeID) - { - if (destroyed) - { - return; - } - - destroyed = true; - - HornetQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); - - // Then call the listeners - callFailureListeners(me, scaleDownTargetNodeID); - - callClosingListeners(); - - internalClose(); - } - - - @Override - public void destroy() - { - synchronized (this) - { - if (destroyed) - { - return; - } - - destroyed = true; - } - - - callClosingListeners(); - - internalClose(); - - } - - @Override - public boolean isClient() - { - return false; - } - - @Override - public boolean isDestroyed() - { - return destroyed; - } - - @Override - public void disconnect(boolean criticalError) - { - getTransportConnection().close(); - } - - /** - * Disconnect the connection, closing all channels - */ - @Override - public void disconnect(String scaleDownNodeID, boolean criticalError) - { - getTransportConnection().close(); - } - - @Override - public boolean checkDataReceived() - { - return amqpConnection.checkDataReceived(); - } - - @Override - public void flush() - { - amqpConnection.flush(); - } - - @Override - public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) - { - amqpConnection.inputBuffer(buffer.byteBuf()); - super.bufferReceived(connectionID, buffer); - } - - private void internalClose() - { - // We close the underlying transport connection - getTransportConnection().close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java index cf5bb9a..5e69aae 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java @@ -17,11 +17,11 @@ import java.util.concurrent.Executor; import io.netty.channel.ChannelPipeline; import org.apache.activemq.api.core.ActiveMQBuffer; -import org.apache.activemq.api.core.client.HornetQClient; +import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.core.protocol.proton.converter.ProtonMessageConverter; -import org.apache.activemq.core.protocol.proton.plug.HornetQProtonConnectionCallback; +import org.apache.activemq.core.protocol.proton.plug.ActiveMQProtonConnectionCallback; import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection; -import org.apache.activemq.core.server.HornetQServer; +import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.management.Notification; import org.apache.activemq.core.server.management.NotificationListener; import org.apache.activemq.spi.core.protocol.ConnectionEntry; @@ -34,23 +34,23 @@ import org.proton.plug.AMQPServerConnectionContext; import org.proton.plug.context.server.ProtonServerConnectionContextFactory; /** - * A proton protocol manager, basically reads the Proton Input and maps proton resources to HornetQ resources + * A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ resources * * @author <a href="mailto:[email protected]">Andy Taylor</a> */ public class ProtonProtocolManager implements ProtocolManager, NotificationListener { - private final HornetQServer server; + private final ActiveMQServer server; private MessageConverter protonConverter; - public ProtonProtocolManager(HornetQServer server) + public ProtonProtocolManager(ActiveMQServer server) { this.server = server; this.protonConverter = new ProtonMessageConverter(server.getStorageManager()); } - public HornetQServer getServer() + public ActiveMQServer getServer() { return server; } @@ -71,18 +71,18 @@ public class ProtonProtocolManager implements ProtocolManager, NotificationListe @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) { - HornetQProtonConnectionCallback connectionCallback = new HornetQProtonConnectionCallback(this, remotingConnection); + ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection); AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().createConnection(connectionCallback); Executor executor = server.getExecutorFactory().getExecutor(); - HornetQProtonRemotingConnection delegate = new HornetQProtonRemotingConnection(this, amqpConnection, remotingConnection, executor); + ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(this, amqpConnection, remotingConnection, executor); connectionCallback.setProtonConnectionDelegate(delegate); ConnectionEntry entry = new ConnectionEntry(delegate, executor, - System.currentTimeMillis(), HornetQClient.DEFAULT_CONNECTION_TTL); + System.currentTimeMillis(), ActiveMQClient.DEFAULT_CONNECTION_TTL); return entry; } @@ -96,7 +96,7 @@ public class ProtonProtocolManager implements ProtocolManager, NotificationListe @Override public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) { - HornetQProtonRemotingConnection protonConnection = (HornetQProtonRemotingConnection)connection; + ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection)connection; protonConnection.bufferReceived(protonConnection.getID(), buffer); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java index e82202c..07877d2 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java @@ -14,7 +14,7 @@ package org.apache.activemq.core.protocol.proton; import org.apache.activemq.api.core.Interceptor; -import org.apache.activemq.core.server.HornetQServer; +import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory; @@ -30,7 +30,7 @@ public class ProtonProtocolManagerFactory implements ProtocolManagerFactory private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME}; @Override - public ProtocolManager createProtocolManager(HornetQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) + public ProtocolManager createProtocolManager(ActiveMQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) { return new ProtonProtocolManager(server); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/ActiveMQJMSVendor.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/ActiveMQJMSVendor.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/ActiveMQJMSVendor.java new file mode 100644 index 0000000..be8be9e --- /dev/null +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/ActiveMQJMSVendor.java @@ -0,0 +1,155 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.core.protocol.proton.converter; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.qpid.proton.jms.JMSVendor; +import org.apache.activemq.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; +import org.apache.activemq.core.protocol.proton.converter.jms.ServerJMSBytesMessage; +import org.apache.activemq.core.protocol.proton.converter.jms.ServerJMSMapMessage; +import org.apache.activemq.core.protocol.proton.converter.jms.ServerJMSMessage; +import org.apache.activemq.core.protocol.proton.converter.jms.ServerJMSStreamMessage; +import org.apache.activemq.core.protocol.proton.converter.jms.ServerJMSTextMessage; +import org.apache.activemq.core.server.ServerMessage; +import org.apache.activemq.core.server.impl.ServerMessageImpl; +import org.apache.activemq.utils.IDGenerator; + +/** + * @author Clebert Suconic + */ + +public class ActiveMQJMSVendor extends JMSVendor +{ + + private final IDGenerator serverGenerator; + + ActiveMQJMSVendor(IDGenerator idGenerator) + { + this.serverGenerator = idGenerator; + } + + @Override + public BytesMessage createBytesMessage() + { + return new ServerJMSBytesMessage(newMessage(org.apache.activemq.api.core.Message.BYTES_TYPE), 0); + } + + @Override + public StreamMessage createStreamMessage() + { + return new ServerJMSStreamMessage(newMessage(org.apache.activemq.api.core.Message.STREAM_TYPE), 0); + } + + @Override + public Message createMessage() + { + return new ServerJMSMessage(newMessage(org.apache.activemq.api.core.Message.DEFAULT_TYPE), 0 ); + } + + @Override + public TextMessage createTextMessage() + { + return new ServerJMSTextMessage(newMessage(org.apache.activemq.api.core.Message.TEXT_TYPE), 0); + } + + @Override + public ObjectMessage createObjectMessage() + { + return null; + } + + @Override + public MapMessage createMapMessage() + { + return new ServerJMSMapMessage(newMessage(org.apache.activemq.api.core.Message.MAP_TYPE), 0); + } + + @Override + public void setJMSXUserID(Message message, String s) + { + } + + @Override + public Destination createDestination(String name) + { + return super.createDestination(name); + } + + @Override + public <T extends Destination> T createDestination(String name, Class<T> kind) + { + return super.createDestination(name, kind); + } + + @Override + public void setJMSXGroupID(Message message, String s) + { + + } + + @Override + public void setJMSXGroupSequence(Message message, int i) + { + + } + + @Override + public void setJMSXDeliveryCount(Message message, long l) + { + + } + + + public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) + { + switch (messageType) + { + case org.apache.activemq.api.core.Message.STREAM_TYPE: + return new ServerJMSStreamMessage(wrapped, deliveryCount); + case org.apache.activemq.api.core.Message.BYTES_TYPE: + return new ServerJMSBytesMessage(wrapped, deliveryCount); + case org.apache.activemq.api.core.Message.MAP_TYPE: + return new ServerJMSMapMessage(wrapped, deliveryCount); + case org.apache.activemq.api.core.Message.TEXT_TYPE: + return new ServerJMSTextMessage(wrapped, deliveryCount); + default: + return new ServerJMSMessage(wrapped, deliveryCount); + } + + } + + + @Override + public String toAddress(Destination destination) + { + return null; + } + + + private ServerMessageImpl newMessage(byte messageType) + { + ServerMessageImpl message = new ServerMessageImpl(serverGenerator.generateID(), 512); + message.setType(messageType); + ((ResetLimitWrappedActiveMQBuffer)message.getBodyBuffer()).setMessage(null); + return message; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/HornetQJMSVendor.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/HornetQJMSVendor.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/HornetQJMSVendor.java deleted file mode 100644 index 4c09836..0000000 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/HornetQJMSVendor.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.apache.activemq.core.protocol.proton.converter; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.ObjectMessage; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -import org.apache.qpid.proton.jms.JMSVendor; -import org.apache.activemq.core.buffers.impl.ResetLimitWrappedHornetQBuffer; -import org.apache.activemq.core.protocol.proton.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.core.protocol.proton.converter.jms.ServerJMSMapMessage; -import org.apache.activemq.core.protocol.proton.converter.jms.ServerJMSMessage; -import org.apache.activemq.core.protocol.proton.converter.jms.ServerJMSStreamMessage; -import org.apache.activemq.core.protocol.proton.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.core.server.ServerMessage; -import org.apache.activemq.core.server.impl.ServerMessageImpl; -import org.apache.activemq.utils.IDGenerator; - -/** - * @author Clebert Suconic - */ - -public class HornetQJMSVendor extends JMSVendor -{ - - private final IDGenerator serverGenerator; - - HornetQJMSVendor(IDGenerator idGenerator) - { - this.serverGenerator = idGenerator; - } - - @Override - public BytesMessage createBytesMessage() - { - return new ServerJMSBytesMessage(newMessage(org.apache.activemq.api.core.Message.BYTES_TYPE), 0); - } - - @Override - public StreamMessage createStreamMessage() - { - return new ServerJMSStreamMessage(newMessage(org.apache.activemq.api.core.Message.STREAM_TYPE), 0); - } - - @Override - public Message createMessage() - { - return new ServerJMSMessage(newMessage(org.apache.activemq.api.core.Message.DEFAULT_TYPE), 0 ); - } - - @Override - public TextMessage createTextMessage() - { - return new ServerJMSTextMessage(newMessage(org.apache.activemq.api.core.Message.TEXT_TYPE), 0); - } - - @Override - public ObjectMessage createObjectMessage() - { - return null; - } - - @Override - public MapMessage createMapMessage() - { - return new ServerJMSMapMessage(newMessage(org.apache.activemq.api.core.Message.MAP_TYPE), 0); - } - - @Override - public void setJMSXUserID(Message message, String s) - { - } - - @Override - public Destination createDestination(String name) - { - return super.createDestination(name); - } - - @Override - public <T extends Destination> T createDestination(String name, Class<T> kind) - { - return super.createDestination(name, kind); - } - - @Override - public void setJMSXGroupID(Message message, String s) - { - - } - - @Override - public void setJMSXGroupSequence(Message message, int i) - { - - } - - @Override - public void setJMSXDeliveryCount(Message message, long l) - { - - } - - - public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) - { - switch (messageType) - { - case org.apache.activemq.api.core.Message.STREAM_TYPE: - return new ServerJMSStreamMessage(wrapped, deliveryCount); - case org.apache.activemq.api.core.Message.BYTES_TYPE: - return new ServerJMSBytesMessage(wrapped, deliveryCount); - case org.apache.activemq.api.core.Message.MAP_TYPE: - return new ServerJMSMapMessage(wrapped, deliveryCount); - case org.apache.activemq.api.core.Message.TEXT_TYPE: - return new ServerJMSTextMessage(wrapped, deliveryCount); - default: - return new ServerJMSMessage(wrapped, deliveryCount); - } - - } - - - @Override - public String toAddress(Destination destination) - { - return null; - } - - - private ServerMessageImpl newMessage(byte messageType) - { - ServerMessageImpl message = new ServerMessageImpl(serverGenerator.generateID(), 512); - message.setType(messageType); - ((ResetLimitWrappedHornetQBuffer)message.getBodyBuffer()).setMessage(null); - return message; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/ProtonMessageConverter.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/ProtonMessageConverter.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/ProtonMessageConverter.java index faf45f4..18b7de1 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/ProtonMessageConverter.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/ProtonMessageConverter.java @@ -30,13 +30,13 @@ public class ProtonMessageConverter implements MessageConverter { - HornetQJMSVendor hornetQJMSVendor; + ActiveMQJMSVendor activeMQJMSVendor; public ProtonMessageConverter(IDGenerator idGenerator) { - hornetQJMSVendor = new HornetQJMSVendor(idGenerator); - inboundTransformer = new JMSMappingInboundTransformer(hornetQJMSVendor); - outboundTransformer = new JMSMappingOutboundTransformer(hornetQJMSVendor); + activeMQJMSVendor = new ActiveMQJMSVendor(idGenerator); + inboundTransformer = new JMSMappingInboundTransformer(activeMQJMSVendor); + outboundTransformer = new JMSMappingOutboundTransformer(activeMQJMSVendor); } private final InboundTransformer inboundTransformer; @@ -70,7 +70,7 @@ public class ProtonMessageConverter implements MessageConverter @Override public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception { - ServerJMSMessage jmsMessage = hornetQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount); + ServerJMSMessage jmsMessage = activeMQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount); jmsMessage.decode(); return outboundTransformer.convert(jmsMessage); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java index 36b9c96..f2fd1b8 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java @@ -30,7 +30,7 @@ import static org.apache.activemq.reader.MapMessageUtil.readBodyMap; import static org.apache.activemq.reader.MapMessageUtil.writeBodyMap; /** - * HornetQ implementation of a JMS MapMessage. + * ActiveMQ implementation of a JMS MapMessage. * * @author Norbert Lataille ([email protected]) * @author <a href="mailto:[email protected]">Adrian Brock</a> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSMessage.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSMessage.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSMessage.java index ab2d2b4..fdf14d9 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSMessage.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSMessage.java @@ -23,8 +23,8 @@ import java.util.Enumeration; import org.apache.activemq.api.core.ActiveMQException; import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.core.message.impl.MessageInternal; -import org.apache.activemq.jms.client.HornetQDestination; -import org.apache.activemq.jms.client.HornetQQueue; +import org.apache.activemq.jms.client.ActiveMQDestination; +import org.apache.activemq.jms.client.ActiveMQQueue; import org.apache.activemq.reader.MessageUtil; /** @@ -111,7 +111,7 @@ public class ServerJMSMessage implements Message SimpleString reply = MessageUtil.getJMSReplyTo(message); if (reply != null) { - return HornetQDestination.fromAddress(reply.toString()); + return ActiveMQDestination.fromAddress(reply.toString()); } else { @@ -122,7 +122,7 @@ public class ServerJMSMessage implements Message @Override public final void setJMSReplyTo(Destination replyTo) throws JMSException { - MessageUtil.setJMSReplyTo(message, replyTo == null ? null : ((HornetQDestination) replyTo).getSimpleAddress()); + MessageUtil.setJMSReplyTo(message, replyTo == null ? null : ((ActiveMQDestination) replyTo).getSimpleAddress()); } @@ -138,11 +138,11 @@ public class ServerJMSMessage implements Message { if (!sdest.toString().startsWith("jms.")) { - return new HornetQQueue(sdest.toString(), sdest.toString()); + return new ActiveMQQueue(sdest.toString(), sdest.toString()); } else { - return HornetQDestination.fromAddress(sdest.toString()); + return ActiveMQDestination.fromAddress(sdest.toString()); } } } @@ -156,7 +156,7 @@ public class ServerJMSMessage implements Message } else { - message.setAddress(((HornetQDestination) destination).getSimpleAddress()); + message.setAddress(((ActiveMQDestination) destination).getSimpleAddress()); } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java index 8c9ce31..927eb46 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java @@ -384,7 +384,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St getBuffer().resetReaderIndex(); } - // HornetQRAMessage overrides ---------------------------------------- + // ActiveMQRAMessage overrides ---------------------------------------- @Override public void clearBody() throws JMSException http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java index 8db24a2..ff113cf 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java @@ -24,7 +24,7 @@ import static org.apache.activemq.reader.TextMessageUtil.writeBodyText; /** - * HornetQ implementation of a JMS TextMessage. + * ActiveMQ implementation of a JMS TextMessage. * <br> * This class was ported from SpyTextMessage in JBossMQ. * http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/package-info.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/package-info.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/package-info.java index e45c8d7..6c7a919 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/package-info.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/converter/jms/package-info.java @@ -13,7 +13,7 @@ /** - * This package contains incomplete JMS implementations just to be used with converting amqp to hornetq and + * This package contains incomplete JMS implementations just to be used with converting amqp to activemq and * vice versa * @author Clebert Suconic */ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java new file mode 100644 index 0000000..97c3174 --- /dev/null +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java @@ -0,0 +1,125 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.core.protocol.proton.plug; + +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import org.apache.activemq.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.core.protocol.proton.ActiveMQProtonRemotingConnection; +import org.apache.activemq.core.protocol.proton.ProtonProtocolManager; +import org.apache.activemq.core.protocol.proton.sasl.ActiveMQPlainSASL; +import org.apache.activemq.spi.core.remoting.Connection; +import org.apache.activemq.utils.ReusableLatch; +import org.proton.plug.AMQPConnectionCallback; +import org.proton.plug.AMQPConnectionContext; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.ServerSASL; +import org.proton.plug.sasl.AnonymousServerSASL; + +/** + * @author Clebert Suconic + */ + +public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback +{ + private final ProtonProtocolManager manager; + + private final Connection connection; + + protected ActiveMQProtonRemotingConnection protonConnectionDelegate; + + protected AMQPConnectionContext amqpConnection; + + private final ReusableLatch latch = new ReusableLatch(0); + + public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection) + { + this.manager = manager; + this.connection = connection; + } + + @Override + public ServerSASL[] getSASLMechnisms() + { + return new ServerSASL[]{new AnonymousServerSASL(), new ActiveMQPlainSASL(manager.getServer().getSecurityStore(), manager.getServer().getSecurityManager())}; + } + + @Override + public void close() + { + + } + + @Override + public void setConnection(AMQPConnectionContext connection) + { + this.amqpConnection = connection; + } + + @Override + public AMQPConnectionContext getConnection() + { + return amqpConnection; + } + + public ActiveMQProtonRemotingConnection getProtonConnectionDelegate() + { + return protonConnectionDelegate; + } + + public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection protonConnectionDelegate) + { + this.protonConnectionDelegate = protonConnectionDelegate; + } + + public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) + { + final int size = byteBuf.writerIndex(); + + latch.countUp(); + connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() + { + @Override + public void operationComplete(ChannelFuture future) throws Exception + { + latch.countDown(); + } + }); + + if (amqpConnection.isSyncOnFlush()) + { + try + { + latch.await(5, TimeUnit.SECONDS); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + amqpConnection.outputDone(size); + } + + + @Override + public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) + { + return new ProtonSessionIntegrationCallback(this, manager, connection); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/HornetQProtonConnectionCallback.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/HornetQProtonConnectionCallback.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/HornetQProtonConnectionCallback.java deleted file mode 100644 index 185220f..0000000 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/HornetQProtonConnectionCallback.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.apache.activemq.core.protocol.proton.plug; - -import java.util.concurrent.TimeUnit; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import org.apache.activemq.core.buffers.impl.ChannelBufferWrapper; -import org.apache.activemq.core.protocol.proton.HornetQProtonRemotingConnection; -import org.apache.activemq.core.protocol.proton.ProtonProtocolManager; -import org.apache.activemq.core.protocol.proton.sasl.HornetQPlainSASL; -import org.apache.activemq.spi.core.remoting.Connection; -import org.apache.activemq.utils.ReusableLatch; -import org.proton.plug.AMQPConnectionCallback; -import org.proton.plug.AMQPConnectionContext; -import org.proton.plug.AMQPSessionCallback; -import org.proton.plug.ServerSASL; -import org.proton.plug.sasl.AnonymousServerSASL; - -/** - * @author Clebert Suconic - */ - -public class HornetQProtonConnectionCallback implements AMQPConnectionCallback -{ - private final ProtonProtocolManager manager; - - private final Connection connection; - - protected HornetQProtonRemotingConnection protonConnectionDelegate; - - protected AMQPConnectionContext amqpConnection; - - private final ReusableLatch latch = new ReusableLatch(0); - - public HornetQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection) - { - this.manager = manager; - this.connection = connection; - } - - @Override - public ServerSASL[] getSASLMechnisms() - { - return new ServerSASL[]{new AnonymousServerSASL(), new HornetQPlainSASL(manager.getServer().getSecurityStore(), manager.getServer().getSecurityManager())}; - } - - @Override - public void close() - { - - } - - @Override - public void setConnection(AMQPConnectionContext connection) - { - this.amqpConnection = connection; - } - - @Override - public AMQPConnectionContext getConnection() - { - return amqpConnection; - } - - public HornetQProtonRemotingConnection getProtonConnectionDelegate() - { - return protonConnectionDelegate; - } - - public void setProtonConnectionDelegate(HornetQProtonRemotingConnection protonConnectionDelegate) - { - this.protonConnectionDelegate = protonConnectionDelegate; - } - - public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) - { - final int size = byteBuf.writerIndex(); - - latch.countUp(); - connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() - { - @Override - public void operationComplete(ChannelFuture future) throws Exception - { - latch.countDown(); - } - }); - - if (amqpConnection.isSyncOnFlush()) - { - try - { - latch.await(5, TimeUnit.SECONDS); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - - amqpConnection.outputDone(size); - } - - - @Override - public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) - { - return new ProtonSessionIntegrationCallback(this, manager, connection); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 38c105f..5be9a13 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -24,7 +24,7 @@ import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.jms.EncodedMessage; import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.api.core.client.HornetQClient; +import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.core.journal.IOAsyncTask; import org.apache.activemq.core.protocol.proton.ProtonProtocolManager; import org.apache.activemq.core.server.QueueQueryResult; @@ -52,7 +52,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se { protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0); - private final HornetQProtonConnectionCallback protonSPI; + private final ActiveMQProtonConnectionCallback protonSPI; private final ProtonProtocolManager manager; @@ -62,7 +62,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se private AMQPSessionContext protonSession; - public ProtonSessionIntegrationCallback(HornetQProtonConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection) + public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection) { this.protonSPI = protonSPI; this.manager = manager; @@ -72,7 +72,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se @Override public void onFlowConsumer(Object consumer, int credits) { - // We have our own flow control on AMQP, so we set hornetq's flow control to 0 + // We have our own flow control on AMQP, so we set activemq's flow control to 0 ((ServerConsumer) consumer).receiveCredits(-1); } @@ -98,7 +98,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se serverSession = manager.getServer().createSession(name, user, passcode, - HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, + ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection, false, // boolean autoCommitSends false, // boolean autoCommitAcks, http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/sasl/ActiveMQPlainSASL.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/sasl/ActiveMQPlainSASL.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/sasl/ActiveMQPlainSASL.java new file mode 100644 index 0000000..0fafb31 --- /dev/null +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/sasl/ActiveMQPlainSASL.java @@ -0,0 +1,50 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.core.protocol.proton.sasl; + +import org.apache.activemq.core.security.SecurityStore; +import org.apache.activemq.spi.core.security.ActiveMQSecurityManager; +import org.proton.plug.sasl.ServerSASLPlain; + +/** + * @author Clebert Suconic + */ + +public class ActiveMQPlainSASL extends ServerSASLPlain +{ + + private final ActiveMQSecurityManager securityManager; + + private final SecurityStore securityStore; + + + public ActiveMQPlainSASL(SecurityStore securityStore, ActiveMQSecurityManager securityManager) + { + this.securityManager = securityManager; + this.securityStore = securityStore; + } + + @Override + protected boolean authenticate(String user, String password) + { + if (securityStore.isSecurityEnabled()) + { + return securityManager.validateUser(user, password); + } + else + { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/sasl/HornetQPlainSASL.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/sasl/HornetQPlainSASL.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/sasl/HornetQPlainSASL.java deleted file mode 100644 index 389a474..0000000 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/sasl/HornetQPlainSASL.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.apache.activemq.core.protocol.proton.sasl; - -import org.apache.activemq.core.security.SecurityStore; -import org.apache.activemq.spi.core.security.HornetQSecurityManager; -import org.proton.plug.sasl.ServerSASLPlain; - -/** - * @author Clebert Suconic - */ - -public class HornetQPlainSASL extends ServerSASLPlain -{ - - private final HornetQSecurityManager securityManager; - - private final SecurityStore securityStore; - - - public HornetQPlainSASL(SecurityStore securityStore, HornetQSecurityManager securityManager) - { - this.securityManager = securityManager; - this.securityStore = securityStore; - } - - @Override - protected boolean authenticate(String user, String password) - { - if (securityStore.isSecurityEnabled()) - { - return securityManager.validateUser(user, password); - } - else - { - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/resources/META-INF/services/org.apache.activemq.spi.core.protocol.ProtocolManagerFactory ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/resources/META-INF/services/org.apache.activemq.spi.core.protocol.ProtocolManagerFactory b/activemq-protocols/activemq-amqp-protocol/src/main/resources/META-INF/services/org.apache.activemq.spi.core.protocol.ProtocolManagerFactory new file mode 100644 index 0000000..8f035c7 --- /dev/null +++ b/activemq-protocols/activemq-amqp-protocol/src/main/resources/META-INF/services/org.apache.activemq.spi.core.protocol.ProtocolManagerFactory @@ -0,0 +1 @@ +org.apache.activemq.core.protocol.proton.ProtonProtocolManagerFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-amqp-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory b/activemq-protocols/activemq-amqp-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory deleted file mode 100644 index 8f035c7..0000000 --- a/activemq-protocols/activemq-amqp-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory +++ /dev/null @@ -1 +0,0 @@ -org.apache.activemq.core.protocol.proton.ProtonProtocolManagerFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java index 6005b3d..86575b6 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java @@ -93,7 +93,7 @@ import org.apache.activemq.core.protocol.openwire.amq.AMQTransportConnectionStat import org.apache.activemq.core.protocol.openwire.amq.AMQTransportConnectionStateRegister; import org.apache.activemq.core.remoting.CloseListener; import org.apache.activemq.core.remoting.FailureListener; -import org.apache.activemq.core.server.HornetQServerLogger; +import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Connection; @@ -142,7 +142,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor private Throwable stopError = null; - // should come from hornetq server + // should come from activemq server private final TaskRunnerFactory stopTaskRunnerFactory = null; private boolean starting; @@ -204,7 +204,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor } catch (Throwable t) { - HornetQServerLogger.LOGGER.error("decoding error", t); + ActiveMQServerLogger.LOGGER.error("decoding error", t); return; } @@ -314,11 +314,11 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor } catch (IOException e) { - HornetQServerLogger.LOGGER.error("error decoding", e); + ActiveMQServerLogger.LOGGER.error("error decoding", e); } catch (Throwable t) { - HornetQServerLogger.LOGGER.error("error decoding", t); + ActiveMQServerLogger.LOGGER.error("error decoding", t); } } } @@ -444,7 +444,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor @Override public void fail(ActiveMQException me) { - HornetQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), + ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); // Then call the listeners callFailureListeners(me); @@ -543,7 +543,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor // Failure of one listener to execute shouldn't prevent others // from // executing - HornetQServerLogger.LOGGER.errorCallingFailureListener(t); + ActiveMQServerLogger.LOGGER.errorCallingFailureListener(t); } } } @@ -564,7 +564,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor // Failure of one listener to execute shouldn't prevent others // from // executing - HornetQServerLogger.LOGGER.errorCallingFailureListener(t); + ActiveMQServerLogger.LOGGER.errorCallingFailureListener(t); } } } @@ -586,7 +586,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor try { ByteSequence bytes = wireFormat.marshal(command); - ActiveMQBuffer buffer = OpenWireUtil.toHornetQBuffer(bytes); + ActiveMQBuffer buffer = OpenWireUtil.toActiveMQBuffer(bytes); synchronized (sendLock) { getTransportConnection().write(buffer, false, false); @@ -598,7 +598,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor } catch (Throwable t) { - HornetQServerLogger.LOGGER.error("error sending", t); + ActiveMQServerLogger.LOGGER.error("error sending", t); } } @@ -652,7 +652,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor context.setConnection(this); context.setConnectionId(info.getConnectionId()); // for now we pass the manager as the connector and see what happens - // it should be related to hornetq's Acceptor + // it should be related to activemq's Acceptor context.setConnector(this.acceptorUsed); context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); context.setNetworkConnection(networkConnection); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java index 65a8d71..e878763 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java @@ -50,6 +50,7 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.state.ConnectionState; @@ -68,16 +69,15 @@ import org.apache.activemq.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.core.protocol.openwire.amq.AMQTransportConnectionState; import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.core.security.CheckType; -import org.apache.activemq.core.server.HornetQServer; -import org.apache.activemq.core.server.HornetQServerLogger; -import org.apache.activemq.core.server.impl.HornetQServerImpl; +import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.spi.core.protocol.ConnectionEntry; import org.apache.activemq.spi.core.protocol.MessageConverter; import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Connection; -import org.apache.activemq.spi.core.security.HornetQSecurityManager; +import org.apache.activemq.spi.core.security.ActiveMQSecurityManager; public class OpenWireProtocolManager implements ProtocolManager { @@ -85,7 +85,7 @@ public class OpenWireProtocolManager implements ProtocolManager private static final IdGenerator ID_GENERATOR = new IdGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); - private final HornetQServer server; + private final ActiveMQServer server; private OpenWireFormatFactory wireFactory; @@ -114,7 +114,7 @@ public class OpenWireProtocolManager implements ProtocolManager private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<TransactionId, AMQSession>(); - public OpenWireProtocolManager(HornetQServer server) + public OpenWireProtocolManager(ActiveMQServer server) { this.server = server; this.wireFactory = new OpenWireFormatFactory(); @@ -223,7 +223,7 @@ public class OpenWireProtocolManager implements ProtocolManager { public void onError(final int errorCode, final String errorMessage) { - HornetQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, + ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); } @@ -236,9 +236,9 @@ public class OpenWireProtocolManager implements ProtocolManager public boolean send(final OpenWireConnection connection, final Command command) { - if (HornetQServerLogger.LOGGER.isTraceEnabled()) + if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { - HornetQServerLogger.LOGGER.trace("sending " + command); + ActiveMQServerLogger.LOGGER.trace("sending " + command); } synchronized (connection) { @@ -642,7 +642,7 @@ public class OpenWireProtocolManager implements ProtocolManager AMQServerSession fakeSession = new AMQServerSession(user, pass); CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; - ((HornetQServerImpl)server).getSecurityStore().check(qName, checkType, fakeSession); + ((ActiveMQServerImpl)server).getSecurityStore().check(qName, checkType, fakeSession); } this.server.createQueue(qName, qName, null, false, true); if (dest.isTemporary()) @@ -729,7 +729,7 @@ public class OpenWireProtocolManager implements ProtocolManager { boolean validated = true; - HornetQSecurityManager sm = server.getSecurityManager(); + ActiveMQSecurityManager sm = server.getSecurityManager(); if (sm != null && server.getConfiguration().isSecurityEnabled()) { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java index 25c5a46..2582b8d 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java @@ -15,7 +15,7 @@ package org.apache.activemq.core.protocol.openwire; import java.util.List; import org.apache.activemq.api.core.Interceptor; -import org.apache.activemq.core.server.HornetQServer; +import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory; @@ -32,7 +32,7 @@ public class OpenWireProtocolManagerFactory implements ProtocolManagerFactory private static String[] SUPPORTED_PROTOCOLS = {OPENWIRE_PROTOCOL_NAME}; - public ProtocolManager createProtocolManager(final HornetQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) + public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) { return new OpenWireProtocolManager(server); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java index 603c5c6..e55e7b1 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java @@ -22,7 +22,7 @@ import org.apache.activemq.api.core.SimpleString; public class OpenWireUtil { - public static ActiveMQBuffer toHornetQBuffer(ByteSequence bytes) + public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) { ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQConsumer.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQConsumer.java index 9e98de2..a7f3527 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQConsumer.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQConsumer.java @@ -19,7 +19,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; @@ -32,12 +31,12 @@ import org.apache.activemq.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.core.protocol.openwire.OpenWireUtil; import org.apache.activemq.core.server.QueueQueryResult; import org.apache.activemq.core.server.ServerMessage; -import org.apache.activemq.jms.client.HornetQDestination; +import org.apache.activemq.jms.client.ActiveMQDestination; public class AMQConsumer implements BrowserListener { private AMQSession session; - private ActiveMQDestination actualDest; + private org.apache.activemq.command.ActiveMQDestination actualDest; private ConsumerInfo info; private long nativeId = -1; private SimpleString subQueueName = null; @@ -46,7 +45,7 @@ public class AMQConsumer implements BrowserListener private AtomicInteger currentSize; private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<MessageInfo>(); - public AMQConsumer(AMQSession amqSession, ActiveMQDestination d, ConsumerInfo info) + public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, ConsumerInfo info) { this.session = amqSession; this.actualDest = d; @@ -79,8 +78,8 @@ public class AMQConsumer implements BrowserListener if (info.isDurable()) { subQueueName = new SimpleString( - HornetQDestination.createQueueNameForDurableSubscription( - true, info.getClientId(), info.getSubscriptionName())); + ActiveMQDestination.createQueueNameForDurableSubscription( + true, info.getClientId(), info.getSubscriptionName())); QueueQueryResult result = coreSession.executeQueueQuery(subQueueName); if (result.isExists()) http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java index e74032f..a609186 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java @@ -27,7 +27,7 @@ public class AMQProducer public void init() { - //hornetq doesn't have producer at server. + //activemq doesn't have producer at server. } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerConsumer.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerConsumer.java index 5a1bcfc..5c53d02 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerConsumer.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerConsumer.java @@ -18,8 +18,8 @@ import org.apache.activemq.core.filter.Filter; import org.apache.activemq.core.persistence.StorageManager; import org.apache.activemq.core.postoffice.QueueBinding; import org.apache.activemq.core.protocol.openwire.OpenWireMessageConverter; +import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.core.server.HandleStatus; -import org.apache.activemq.core.server.HornetQServerLogger; import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.ServerMessage; import org.apache.activemq.core.server.impl.QueueImpl; @@ -83,7 +83,7 @@ public class AMQServerConsumer extends ServerConsumerImpl } catch (Exception e) { - HornetQServerLogger.LOGGER.errorBrowserHandlingMessage(e, current); + ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, current); return; } } @@ -127,7 +127,7 @@ public class AMQServerConsumer extends ServerConsumerImpl } catch (Exception e) { - HornetQServerLogger.LOGGER.errorBrowserHandlingMessage(e, ref); + ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, ref); break; } }
