Hi,

You need to download activemq-jaas-*.jar separately to use JAAS. All configuration files from http://incubator.apache.org/activemq/security.html work fine with it.

I implemented my own custom security, without JAAS, for 3 user groups stored in a database:

msg_producer - can write to messages.input
msg_consumer - can read from messages.output.{$username}
msg_dispatcher - can move messages from messages.input to messages.output.*

Spring context.xml looks like:

<beans>
...
<!-- Embedded broker; start()/stop() are wired as the bean's init and destroy methods. -->
<bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
                ...
                <!-- Custom broker plugins, listed as authentication first, then authorization. -->
                <property name="plugins">
                        <list>
<!-- The authentication plugin gets the userManager bean injected through setUserManager(). -->
<bean class="manager.broker.AuthenticationPlugin"> <property name="userManager" ref="userManager"/>
                            </bean>
<bean class="manager.broker.AuthorizationPlugin"/>
                        </list>
                </property>
        </bean>
        <!-- User lookup bean; it is given the Hibernate session factory defined elsewhere in the context. -->
        <bean id="userManager" class="manager.broker.UserManager">
<property name="sessionFactory" ref="hibernateSessionFactory"/>
        </bean>
...
</beans>

manager.broker.AuthenticationPlugin.java looks like:

/**
 * Broker plugin that authenticates every new connection through the injected
 * {@code UserManagerAuth} and attaches the resulting group names to the
 * connection as its security context.
 */
public class AuthenticationPlugin extends BrokerPluginSupport {

    private Log log = LogFactory.getLog(getClass());

    /** Source of user groups; injected through {@link #setUserManager}. */
    private UserManagerAuth userManager;

    /**
     * Injects the component used to look up the groups of a connecting user.
     *
     * @param auth the user manager to authenticate against
     */
    public void setUserManager(UserManagerAuth auth) {
        this.userManager = auth;
    }

    /**
     * Looks up the groups for the connection's user name and password. A
     * {@code null} result is treated as an authentication failure; otherwise
     * a security context exposing those groups as principals is installed on
     * the connection before delegating to the next broker in the chain.
     *
     * @param context the connection being established
     * @param info    connection details carrying the user name and password
     * @throws SecurityException if the user manager returns no groups ({@code null})
     * @throws Exception         propagated from the delegate broker
     */
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        final String userName = info.getUserName();
        // Must stay final: it is captured by the anonymous SecurityContext below.
        final Set<String> groups = userManager.getUserGroups(userName, info.getPassword());

        if (groups == null) {
            String failure = "User [" + userName + "] not authenticated";
            log.info(failure);
            throw new SecurityException(failure);
        }

        log.info("User [" + userName + "] connected");
        context.setSecurityContext(new SecurityContext(userName) {
            public Set<String> getPrincipals() {
                return groups;
            }
        });
        super.addConnection(context, info);
    }

    /**
     * Logs the disconnect, clears the connection's security context and then
     * delegates to the next broker in the chain.
     *
     * @param context the connection being closed
     * @param info    connection details carrying the user name
     * @param error   the error that caused the disconnect, passed through to the delegate
     * @throws Exception propagated from the delegate broker
     */
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        log.info("User [" + info.getUserName() + "] disconnected");
        context.setSecurityContext(null);
        super.removeConnection(context, info, error);
    }
}

manager.broker.AuthorizationPlugin.java looks like:

public class AuthorizationPlugin extends BrokerPluginSupport {

        private Log log = LogFactory.getLog(getClass());

public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { String username = context.getSecurityContext().getUserName(); Set principals = context.getSecurityContext().getPrincipals();
                if (destination.isTemporary() &&

((ActiveMQTempDestination)destination).getConnectionId().
                        equals(context.getConnectionId().getValue())) {
log.info("Destination ["+destination.getQualifiedName()+"] created");
                        return super.addDestination(context, destination);
                } else if (principals.contains("msg_dispatcher") &&

(destination.getQualifiedName().startsWith("topic://ActiveMQ.Advisory.Producer.Queue.messages.output.")) ||

(destination.getQualifiedName().startsWith("topic://ActiveMQ.Advisory.Consumer.Queue.messages.output.")) ||

(destination.getQualifiedName().startsWith("queue://messages.output."))) {
log.info("Creating ["+destination.getQualifiedName()+"] by dispatcher ["+username+"]");
                        return super.addDestination(context, destination);
                } else if (principals.contains("msg_consumer") &&

(destination.getQualifiedName().equals("topic://ActiveMQ.Advisory.Producer.Queue.messages.output."+username)) ||

(destination.getQualifiedName().equals("topic://ActiveMQ.Advisory.Consumer.Queue.messages.output."+username)) ||

(destination.getQualifiedName().equals("queue://messages.output."+username))) { log.info("Creating ["+destination.getQualifiedName()+"] by consumer ["+username+"]");
                        return super.addDestination(context, destination);
                } else {
exceptionLog("User ["+username+"] not allowed to create ["+destination.getQualifiedName()+"]");
                        return null;
                }
        }

public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { String destination = info.getDestination().getQualifiedName(); String username = context.getSecurityContext().getUserName(); Set principals = context.getSecurityContext().getPrincipals(); if (destination.equals("topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic")) { log.info("Consumer ["+username+"] connected to advisory topics");
                        return super.addConsumer(context, info);
} else if (principals.contains("msg_consumer") && destination.equals("queue://messages.output."+username)) { log.info("Consumer ["+username+"] connected to ["+destination+"]");
                        return super.addConsumer(context, info);
} else if (principals.contains("msg_dispatcher") && destination.equals("queue://messages.input")) { log.info("Consumer ["+username+"] connected to ["+destination+"]");
                        return super.addConsumer(context, info);
                } else {
exceptionLog("Consuming from ["+destination+"] not allowed to user ["+username+"]");
                        return null;
                }
        }

public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { String destination = info.getDestination().getQualifiedName(); String username = context.getSecurityContext().getUserName(); Set principals = context.getSecurityContext().getPrincipals(); if (destination.equals("topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic")) { log.info("Producer ["+username+"] connected to advisory topics");
                        super.addProducer(context, info);
} else if (principals.contains("msg_producer") && destination.equals("queue://messages.input")) { log.info("Producer ["+username+"] connected to ["+destination+"]");
                        super.addProducer(context, info);
} else if (principals.contains("msg_dispatcher") && destination.startsWith("queue://messages.output.")) { log.info("Producer ["+username+"] connected to ["+destination+"]");
                        super.addProducer(context, info);
                } else {
exceptionLog("Producing to ["+destination+"] not allowed to user ["+username+"]");
                }
        }

public void send(ConnectionContext context, Message message) throws Exception { String username = context.getSecurityContext().getUserName(); Set principals = context.getSecurityContext().getPrincipals();
                Object messageObject;
                try {
messageObject = ((ActiveMQObjectMessage)message).getObject(); if ((principals.contains("msg_producer") || principals.contains("msg_dispatcher")) && messageObject instanceof manager.messages.model.Message) { log.info("User ["+username+"] sent message ["+message+"]");
                                super.send(context, message);
                        } else {
exceptionLog("User ["+username+"] not allowed to send messages");
                        }
                } catch (RuntimeException e) {
exceptionLog("Only object messages allowed to send");
                }
        }

        private void exceptionLog(String error) {
                log.info(error);
                throw new SecurityException(error);
        }

}

My code works only with the latest snapshots (not the 4.0.1 release), because the release still contains some bugs.

Hope this helps

--
Eugene Prokopiev

Reply via email to