http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectionLifeCycleListener.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectionLifeCycleListener.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectionLifeCycleListener.java new file mode 100644 index 0000000..6d14138 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectionLifeCycleListener.java @@ -0,0 +1,56 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.spi.core.remoting; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.core.server.HornetQComponent; + +/** + * A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events. + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public interface ConnectionLifeCycleListener +{ + /** + * This method is used both by client connector creation and server connection creation through + * acceptors. On the client side the {@code component} parameter is normally passed as + * {@code null}. 
+ * <p> + * Leaving this method here and adding a different one at + * {@code ServerConnectionLifeCycleListener} is a compromise for a reasonable split between the + * hornetq-server and hornetq-client packages while avoiding pulling too much into hornetq-core. + * The pivotal point keeping us from removing the method is {@link ConnectorFactory} and the + * usage of it. + * @param component This will probably be an {@code Acceptor} and only used on the server side. + * @param connection the connection that has been created + * @param protocol the messaging protocol type this connection uses + */ + void connectionCreated(HornetQComponent component, Connection connection, String protocol); + + /** + * Called when a connection is destroyed. + * @param connectionID the id of the connection being destroyed. + */ + void connectionDestroyed(Object connectionID); + + + /** + * Called when an error occurs on the connection. + * @param connectionID the id of the connection. + * @param me the exception. + */ + void connectionException(Object connectionID, HornetQException me); + + /** + * Presumably called when the write readiness of a connection changes. + * <p> + * NOTE(review): the conditions that trigger this callback are not visible from this interface; + * confirm against the remoting implementations. + * @param connectionID the id of the connection. + * @param ready the readiness flag; presumably {@code true} means the connection can accept writes. + */ + void connectionReadyForWrites(Object connectionID, boolean ready); +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connector.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connector.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connector.java new file mode 100644 index 0000000..349213b --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connector.java @@ -0,0 +1,61 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.spi.core.remoting; + +import java.util.Map; + +/** + * A Connector is used by the client for creating and controlling a connection. + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author Clebert Suconic + */ +public interface Connector +{ + /** + * starts the connector + */ + void start(); + + /** + * closes the connector + */ + void close(); + + /** + * returns true if the connector is started, otherwise false. + * + * @return true if the connector is started + */ + boolean isStarted(); + + /** + * Create and return a connection from this connector. + * <p> + * This method must NOT throw an exception if it fails to create the connection + * (e.g. 
network is not available), in this case it MUST return null + * + * @return The connection, or null if unable to create a connection (e.g. network is unavailable) + */ + Connection createConnection(); + + /** + * Checks whether the given configuration is equivalent to this connector, which means + * that if the parameter configuration is used to create a connection to a target + * node, it will be the same node as that of the connections made with this connector. + * @param configuration the configuration to compare with this connector + * @return true means the configuration is equivalent to the connector. false otherwise. + */ + boolean isEquivalent(Map<String, Object> configuration); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectorFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectorFactory.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectorFactory.java new file mode 100644 index 0000000..b309a75 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConnectorFactory.java @@ -0,0 +1,66 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.spi.core.remoting; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.activemq6.api.core.TransportConfigurationHelper; + +/** + * A ConnectorFactory is used by the client for creating connectors. + * <p> + * A Connector is used to connect to an {@link org.apache.activemq6.spi.core.remoting.Acceptor}. + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public interface ConnectorFactory extends TransportConfigurationHelper +{ + /** + * creates a new instance of a connector. + * + * @param configuration the configuration + * @param handler the handler + * @param listener the listener + * @param closeExecutor the close executor + * @param threadPool the thread pool + * @param scheduledThreadPool the scheduled thread pool + * @param protocolManager the client protocol manager (NOTE(review): how connectors use it is not visible from this interface; confirm with implementations) + * @return a new connector + */ + Connector createConnector(Map<String, Object> configuration, + BufferHandler handler, + ConnectionLifeCycleListener listener, + Executor closeExecutor, + Executor threadPool, + ScheduledExecutorService scheduledThreadPool, + ClientProtocolManager protocolManager); + + /** + * Returns the allowable properties for this connector. + * <p> + * This will differ between different connector implementations. + * + * @return the allowable properties. + */ + Set<String> getAllowableProperties(); + + /** + * Indicates if connectors from this factory are reliable or not. If a connector is reliable then connection + * monitoring (i.e. pings/pongs) will be disabled. 
+ * + * @return whether or not connectors from this factory are reliable + */ + boolean isReliable(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConsumerContext.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConsumerContext.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConsumerContext.java new file mode 100644 index 0000000..51e4155 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ConsumerContext.java @@ -0,0 +1,22 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.apache.activemq6.spi.core.remoting; + +/** + * An abstract base type that declares no members of its own. + * <p> + * {@link SessionContext} takes instances of it as the consumer identifier in its + * {@code handleReceive*} callbacks. + * + * @author Clebert Suconic + */ + +public abstract class ConsumerContext +{ +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ReadyListener.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ReadyListener.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ReadyListener.java new file mode 100644 index 0000000..7829410 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ReadyListener.java @@ -0,0 +1,26 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.spi.core.remoting; + +/** + * A ReadyListener is a callback interface with a single method, {@link #readyForWriting(boolean)}. + * <p> + * NOTE(review): presumably invoked when the ability to write to a connection changes; + * confirm against the code that registers and fires these listeners. + * + * @author Tim Fox + * + * + */ +public interface ReadyListener +{ + /** + * Callback carrying the readiness flag. + * @param ready presumably {@code true} when writing is possible again (TODO confirm). + */ + void readyForWriting(boolean ready); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/SessionContext.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/SessionContext.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/SessionContext.java new file mode 100644 index 0000000..abc8657 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/SessionContext.java @@ -0,0 +1,268 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.apache.activemq6.spi.core.remoting; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.Xid; +import java.util.HashMap; +import java.util.concurrent.Executor; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientConsumer; +import org.apache.activemq6.api.core.client.ClientSession; +import org.apache.activemq6.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq6.core.client.impl.ClientConsumerInternal; +import org.apache.activemq6.core.client.impl.ClientLargeMessageInternal; +import org.apache.activemq6.core.client.impl.ClientMessageInternal; +import org.apache.activemq6.core.client.impl.ClientProducerCreditsImpl; +import org.apache.activemq6.core.client.impl.ClientSessionInternal; +import org.apache.activemq6.core.message.impl.MessageInternal; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; +import org.apache.activemq6.utils.IDGenerator; +import org.apache.activemq6.utils.SimpleIDGenerator; + +/** + * Abstract base class that holds the {@link RemotingConnection} and the {@link ClientSessionInternal} + * of a client session and declares the operations (queue and consumer management, sending, + * acknowledgement, XA and failover hooks) that concrete subclasses must implement. 
+ * + * @author Clebert Suconic + */ + +public abstract class SessionContext +{ + /** The session that received messages and credits are forwarded to; {@code null} until {@link #setSession} is called. */ + protected ClientSessionInternal session; + + protected SendAcknowledgementHandler sendAckHandler; + + protected volatile RemotingConnection remotingConnection; + + /** Id generator created with the argument {@code 0} (NOTE(review): presumably the starting id; confirm in SimpleIDGenerator). */ + protected final IDGenerator idGenerator = new SimpleIDGenerator(0); + + + /** + * @param remotingConnection the connection stored in this context + */ + public SessionContext(RemotingConnection remotingConnection) + { + this.remotingConnection = remotingConnection; + } + + + public ClientSessionInternal getSession() + { + return session; + } + + public void setSession(ClientSessionInternal session) + { + this.session = session; + } + + /** + * it will either reattach or reconnect, preferably reattaching it. 
+ * + * @param newConnection + * @return true if it was possible to reattach + * @throws HornetQException + */ + public abstract boolean reattachOnNewConnection(RemotingConnection newConnection) throws HornetQException; + + public RemotingConnection getRemotingConnection() + { + return remotingConnection; + } + + + public abstract void closeConsumer(ClientConsumer consumer) throws HornetQException; + + public abstract void sendConsumerCredits(ClientConsumer consumer, int credits); + + public abstract boolean supportsLargeMessage(); + + protected void handleReceiveLargeMessage(ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception + { + ClientSessionInternal session = this.session; + if (session != null) + { + session.handleReceiveLargeMessage(consumerID, clientLargeMessage, largeMessageSize); + } + } + + protected void handleReceiveMessage(ConsumerContext consumerID, final ClientMessageInternal message) throws Exception + { + + ClientSessionInternal session = this.session; + if (session != null) + { + session.handleReceiveMessage(consumerID, message); + } + } + + protected void handleReceiveContinuation(final ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception + { + ClientSessionInternal session = this.session; + if (session != null) + { + session.handleReceiveContinuation(consumerID, chunk, flowControlSize, isContinues); + } + } + + protected void handleReceiveProducerCredits(SimpleString address, int credits) + { + ClientSessionInternal session = this.session; + if (session != null) + { + session.handleReceiveProducerCredits(address, credits); + } + + } + + protected void handleReceiveProducerFailCredits(SimpleString address, int credits) + { + ClientSessionInternal session = this.session; + if (session != null) + { + session.handleReceiveProducerFailCredits(address, credits); + } + + } + + public abstract int getCreditsOnSendingFull(MessageInternal msgI); 
+ + public abstract void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws HornetQException; + + /** + * It should return the number of credits (or bytes) used to send this packet. + * + * @param msgI the message (NOTE(review): presumably the large message whose initial chunk is sent; confirm) + * @return the number of credits (or bytes) used to send this packet + * @throws HornetQException + */ + public abstract int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws HornetQException; + + + public abstract int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws HornetQException; + + + public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler); + + public abstract void createSharedQueue(SimpleString address, + SimpleString queueName, + SimpleString filterString, + boolean durable) throws HornetQException; + + public abstract void deleteQueue(SimpleString queueName) throws HornetQException; + + public abstract void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString, boolean durable, boolean temp) throws HornetQException; + + public abstract ClientSession.QueueQuery queueQuery(SimpleString queueName) throws HornetQException; + + public abstract void forceDelivery(ClientConsumer consumer, long sequence) throws HornetQException; + + public abstract ClientSession.AddressQuery addressQuery(final SimpleString address) throws HornetQException; + + public abstract void simpleCommit() throws HornetQException; + + + /** + * If we are doing a simple rollback on the RA, we need to ack the last message sent to the consumer, + * otherwise DLQ won't work. + * <p> + * This is because we only ACK afterwards on the RA. We may review this if we always acked earlier. 
+ * + * @param lastMessageAsDelivered NOTE(review): presumably whether the last message sent to the consumer is to be acked as delivered, as described above; confirm with implementations + * @throws HornetQException + */ + public abstract void simpleRollback(boolean lastMessageAsDelivered) throws HornetQException; + + public abstract void sessionStart() throws HornetQException; + + public abstract void sessionStop() throws HornetQException; + + public abstract void sendACK(boolean individual, boolean block, final ClientConsumer consumer, final Message message) throws HornetQException; + + public abstract void expireMessage(final ClientConsumer consumer, Message message) throws HornetQException; + + public abstract void sessionClose() throws HornetQException; + + public abstract void addSessionMetadata(String key, String data) throws HornetQException; + + public abstract void addUniqueMetaData(String key, String data) throws HornetQException; + + public abstract void sendProducerCreditsMessage(final int credits, final SimpleString address); + + public abstract void xaCommit(Xid xid, boolean onePhase) throws XAException, HornetQException; + + public abstract void xaEnd(Xid xid, int flags) throws XAException, HornetQException; + + public abstract void xaForget(Xid xid) throws XAException, HornetQException; + + public abstract int xaPrepare(Xid xid) throws XAException, HornetQException; + + public abstract Xid[] xaScan() throws HornetQException; + + public abstract void xaRollback(Xid xid, boolean wasStarted) throws HornetQException, XAException; + + public abstract void xaStart(Xid xid, int flags) throws XAException, HornetQException; + + public abstract boolean configureTransactionTimeout(int seconds) throws HornetQException; + + public abstract ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, int windowSize, int maxRate, int ackBatchSize, boolean browseOnly, + Executor executor, Executor flowControlExecutor) throws 
HornetQException; + + /** + * Performs a round trip to the server requesting what is the current tx timeout on the session + * + * @return the current tx timeout on the session, as reported by the server + */ + public 
abstract int recoverSessionTimeout() throws HornetQException; + + public abstract int getServerVersion(); + + public abstract void recreateSession(final String username, + final String password, + final int minLargeMessageSize, + final boolean xa, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge, + final SimpleString defaultAddress) throws HornetQException; + + + public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws HornetQException; + + public abstract void xaFailed(Xid xid) throws HornetQException; + + public abstract void restartSession() throws HornetQException; + + public abstract void resetMetadata(HashMap<String, String> metaDataToSend); + + + /* Failover utility methods */ + + /** + * Interrupt and return any blocked calls + * + * @param cause the exception (NOTE(review): presumably handed to the interrupted callers; confirm) + */ + public abstract void returnBlocking(HornetQException cause); + + /** + * It will lock the communication channel of the session, preventing anything from coming in while failover is happening. 
+ * It happens on preFailover from ClientSessionImpl + */ + public abstract void lockCommunications(); + + + /** + * NOTE(review): presumably the counterpart of {@link #lockCommunications()}, releasing the + * communication channel again; confirm with implementations. + */ + public abstract void releaseCommunications(); + + public abstract void cleanup(); + + + public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/TopologyResponseHandler.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/TopologyResponseHandler.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/TopologyResponseHandler.java new file mode 100644 index 0000000..05ef1be --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/TopologyResponseHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.spi.core.remoting; + +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; + +/** + * Callback interface for node-disconnected, node-up and node-down notifications. + * + * @author Clebert Suconic + */ + +public interface TopologyResponseHandler +{ + /** + * This is sent when the server is telling the client the node is being disconnected. + */ + void nodeDisconnected(RemotingConnection conn, String nodeID, String scaleDownTargetNodeID); + + /** + * NOTE(review): undocumented in the original; presumably sent when a node on the cluster + * topology comes up, with {@code isLast} marking the last notification of a batch. Confirm with callers. + */ + void notifyNodeUp(long uniqueEventID, + final String backupGroupName, + final String scaleDownGroupName, + final String nodeName, + final Pair<TransportConfiguration, TransportConfiguration> connectorPair, + final boolean isLast); + + /** + * This is sent when any node on the cluster topology is going down. + */ + void notifyNodeDown(final long eventTime, final String nodeID); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/package-info.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/package-info.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/package-info.java new file mode 100644 index 0000000..f6cebeb --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +/** + * Remoting SPI. + * <br> + * This package defines the Service Provider Interface that + * remoting providers must implement to be supported by HornetQ. + */ +package org.apache.activemq6.spi.core.remoting; + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/BufferHelper.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/BufferHelper.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/BufferHelper.java new file mode 100644 index 0000000..fb19e41 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/BufferHelper.java @@ -0,0 +1,193 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.utils; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.SimpleString; + +/** + * Helper methods to read and write from HornetQBuffer. 
+ * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public class BufferHelper +{ + + /** + * Size of a String as if it was a Nullable Simple String: one boolean marker, plus the + * Simple String size when {@code str} is not null. + */ + public static int sizeOfNullableSimpleString(String str) + { + int size = DataConstants.SIZE_BOOLEAN; + if (str != null) + { + size += sizeOfSimpleString(str); + } + return size; + } + + /** + * Size of a String as if it was a Simple String: an int plus two bytes per character. + * + * @param str the string to measure; must not be null + */ + public static int sizeOfSimpleString(String str) + { + final int payload = str.length() * 2; + return DataConstants.SIZE_INT + payload; + } + + /** + * Converts {@code str} with {@code SimpleString.toSimpleString} and writes it as a nullable Simple String. + */ + public static void writeAsNullableSimpleString(HornetQBuffer buffer, String str) + { + final SimpleString converted = SimpleString.toSimpleString(str); + buffer.writeNullableSimpleString(converted); + } + + /** + * Reads a nullable Simple String and returns it as a String, or null when the value read is null. + */ + public static String readNullableSimpleStringAsString(HornetQBuffer buffer) + { + final SimpleString read = buffer.readNullableSimpleString(); + if (read == null) + { + return null; + } + return read.toString(); + } + + /** + * Wraps {@code str} in a new SimpleString and writes it to the buffer. + */ + public static void writeAsSimpleString(HornetQBuffer buffer, String str) + { + final SimpleString converted = new SimpleString(str); + buffer.writeSimpleString(converted); + } + + /** + * Writes a presence marker followed, when {@code value} is not null, by the value itself. 
+ * + * @param buffer the buffer to write to + * @param value the value to write, may be null + */ + public static void writeNullableBoolean(HornetQBuffer buffer, Boolean value) + { + final boolean present = value != null; + buffer.writeBoolean(present); + + if (present) + { + buffer.writeBoolean(value.booleanValue()); + } + } + + /** + * Size of the presence marker plus, when {@code value} is not null, one boolean. + */ + public static int sizeOfNullableBoolean(Boolean value) + { + if (value == null) + { + return DataConstants.SIZE_BOOLEAN; + } + return DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_BOOLEAN; + } + + /** + * Reads the presence marker and, when it is set, the boolean that follows; otherwise returns null. + */ + public static Boolean readNullableBoolean(HornetQBuffer buffer) + { + if (!buffer.readBoolean()) + { + return null; + } + return buffer.readBoolean(); + } + + /** + * Writes a presence marker followed, when {@code value} is not null, by the value itself. + * + * @param buffer the buffer to write to + * @param value the value to write, may be null + */ + public static void writeNullableLong(HornetQBuffer buffer, Long value) + { + final boolean present = value != null; + buffer.writeBoolean(present); + + if (present) + { + buffer.writeLong(value.longValue()); + } + } + + /** + * Writes a presence marker followed, when {@code value} is not null, by the value itself. + * + * @param buffer the buffer to write to + * @param value the value to write, may be null + */ + public static void writeNullableDouble(HornetQBuffer buffer, Double value) + { + final boolean present = value != null; + buffer.writeBoolean(present); + + if (present) + { + buffer.writeDouble(value.doubleValue()); + } + } + + /** + * Size of the presence marker plus, when {@code value} is not null, one long. 
+ */ + public static int sizeOfNullableLong(Long value) + { + if (value == null) + { + return DataConstants.SIZE_BOOLEAN; + } + return DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_LONG; + } + + /** + * Size of the presence marker plus, when {@code value} is not null, one double. + */ + public static int sizeOfNullableDouble(Double value) + { + if (value == null) + { + return DataConstants.SIZE_BOOLEAN; + } + return DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_DOUBLE; + } + + + /** + * Reads the presence marker and, when it is set, the long that follows; otherwise returns null. + */ + public static Long readNullableLong(HornetQBuffer buffer) + { + if (!buffer.readBoolean()) + { + return null; + } + return buffer.readLong(); + } + + /** + * Writes a presence marker followed, when {@code value} is not null, by the value itself. + * + * @param buffer the buffer to write to + * @param value the value to write, may be null + */ + public static void writeNullableInteger(HornetQBuffer buffer, Integer value) + { + final boolean present = value != null; + buffer.writeBoolean(present); + + if (present) + { + buffer.writeInt(value.intValue()); + } + } + + /** + * Size of the presence marker plus, when {@code value} is not null, one int. + */ + public static int sizeOfNullableInteger(Integer value) + { + if (value == null) + { + return DataConstants.SIZE_BOOLEAN; + } + return DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_INT; + } + + /** + * Reads the presence marker and, when it is set, the int that follows; otherwise returns null. + */ + public static Integer readNullableInteger(HornetQBuffer buffer) + { + if (!buffer.readBoolean()) + { + return null; + } + return buffer.readInt(); + } + + /** + * Reads the presence marker and, when it is set, the double that follows; otherwise returns null. 
+ */ + public static Double readNullableDouble(HornetQBuffer buffer) + { + if (!buffer.readBoolean()) + { + return null; + } + return buffer.readDouble(); + } + +} + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfigurationHelper.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfigurationHelper.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfigurationHelper.java new file mode 100644 index 0000000..bf619cf --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfigurationHelper.java @@ -0,0 +1,250 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.utils; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + + +/** + * Static helpers that read typed values out of a {@code Map<String, Object>} of configuration + * properties and that validate the key sets of such maps. + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public class ConfigurationHelper +{ + /** + * Returns the property as a String. + * + * @param propName the key to look up + * @param def the value returned when {@code props} is null or holds no value for the key + * @param props the property map, may be null + * @return the {@code toString()} of the stored value, or {@code def} + */ + public static String getStringProperty(final String propName, final String def, final Map<String, Object> props) + { + if (props == null) + { + return def; + } + + final Object prop = props.get(propName); + + if (prop == null) + { + return def; + } + + /* String.toString() returns the string itself, so one call covers Strings and every other type */ + return prop.toString(); + } + + /** + * Returns the property as an int. 
+ * + * @param propName the key to look up + * @param def the value returned when the property is missing or has an unsupported type + * @param props the property map, may be null + * @return the parsed String value, the {@code intValue()} of a Number, or {@code def} + * @throws NumberFormatException if the stored value is a String that is not a valid int + */ + public static int getIntProperty(final String propName, final int def, final Map<String, Object> props) + { + if (props == null) + { + return def; + } + + final Object prop = props.get(propName); + + if (prop == null) + { + return def; + } + + /* The resource adapter will always send Strings, hence the conversion here */ + if (prop instanceof String) + { + return Integer.parseInt((String)prop); + } + + if (prop instanceof Number) + { + return ((Number)prop).intValue(); + } + + HornetQClientLogger.LOGGER.propertyNotInteger(propName, prop.getClass().getName()); + + return def; + } + + /** + * Returns the property as a long. + * + * @param propName the key to look up + * @param def the value returned when the property is missing or has an unsupported type + * @param props the property map, may be null + * @return the parsed String value, the {@code longValue()} of a Number, or {@code def} + * @throws NumberFormatException if the stored value is a String that is not a valid long + */ + public static long getLongProperty(final String propName, final long def, final Map<String, Object> props) + { + if (props == null) + { + return def; + } + + final Object prop = props.get(propName); + + if (prop == null) + { + return def; + } + + /* The resource adapter will always send Strings, hence the conversion here */ + if (prop instanceof String) + { + return Long.parseLong((String)prop); + } + + if (prop instanceof Number) + { + return ((Number)prop).longValue(); + } + + HornetQClientLogger.LOGGER.propertyNotLong(propName, prop.getClass().getName()); + + return def; + } + + /** + * Returns the property as a boolean. 
+ * + * @param propName the key to look up + * @param def the value returned when the property is missing or has an unsupported type + * @param props the property map, may be null + * @return the parsed String value, the stored Boolean, or {@code def} + */ + public static boolean getBooleanProperty(final String propName, final boolean def, final Map<String, Object> props) + { + if (props == null) + { + return def; + } + + final Object prop = props.get(propName); + + if (prop == null) + { + return def; + } + + /* The resource adapter will always send Strings, hence the conversion here */ + if (prop instanceof String) + { + return Boolean.parseBoolean((String)prop); + } + + if (prop instanceof Boolean) + { + return (Boolean)prop; + } + + HornetQClientLogger.LOGGER.propertyNotBoolean(propName, prop.getClass().getName()); + + return def; + } + + /** + * Returns the keys that are not allowed. + * + * @param allowableKeys the set of permitted keys + * @param keys the keys to check + * @return a new set holding every element of {@code keys} that is not in {@code allowableKeys} + */ + public static Set<String> checkKeys(final Set<String> allowableKeys, final Set<String> keys) + { + final Set<String> invalid = new HashSet<String>(keys); + invalid.removeAll(allowableKeys); + return invalid; + } + + /** + * Returns the required keys that are missing. + * + * @param requiredKeys the keys that must be present + * @param keys the keys actually supplied + * @return a new set holding every element of {@code requiredKeys} that is not in {@code keys} + */ + public static Set<String> checkKeysExist(final Set<String> requiredKeys, final Set<String> keys) + { + final Set<String> invalid = new HashSet<String>(requiredKeys); + invalid.removeAll(keys); + return invalid; + } + + /** + * Joins the elements of the set with {@code ", "} in the set's iteration order. 
+ * + * @param invalid the set to render + * @return the joined String; empty when the set is empty + */ + public static String stringSetToCommaListString(final Set<String> invalid) + { + final StringBuilder sb = new StringBuilder(); + String separator = ""; + for (String key : invalid) + { + sb.append(separator).append(key); + separator = ", "; + } + return sb.toString(); + } + + public static String getPasswordProperty(final String propName, + final String def, final Map<String, Object> props, + String defaultMaskPassword, String defaultPasswordCodec) + { + if (props == null) + { + return def; + } + + Object prop = props.get(propName); + + if (prop == null) + { + return def; + } + + String value = prop.toString(); + Boolean useMask = (Boolean) props.get(defaultMaskPassword); + if (useMask == null || (!useMask)) + { + return value; + } 
+ + final String classImpl = (String) props.get(defaultPasswordCodec); + + if (classImpl == null) + { + throw HornetQClientMessageBundle.BUNDLE.noCodec(); + } + + SensitiveDataCodec<String> codec = null; + try + { + codec = PasswordMaskingUtil.getCodec(classImpl); + } + catch (HornetQException e1) + { + throw HornetQClientMessageBundle.BUNDLE.failedToGetDecoder(e1); + } + + try + { + return codec.decode(value); + } + catch (Exception e) + { + throw HornetQClientMessageBundle.BUNDLE.errordecodingPassword(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfirmationWindowWarning.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfirmationWindowWarning.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfirmationWindowWarning.java new file mode 100644 index 0000000..85d7034 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/ConfirmationWindowWarning.java @@ -0,0 +1,36 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
/**
 * Immutable holder pairing a fixed {@code disabled} flag with a shared, mutable
 * {@code warningIssued} flag that starts out {@code false}.
 * <p>
 * NOTE(review): judging by the names, this lets several users agree on whether a
 * confirmation-window warning has already been emitted; confirm against the callers.
 * <p>
 * TODO: get rid of this
 */
public final class ConfirmationWindowWarning
{
   /** Value supplied at construction; never changes afterwards. */
   public final boolean disabled;

   /** Initially {@code false}; the reference is fixed, the contained value is mutable. */
   public final AtomicBoolean warningIssued = new AtomicBoolean();

   /**
    * @param disabled the value to expose through {@link #disabled}
    */
   public ConfirmationWindowWarning(boolean disabled)
   {
      this.disabled = disabled;
   }
}
/**
 * A DeflaterReader
 * <p>
 * Wraps an input stream and hands out its content in compressed (deflate) form.
 * Not for concurrent use.
 *
 * @author Howard Gao
 */
public class DeflaterReader extends InputStream
{
   private final Deflater deflater = new Deflater();

   /** Set once the source is exhausted and {@code deflater.finish()} has been called. */
   private boolean isFinished = false;

   /** Set once the deflater has produced its last byte, or the reader was closed. */
   private boolean compressDone = false;

   private final InputStream input;

   /** Running count of uncompressed bytes pulled from {@link #input}; never {@code null}. */
   private final AtomicLong bytesRead;

   /**
    * @param inData the uncompressed source
    * @param bytesRead counter that is incremented by the number of uncompressed bytes consumed;
    *           may be {@code null}, in which case a private counter is used so that
    *           {@link #getTotalSize()} still works
    */
   public DeflaterReader(final InputStream inData, final AtomicLong bytesRead)
   {
      input = inData;
      this.bytesRead = bytesRead != null ? bytesRead : new AtomicLong();
   }

   /**
    * Reads a single compressed byte.
    *
    * @return the byte as an unsigned value, or {@code -1} when no more compressed data exists
    * @throws IOException if the source stream fails
    */
   @Override
   public int read() throws IOException
   {
      byte[] buffer = new byte[1];
      int n = read(buffer, 0, 1);
      if (n == 1)
      {
         return buffer[0] & 0xFF;
      }
      if (n == -1 || n == 0)
      {
         return -1;
      }
      throw new IOException("Error reading data, invalid n: " + n);
   }

   /**
    * Try to fill the buffer with compressed bytes. Except the last effective read,
    * this method always returns with a full buffer of compressed data.
    *
    * @param buffer the buffer to fill compressed bytes
    * @param offset index in {@code buffer} of the first byte to write
    * @param len maximum number of bytes to write
    * @return the number of bytes really filled, -1 indicates end.
    * @throws IOException if the source stream fails
    */
   @Override
   public int read(final byte[] buffer, int offset, int len) throws IOException
   {
      if (compressDone)
      {
         return -1;
      }

      //buffer for reading input stream
      byte[] readBuffer = new byte[2 * len];

      int read = 0;

      while (len > 0)
      {
         int n = deflater.deflate(buffer, offset, len);
         if (n != 0)
         {
            read += n;
            offset += n;
            len -= n;
            continue;
         }

         // No output produced: either we are done, or the deflater wants more input.
         if (isFinished)
         {
            deflater.end();
            compressDone = true;
            break;
         }

         if (deflater.needsInput())
         {
            // read some data from inputstream
            int m = input.read(readBuffer);

            if (m == -1)
            {
               deflater.finish();
               isFinished = true;
            }
            else
            {
               bytesRead.addAndGet(m);
               deflater.setInput(readBuffer, 0, m);
            }
         }
         else
         {
            deflater.finish();
            isFinished = true;
         }
      }
      return read;
   }

   /**
    * Closes the wrapped input stream and releases the deflater's native resources, even when
    * the stream is abandoned before compression has completed. Subsequent reads return
    * {@code -1}.
    *
    * @throws IOException if closing the wrapped stream fails
    */
   public void closeStream() throws IOException
   {
      if (!compressDone)
      {
         // read() already ended the deflater if compression ran to completion.
         deflater.end();
         compressDone = true;
      }
      super.close();
      input.close();
   }

   /**
    * @return the number of uncompressed bytes consumed so far, as tracked by the counter given
    *         to the constructor (or by the private counter when none was given)
    */
   public long getTotalSize()
   {
      return bytesRead.get();
   }

}
/**
 * A source of {@link Executor} instances.
 * <p>
 * Whether every call yields a new executor or a shared one is up to the implementation.
 *
 * @author Tim Fox
 */
public interface ExecutorFactory
{
   /**
    * @return an executor supplied by this factory
    */
   Executor getExecutor();
}
/**
 * A {@link Runnable} whose {@link #run()} counts down an internal {@link CountDownLatch}, so
 * that another thread can {@link #await(long) wait} until it has been run the required number
 * of times.
 *
 * @author Tim Fox
 */
public class FutureLatch implements Runnable
{
   private final CountDownLatch latch;

   /** Creates a latch that is released by a single {@link #run()}. */
   public FutureLatch()
   {
      this(1);
   }

   /**
    * @param latches how many times {@link #run()} must be invoked before waiters are released
    */
   public FutureLatch(int latches)
   {
      super();
      latch = new CountDownLatch(latches);
   }

   /**
    * Waits for the latch to reach zero.
    *
    * @param timeout maximum time to wait, in milliseconds
    * @return {@code true} if the latch reached zero in time; {@code false} on timeout or if the
    *         waiting thread was interrupted, in which case the interrupt status is restored so
    *         callers further up the stack can still observe it
    */
   public boolean await(final long timeout)
   {
      try
      {
         return latch.await(timeout, TimeUnit.MILLISECONDS);
      }
      catch (InterruptedException e)
      {
         // Do not lose the interruption: re-assert the flag before reporting failure.
         Thread.currentThread().interrupt();
         return false;
      }
   }

   /** Counts the latch down by one. */
   @Override
   public void run()
   {
      latch.countDown();
   }

   @Override
   public String toString()
   {
      return FutureLatch.class.getSimpleName() + "(latch=" + latch + ")";
   }
}
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.utils; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.activemq6.api.core.HornetQBuffer; + +/** + * Used to send large messages + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public class HornetQBufferInputStream extends InputStream +{ + + /* (non-Javadoc) + * @see java.io.InputStream#read() + */ + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + private HornetQBuffer bb; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + public HornetQBufferInputStream(final HornetQBuffer paramByteBuffer) + { + bb = paramByteBuffer; + } + + @Override + public int read() throws IOException + { + if (bb == null) + { + throw new IOException("read on a closed InputStream"); + } + + if (remainingBytes() == 0) + { + return -1; + } + else + { + return bb.readByte() & 0xFF; + } + } + + @Override + public int read(final byte[] byteArray) throws IOException + { + if (bb == null) + { + throw new IOException("read on a closed InputStream"); + } + + return read(byteArray, 0, byteArray.length); + } + + @Override + public int read(final byte[] byteArray, final int off, final int len) throws IOException + { + if (bb == null) + { + throw new IOException("read on a closed InputStream"); + } + + if (byteArray == null) + { + throw new NullPointerException(); + } + if (off < 0 || off > byteArray.length || len < 0 || off + len > byteArray.length || off + len < 0) + { + throw new IndexOutOfBoundsException(); + } + if (len == 0) + { + return 0; + } + + int size = Math.min(remainingBytes(), len); + + if (size == 0) + { + return -1; + } + + 
bb.readBytes(byteArray, off, size); + return size; + } + + @Override + public long skip(final long len) throws IOException + { + if (bb == null) + { + throw new IOException("skip on a closed InputStream"); + } + + if (len <= 0L) + { + return 0L; + } + + int size = Math.min(remainingBytes(), (int) len); + + bb.skipBytes(size); + + return size; + } + + @Override + public int available() throws IOException + { + if (bb == null) + { + throw new IOException("available on a closed InputStream"); + } + + return remainingBytes(); + } + + @Override + public void close() throws IOException + { + bb = null; + } + + @Override + public synchronized void mark(final int paramInt) + { + } + + @Override + public synchronized void reset() throws IOException + { + throw new IOException("mark/reset not supported"); + } + + @Override + public boolean markSupported() + { + return false; + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + /** + * @return + */ + private int remainingBytes() + { + return bb.writerIndex() - bb.readerIndex(); + } + + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/IDGenerator.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/IDGenerator.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/IDGenerator.java new file mode 100644 index 0000000..0d31ff7 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/IDGenerator.java @@ -0,0 +1,31 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
/**
 * Generator of record IDs for the journals.
 * <p>
 * Although the bindings journal and the messages journal are independent of each other, both
 * draw their IDs from one shared {@link IDGenerator} instance.
 * <p>
 * On a normal shutdown the next record ID is expected to be persisted in the journals, so a
 * missing {@literal next-recordID} record indicates a server crash. In that case, on restart,
 * the last recorded ID plus {@code MAX_INT} is used instead.
 *
 * @author Tim Fox
 */
public interface IDGenerator
{
   /**
    * @return a newly generated ID
    */
   long generateID();

   /**
    * @return the generator's current ID (NOTE(review): presumably the most recently generated
    *         one; confirm against the implementations)
    */
   long getCurrentID();
}
/**
 * An InflaterReader
 * <p>
 * Takes a compressed (deflate) input stream and decompresses it as it is being read.
 * Not for concurrent use.
 *
 * @author Howard Gao
 */
public class InflaterReader extends InputStream
{
   private final Inflater inflater = new Inflater();

   private final InputStream input;

   /** Decompressed bytes waiting to be handed out by {@link #read()}. */
   private final byte[] readBuffer;

   /** Index of the next byte to return from {@link #readBuffer}; -1 when it must be refilled. */
   private int pointer;

   /** Number of valid bytes currently held in {@link #readBuffer}. */
   private int length;

   private boolean closed;

   /**
    * Creates a reader with a 1024 byte buffer.
    *
    * @param input the compressed source
    */
   public InflaterReader(InputStream input)
   {
      this(input, 1024);
   }

   /**
    * @param input the compressed source
    * @param bufferSize size of the internal decompression buffer; must be positive, since a
    *           zero-length buffer could never hold any output and would make the stream look
    *           empty
    * @throws IllegalArgumentException if {@code bufferSize} is not positive
    */
   public InflaterReader(InputStream input, int bufferSize)
   {
      if (bufferSize <= 0)
      {
         throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
      }
      this.input = input;
      this.readBuffer = new byte[bufferSize];
      this.pointer = -1;
   }

   /**
    * @return the next decompressed byte as an unsigned value, or {@code -1} at end of data
    * @throws IOException if the source fails, the compressed data is malformed or truncated
    *            (the {@link DataFormatException} is kept as the cause), or the reader is closed
    */
   @Override
   public int read() throws IOException
   {
      if (closed)
      {
         throw new IOException("Stream closed");
      }

      if (pointer == -1)
      {
         try
         {
            length = doRead(readBuffer, 0, readBuffer.length);
         }
         catch (DataFormatException e)
         {
            throw new IOException(e.getMessage(), e);
         }
         if (length == 0)
         {
            return -1;
         }
         pointer = 0;
      }

      int value = readBuffer[pointer] & 0xFF;
      pointer++;
      if (pointer == length)
      {
         // Buffer drained: force a refill on the next call.
         pointer = -1;
      }

      return value;
   }

   /**
    * Releases the inflater's native resources. The wrapped input stream is deliberately left
    * open, as it was before this method existed; callers that own it must close it themselves.
    */
   @Override
   public void close() throws IOException
   {
      if (!closed)
      {
         closed = true;
         inflater.end();
      }
   }

   /*
    * feed inflater more bytes in order to get some
    * decompressed output.
    * returns number of bytes actually got
    */
   private int doRead(byte[] buf, int offset, int len) throws DataFormatException, IOException
   {
      int read = 0;
      byte[] inputBuffer = new byte[len];

      while (len > 0)
      {
         int n = inflater.inflate(buf, offset, len);
         if (n > 0)
         {
            read += n;
            offset += n;
            len -= n;
            continue;
         }

         if (inflater.finished())
         {
            break;
         }
         if (!inflater.needsInput())
         {
            //it shouldn't be here, throw
            throw new DataFormatException("Inflater is neither finished nor needing input.");
         }

         //feeding
         int m = input.read(inputBuffer);
         if (m == -1)
         {
            //it shouldn't be here, throw exception
            throw new DataFormatException("Input is over while inflater still expecting data");
         }
         //feed the data in; the next loop iteration inflates it
         inflater.setInput(inputBuffer, 0, m);
      }
      return read;
   }

}
/**
 * A InflaterWriter
 * <p>
 * This class takes an OutputStream. Compressed bytes
 * can directly be written into this class. The class will
 * decompress the bytes and write them to the output stream.
 * <p>
 * Not for concurrent use.
 *
 * @author Howard Gao
 */
public class InflaterWriter extends OutputStream
{
   private final Inflater inflater = new Inflater();

   private final OutputStream output;

   /** Compressed bytes collected until the buffer is full or the writer is closed. */
   private final byte[] writeBuffer = new byte[1024];

   private int writePointer = 0;

   private final byte[] outputBuffer = new byte[writeBuffer.length * 2];

   private boolean closed;

   /**
    * @param output the stream that receives the decompressed bytes
    */
   public InflaterWriter(final OutputStream output)
   {
      this.output = output;
   }

   /**
    * Write a compressed byte. Bytes are buffered and only decompressed once the internal
    * buffer is full or the writer is closed.
    *
    * @throws IOException if the writer is closed, the target stream fails, or the compressed
    *            data is malformed
    */
   @Override
   public void write(final int b) throws IOException
   {
      if (closed)
      {
         throw new IOException("Stream closed");
      }

      writeBuffer[writePointer] = (byte)(b & 0xFF);
      writePointer++;

      if (writePointer == writeBuffer.length)
      {
         writePointer = 0;
         try
         {
            doWrite(writeBuffer.length);
         }
         catch (DataFormatException e)
         {
            throw new IOException("Error decompressing data", e);
         }
      }
   }

   /**
    * Decompresses whatever is still buffered, then releases the inflater and closes the target
    * stream. The target is always closed, including when nothing is buffered or when
    * decompression fails. Calling this method again has no effect.
    *
    * @throws IOException if the target stream fails or the compressed data is malformed
    */
   @Override
   public void close() throws IOException
   {
      if (closed)
      {
         return;
      }
      closed = true;

      try
      {
         if (writePointer > 0)
         {
            final int pending = writePointer;
            writePointer = 0;
            doWrite(pending);
         }
      }
      catch (DataFormatException e)
      {
         throw new IOException(e.getMessage(), e);
      }
      finally
      {
         try
         {
            inflater.end();
         }
         finally
         {
            output.close();
         }
      }
   }

   /**
    * Feeds the first {@code count} bytes of the write buffer to the inflater and forwards all
    * output it produces to the target stream.
    */
   private void doWrite(final int count) throws DataFormatException, IOException
   {
      inflater.setInput(writeBuffer, 0, count);
      int n = inflater.inflate(outputBuffer);

      while (n > 0)
      {
         output.write(outputBuffer, 0, n);
         n = inflater.inflate(outputBuffer);
      }
   }

}
/**
 * Helpers for building, rebinding into and tearing down JNDI context trees.
 *
 * @author Ovidiu Feodorov
 */
public class JNDIUtil
{
   /**
    * Create a context path recursively.
    * <p>
    * Walks {@code path} one {@code '/'}-separated component at a time, reusing sub-contexts
    * that already exist and creating the ones that do not.
    *
    * @param c the context to start from
    * @param path the {@code '/'}-separated path; empty components are ignored
    * @return the context for the last component, or {@code c} itself for an empty path
    * @throws NamingException if a component is already bound to something that is not a
    *            {@link Context}, or if the underlying lookup/creation fails
    */
   public static Context createContext(final Context c, final String path) throws NamingException
   {
      Context crtContext = c;
      final StringTokenizer st = new StringTokenizer(path, "/");

      while (st.hasMoreTokens())
      {
         final String tok = st.nextToken();
         Context next;

         try
         {
            final Object o = crtContext.lookup(tok);
            if (!(o instanceof Context))
            {
               throw new NamingException("Path " + path + " overwrites an already bound object");
            }
            next = (Context)o;
         }
         catch (NameNotFoundException e)
         {
            // Not there yet: create it.
            next = crtContext.createSubcontext(tok);
         }
         crtContext = next;
      }
      return crtContext;
   }

   /**
    * Unbinds everything listed under {@code c}, descending into nested contexts first. The
    * binding enumeration is always closed, including when an unbind fails part-way through.
    *
    * @param c the context to empty
    * @throws Exception if listing or unbinding fails
    */
   public static void tearDownRecursively(final Context c) throws Exception
   {
      final NamingEnumeration<Binding> ne = c.listBindings("");
      try
      {
         while (ne.hasMore())
         {
            final Binding b = ne.next();
            final Object object = b.getObject();
            if (object instanceof Context)
            {
               JNDIUtil.tearDownRecursively((Context)object);
            }
            c.unbind(b.getName());
         }
      }
      finally
      {
         ne.close();
      }
   }

   /**
    * Context.rebind() requires that all intermediate contexts and the target context (that named by
    * all but terminal atomic component of the name) must already exist, otherwise
    * NameNotFoundException is thrown. This method behaves similar to Context.rebind(), but creates
    * intermediate contexts, if necessary.
    * <p>
    * If {@code rebind} fails for any reason, a plain {@code bind} is attempted instead, and it is
    * that second attempt's outcome the caller sees.
    *
    * @param c the context to start from
    * @param jndiName the full, {@code '/'}-separated name to bind
    * @param o the object to bind
    * @throws NamingException if the intermediate contexts cannot be created or the fallback
    *            {@code bind} fails
    */
   public static void rebind(final Context c, final String jndiName, final Object o) throws NamingException
   {
      final int idx = jndiName.lastIndexOf('/');
      final Context context = idx == -1 ? c : JNDIUtil.createContext(c, jndiName.substring(0, idx));
      final String name = idx == -1 ? jndiName : jndiName.substring(idx + 1);

      try
      {
         context.rebind(name, o);
      }
      catch (Exception ignored)
      {
         // Deliberate best effort: fall back to bind() when rebind() fails.
         context.bind(name, o);
      }
   }
}
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.utils; + +/** + * A LinkedList + * + * @author Tim Fox + * + * + */ +public interface LinkedList<E> +{ + void addHead(E e); + + void addTail(E e); + + E poll(); + + LinkedListIterator<E> iterator(); + + void clear(); + + int size(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListImpl.java new file mode 100644 index 0000000..382d471 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListImpl.java @@ -0,0 +1,456 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.utils; + +import java.lang.reflect.Array; +import java.util.NoSuchElementException; + + +/** + * A linked list implementation which allows multiple iterators to exist at the same time on the queue, and which see any + * elements added or removed from the queue either directly or via iterators. + * + * This class is not thread safe. + * + * @author Tim Fox + * + * + */ +public class LinkedListImpl<E> implements LinkedList<E> +{ + private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10; + + private final Node<E> head = new Node<E>(null); + + private Node<E> tail = null; + + private int size; + + // We store in an array rather than a Map for the best performance + private volatile Iterator[] iters; + + private int numIters; + + private int nextIndex; + + public LinkedListImpl() + { + iters = createIteratorArray(INITIAL_ITERATOR_ARRAY_SIZE); + } + + public void addHead(E e) + { + Node<E> node = new Node<E>(e); + + node.next = head.next; + + node.prev = head; + + head.next = node; + + if (size == 0) + { + tail = node; + } + else + { + // Need to set the previous element on the former head + node.next.prev = node; + } + + size++; + } + + public void addTail(E e) + { + if (size == 0) + { + addHead(e); + } + else + { + Node<E> node = new Node<E>(e); + + node.prev = tail; + + tail.next = node; + + tail = node; + + size++; + } + } + + public E poll() + { + Node<E> ret = head.next; + + if (ret != null) + { + removeAfter(head); + + return ret.val; + } + else + { + return null; + } + } + + public void clear() + { + tail = head.next = null; + + size = 0; + } + + public int size() + { + return size; + } + + public LinkedListIterator<E> iterator() + { + return new Iterator(); + } + + public String toString() + { + StringBuilder str = new StringBuilder("LinkedListImpl [ "); + + Node<E> node = head; + + while (node != null) + { + 
str.append(node.toString()); + + if (node.next != null) + { + str.append(", "); + } + + node = node.next; + } + + return str.toString(); + } + + public int numIters() + { + return numIters; + } + + private Iterator[] createIteratorArray(int size) + { + return (Iterator[])Array.newInstance(Iterator.class, size); + } + + private void removeAfter(Node<E> node) + { + Node<E> toRemove = node.next; + + node.next = toRemove.next; + + if (toRemove.next != null) + { + toRemove.next.prev = node; + } + + if (toRemove == tail) + { + tail = node; + } + + size--; + + if (toRemove.iterCount != 0) + { + LinkedListImpl.this.nudgeIterators(toRemove); + } + + //Help GC - otherwise GC potentially has to traverse a very long list to see if elements are reachable, this can result in OOM + //https://jira.jboss.org/browse/HORNETQ-469 + toRemove.next = toRemove.prev = null; + } + + private synchronized void nudgeIterators(Node<E> node) + { + for (int i = 0; i < numIters; i++) + { + Iterator iter = iters[i]; + if (iter != null) + { + iter.nudged(node); + } + } + } + + private synchronized void addIter(Iterator iter) + { + if (numIters == iters.length) + { + resize(2 * numIters); + } + + iters[nextIndex++] = iter; + + numIters++; + } + + private synchronized void resize(int newSize) + { + Iterator[] newIters = createIteratorArray(newSize); + + System.arraycopy(iters, 0, newIters, 0, numIters); + + iters = newIters; + } + + private synchronized void removeIter(Iterator iter) + { + for (int i = 0; i < numIters; i++) + { + if (iter == iters[i]) + { + iters[i] = null; + + if (i != numIters - 1) + { + // Fill in the hole + + System.arraycopy(iters, i + 1, iters, i, numIters - i - 1); + } + + numIters--; + + if (numIters >= INITIAL_ITERATOR_ARRAY_SIZE && numIters == iters.length / 2) + { + resize(numIters); + } + + nextIndex--; + + return; + } + } + + throw new IllegalStateException("Cannot find iter to remove"); + } + + private static final class Node<E> + { + Node<E> next; + + Node<E> prev; + + 
final E val; + + int iterCount; + + Node(E e) + { + val = e; + } + + public String toString() + { + return "Node, value = " + val; + } + } + + private class Iterator implements LinkedListIterator<E> + { + Node<E> last; + + Node<E> current = head.next; + + boolean repeat; + + Iterator() + { + if (current != null) + { + current.iterCount++; + } + + addIter(this); + } + + public void repeat() + { + repeat = true; + } + + public boolean hasNext() + { + Node<E> e = getNode(); + + if (e != null && (e != last || repeat)) + { + return true; + } + + return canAdvance(); + } + + public E next() + { + Node<E> e = getNode(); + + if (repeat) + { + repeat = false; + + if (e != null) + { + return e.val; + } + else + { + if (canAdvance()) + { + advance(); + + e = getNode(); + + return e.val; + } + else + { + throw new NoSuchElementException(); + } + } + } + + if (e == null || e == last) + { + if (canAdvance()) + { + advance(); + + e = getNode(); + } + else + { + throw new NoSuchElementException(); + } + } + + last = e; + + repeat = false; + + return e.val; + } + + public void remove() + { + if (last == null) + { + throw new NoSuchElementException(); + } + + if (current == null) + { + throw new NoSuchElementException(); + } + + LinkedListImpl.this.removeAfter(current.prev); + + last = null; + } + + public void close() + { + removeIter(this); + } + + public void nudged(Node<E> node) + { + if (current == node) + { + if (canAdvance()) + { + advance(); + } + else + { + if (current.prev != head) + { + current.iterCount--; + + current = current.prev; + + current.iterCount++; + } + else + { + current = null; + } + } + } + } + + private Node<E> getNode() + { + if (current == null) + { + current = head.next; + + if (current != null) + { + current.iterCount++; + } + } + + if (current != null) + { + return current; + } + else + { + return null; + } + } + + private boolean canAdvance() + { + if (current == null) + { + current = head.next; + + if (current != null) + { + current.iterCount++; + } 
+ } + + return current != null && current.next != null; + } + + private void advance() + { + if (current == null || current.next == null) + { + throw new NoSuchElementException(); + } + + current.iterCount--; + + current = current.next; + + current.iterCount++; + } + + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListIterator.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListIterator.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListIterator.java new file mode 100644 index 0000000..8100a78 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/LinkedListIterator.java @@ -0,0 +1,32 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.utils; + +import java.util.Iterator; + + +/** + * A LinkedListIterator + * + * This iterator allows the last element to be repeated in the next call to hasNext or next + * + * @author Tim Fox + * + * + */ +public interface LinkedListIterator<E> extends Iterator<E> +{ + void repeat(); + + void close(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/utils/MemorySize.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/utils/MemorySize.java b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/MemorySize.java new file mode 100644 index 0000000..b9633b6 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/utils/MemorySize.java @@ -0,0 +1,140 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.utils; + +import org.apache.activemq6.core.client.HornetQClientLogger; + +import java.lang.ref.WeakReference; + + +/** + * A MemorySize + * + * @author Clebert Suconic + * @author Tim Fox + * + * + */ +public class MemorySize +{ + private static final int numberOfObjects = 10000; + + private static Object newObject(final ObjectFactory factory) throws Exception + { + return factory.createObject(); + } + + public static boolean is64bitArch() + { + boolean is64bit = true; // Default to 64 e.g. if can't retrieve property + + try + { + String arch = System.getProperty("os.arch"); + + if (arch != null) + { + is64bit = arch.contains("64"); + } + } + catch (Exception e) + { + // Ignore + } + + return is64bit; + } + + public interface ObjectFactory + { + Object createObject(); + } + + public static int calculateSize(final ObjectFactory factory) throws Exception + { + final Runtime runtime = Runtime.getRuntime(); + + MemorySize.getMemorySize(runtime); + + MemorySize.newObject(factory); + + int i = 0; + long heap1 = 0; + long heap2 = 0; + long totalMemory1 = 0; + long totalMemory2 = 0; + + // First we do a dry run with twice as many then throw away the results + + Object[] obj = new Object[MemorySize.numberOfObjects * 2]; + + for (i = 0; i < MemorySize.numberOfObjects * 2; i++) + { + obj[i] = MemorySize.newObject(factory); + } + + obj = new Object[MemorySize.numberOfObjects * 2]; + + heap1 = MemorySize.getMemorySize(runtime); + + totalMemory1 = runtime.totalMemory(); + + for (i = 0; i < MemorySize.numberOfObjects; i++) + { + obj[i] = MemorySize.newObject(factory); + } + + heap2 = MemorySize.getMemorySize(runtime); + + totalMemory2 = runtime.totalMemory(); + + final int size = Math.round((float)(heap2 - heap1) / MemorySize.numberOfObjects); + + if (totalMemory1 != totalMemory2) + { + // throw new IllegalStateException("Warning: JVM allocated more data what would make results invalid " + + // totalMemory1 + ":" + totalMemory2); + + 
HornetQClientLogger.LOGGER.jvmAllocatedMoreMemory(totalMemory1, totalMemory2); + } + + return size; + } + + private static long getMemorySize(final Runtime runtime) + { + for (int i = 0; i < 5; i++) + { + MemorySize.forceGC(); + } + return runtime.totalMemory() - runtime.freeMemory(); + } + + private static void forceGC() + { + WeakReference<Object> dumbReference = new WeakReference<Object>(new Object()); + // A loop that will wait GC, using the minimal time as possible + while (dumbReference.get() != null) + { + System.gc(); + try + { + Thread.sleep(500); + } + catch (InterruptedException e) + { + } + } + } + +}
