Removing more dead code in the OpenWire implementation
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/04ca86c3 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/04ca86c3 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/04ca86c3 Branch: refs/heads/master Commit: 04ca86c3dd97f2845be5005cd81b9acdbfeda9a4 Parents: 3fbf75b Author: Clebert Suconic <[email protected]> Authored: Fri Aug 28 14:11:11 2015 -0400 Committer: Clebert Suconic <[email protected]> Committed: Fri Aug 28 15:05:41 2015 -0400 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 2 - .../amq/AMQAbstractDeadLetterStrategy.java | 96 --------- .../openwire/amq/AMQConnectionContext.java | 30 --- .../openwire/amq/AMQDeadLetterStrategy.java | 69 ------ .../amq/AMQSharedDeadLetterStrategy.java | 51 ----- .../protocol/openwire/amq/AMQTransaction.java | 209 ------------------- 6 files changed, 457 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/04ca86c3/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 35861a9..a489a93 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -48,7 +48,6 @@ import 
org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthoriz import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.security.SecurityAuth; @@ -565,7 +564,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S context.setConnector(this.acceptorUsed); context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); context.setFaultTolerant(faultTolerantConnection); - context.setTransactions(new ConcurrentHashMap<TransactionId, AMQTransaction>()); context.setUserName(info.getUserName()); context.setWireFormatInfo(wireFormatInfo); context.setReconnect(info.isFailoverReconnect()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/04ca86c3/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java deleted file mode 100644 index bcb2eb2..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import org.apache.activemq.ActiveMQMessageAudit; -import org.apache.activemq.command.Message; - -public abstract class AMQAbstractDeadLetterStrategy implements AMQDeadLetterStrategy { - - private boolean processNonPersistent = false; - private boolean processExpired = true; - private boolean enableAudit = true; - private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit(); - - @Override - public void rollback(Message message) { - if (message != null && this.enableAudit) { - messageAudit.rollback(message); - } - } - - @Override - public boolean isSendToDeadLetterQueue(Message message) { - boolean result = false; - if (message != null) { - result = true; - if (enableAudit && messageAudit.isDuplicate(message)) { - result = false; - // LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", - // message.getMessageId(), message.getDestination()); - } - if (!message.isPersistent() && !processNonPersistent) { - result = false; - } - if (message.isExpired() && !processExpired) { - result = false; - } - } - return result; - } - - /** - * @return the processExpired - */ - @Override - public boolean isProcessExpired() { - return this.processExpired; - } - - /** - * @param 
processExpired the processExpired to set - */ - @Override - public void setProcessExpired(boolean processExpired) { - this.processExpired = processExpired; - } - - /** - * @return the processNonPersistent - */ - @Override - public boolean isProcessNonPersistent() { - return this.processNonPersistent; - } - - /** - * @param processNonPersistent the processNonPersistent to set - */ - @Override - public void setProcessNonPersistent(boolean processNonPersistent) { - this.processNonPersistent = processNonPersistent; - } - - public boolean isEnableAudit() { - return enableAudit; - } - - public void setEnableAudit(boolean enableAudit) { - this.enableAudit = enableAudit; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/04ca86c3/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java index 33c4079..94d8207 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java @@ -37,8 +37,6 @@ public class AMQConnectionContext { private AMQConnector connector; private OpenWireProtocolManager broker; //use protocol manager to represent the broker private boolean inRecoveryMode; - private AMQTransaction transaction; - private ConcurrentMap<TransactionId, AMQTransaction> transactions; private AMQSecurityContext securityContext; private ConnectionId connectionId; private String clientId; @@ -78,8 
+76,6 @@ public class AMQConnectionContext { rc.connector = this.connector; rc.broker = this.broker; rc.inRecoveryMode = this.inRecoveryMode; - rc.transaction = this.transaction; - rc.transactions = this.transactions; rc.securityContext = this.securityContext; rc.connectionId = this.connectionId; rc.clientId = this.clientId; @@ -140,20 +136,6 @@ public class AMQConnectionContext { } /** - * @return the transaction being used. - */ - public AMQTransaction getTransaction() { - return transaction; - } - - /** - * @param transaction being used. - */ - public void setTransaction(AMQTransaction transaction) { - this.transaction = transaction; - } - - /** * @return the connector being used. */ public AMQConnector getConnector() { @@ -190,18 +172,6 @@ public class AMQConnectionContext { this.inRecoveryMode = inRecoveryMode; } - public ConcurrentMap<TransactionId, AMQTransaction> getTransactions() { - return transactions; - } - - public void setTransactions(ConcurrentMap<TransactionId, AMQTransaction> transactions) { - this.transactions = transactions; - } - - public boolean isInTransaction() { - return transaction != null; - } - public String getClientId() { return clientId; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/04ca86c3/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDeadLetterStrategy.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDeadLetterStrategy.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDeadLetterStrategy.java deleted file mode 100644 index ef99c54..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDeadLetterStrategy.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; - -public interface AMQDeadLetterStrategy { - - /** - * Allow pluggable strategy for deciding if message should be sent to a dead letter queue - * for example, you might not want to ignore expired or non-persistent messages - * - * @param message - * @return true if message should be sent to a dead letter queue - */ - boolean isSendToDeadLetterQueue(Message message); - - /** - * Returns the dead letter queue for the given message and subscription. 
- */ - ActiveMQDestination getDeadLetterQueueFor(Message message, AMQSubscription subscription); - - /** - * @return true if processes expired messages - */ - boolean isProcessExpired(); - - /** - * @param processExpired the processExpired to set - */ - void setProcessExpired(boolean processExpired); - - /** - * @return the processNonPersistent - */ - boolean isProcessNonPersistent(); - - /** - * @param processNonPersistent the processNonPersistent to set - */ - void setProcessNonPersistent(boolean processNonPersistent); - - boolean isDLQ(ActiveMQDestination destination); - - /** - * Allows for a Message that was already processed by a DLQ to be rolled back in case - * of a move or a retry of that message, otherwise the Message would be considered a - * duplicate if this strategy is doing Message Auditing. - * - * @param message - */ - void rollback(Message message); - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/04ca86c3/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java deleted file mode 100644 index fe9bdf3..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.Message; - -public class AMQSharedDeadLetterStrategy extends AMQAbstractDeadLetterStrategy { - - public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "ActiveMQ.DLQ"; - - private ActiveMQDestination deadLetterQueue = new ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME); - - public ActiveMQDestination getDeadLetterQueueFor(Message message, AMQSubscription subscription) { - return deadLetterQueue; - } - - public ActiveMQDestination getDeadLetterQueue() { - return deadLetterQueue; - } - - public void setDeadLetterQueue(ActiveMQDestination deadLetterQueue) { - this.deadLetterQueue = deadLetterQueue; - } - - @Override - public boolean isDLQ(ActiveMQDestination destination) { - if (destination.equals(deadLetterQueue)) { - return true; - } - else { - return false; - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/04ca86c3/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java ---------------------------------------------------------------------- diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java deleted file mode 100644 index ed88110..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransaction.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; - -import javax.transaction.xa.XAException; - -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.transaction.Synchronization; -import org.slf4j.Logger; - -public abstract class AMQTransaction { - - public static final byte START_STATE = 0; // can go to: 1,2,3 - public static final byte IN_USE_STATE = 1; // can go to: 2,3 - public static final byte PREPARED_STATE = 2; // can go to: 3 - public static final byte FINISHED_STATE = 3; - boolean committed = false; - - private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>(); - private byte state = START_STATE; - protected FutureTask<?> preCommitTask = new FutureTask<Object>(new Callable<Object>() { - public Object call() throws Exception { - doPreCommit(); - return null; - } - }); - protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() { - public Object call() throws Exception { - doPostCommit(); - return null; - } - }); - - public byte getState() { - return state; - } - - public void setState(byte state) { - this.state = state; - } - - public boolean isCommitted() { - return committed; - } - - public void setCommitted(boolean committed) { - this.committed = committed; - } - - public void addSynchronization(Synchronization r) { - synchronizations.add(r); - if (state == START_STATE) { - state = IN_USE_STATE; - } - } - - public Synchronization findMatching(Synchronization r) { - int existing = synchronizations.indexOf(r); - if (existing != -1) { - return synchronizations.get(existing); - } - return null; - } - - public void removeSynchronization(Synchronization r) { - 
synchronizations.remove(r); - } - - public void prePrepare() throws Exception { - - // Is it ok to call prepare now given the state of the - // transaction? - switch (state) { - case START_STATE: - case IN_USE_STATE: - break; - default: - XAException xae = new XAException("Prepare cannot be called now."); - xae.errorCode = XAException.XAER_PROTO; - throw xae; - } - - // // Run the prePrepareTasks - // for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) { - // Callback r = (Callback) iter.next(); - // r.execute(); - // } - } - - protected void fireBeforeCommit() throws Exception { - for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext(); ) { - Synchronization s = iter.next(); - s.beforeCommit(); - } - } - - protected void fireAfterCommit() throws Exception { - for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext(); ) { - Synchronization s = iter.next(); - s.afterCommit(); - } - } - - public void fireAfterRollback() throws Exception { - Collections.reverse(synchronizations); - for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext(); ) { - Synchronization s = iter.next(); - s.afterRollback(); - } - } - - @Override - public String toString() { - return "Local-" + getTransactionId() + "[synchronizations=" + synchronizations + "]"; - } - - public abstract void commit(boolean onePhase) throws XAException, IOException; - - public abstract void rollback() throws XAException, IOException; - - public abstract int prepare() throws XAException, IOException; - - public abstract TransactionId getTransactionId(); - - public abstract Logger getLog(); - - public boolean isPrepared() { - return getState() == PREPARED_STATE; - } - - public int size() { - return synchronizations.size(); - } - - protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException { - try { - postCommitTask.get(); - } - catch (InterruptedException e) { - throw new 
InterruptedIOException(e.toString()); - } - catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof XAException) { - throw (XAException) t; - } - else if (t instanceof IOException) { - throw (IOException) t; - } - else { - throw new XAException(e.toString()); - } - } - } - - protected void doPreCommit() throws XAException { - try { - fireBeforeCommit(); - } - catch (Throwable e) { - // I guess this could happen. Post commit task failed - // to execute properly. - getLog().warn("PRE COMMIT FAILED: ", e); - XAException xae = new XAException("PRE COMMIT FAILED"); - xae.errorCode = XAException.XAER_RMFAIL; - xae.initCause(e); - throw xae; - } - } - - protected void doPostCommit() throws XAException { - try { - setCommitted(true); - fireAfterCommit(); - } - catch (Throwable e) { - // I guess this could happen. Post commit task failed - // to execute properly. - getLog().warn("POST COMMIT FAILED: ", e); - XAException xae = new XAException("POST COMMIT FAILED"); - xae.errorCode = XAException.XAER_RMFAIL; - xae.initCause(e); - throw xae; - } - } -}
