Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187487737
  
    --- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -262,42 +189,26 @@ public void onMessage(Message msg) {
          * topic/queue.
          */
         @Override
    -    public void open(Map<String, Object> conf,
    -                     TopologyContext context,
    -                     SpoutOutputCollector collector) {
    +    public void open(final Map<String, Object> conf,
    +                     final TopologyContext context,
    +                     final SpoutOutputCollector spoutOutputCollector) {
     
    -        if (this.jmsProvider == null) {
    -            throw new IllegalStateException("JMS provider has not been 
set.");
    -        }
    -        if (this.tupleProducer == null) {
    -            throw new IllegalStateException("JMS Tuple Producer has not 
been set.");
    +        if (jmsProvider == null) {
    +            throw new IllegalStateException(
    +                "JMS provider has not been set.");
             }
    -        // TODO get the default value from storm instead of hard coding 30 
secs
    -        Long topologyTimeout =
    -            ((Number) 
conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 
DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
    -        if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > 
this.recoveryPeriodMs) {
    -            LOG.warn("*** WARNING *** : "
    -                     + "Recovery period (" + this.recoveryPeriodMs + " 
ms.) is less then the configured "
    -                     + "'topology.message.timeout.secs' of " + 
topologyTimeout
    -                     + " secs. This could lead to a message replay 
flood!");
    +        if (tupleProducer == null) {
    +            throw new IllegalStateException(
    +                "JMS Tuple Producer has not been set.");
             }
    -        this.queue = new LinkedBlockingQueue<Message>();
    -        this.toCommit = new TreeSet<JmsMessageID>();
    -        this.pendingMessages = new HashMap<JmsMessageID, Message>();
    -        this.collector = collector;
    +        collector = spoutOutputCollector;
             try {
    -            ConnectionFactory cf = this.jmsProvider.connectionFactory();
    -            Destination dest = this.jmsProvider.destination();
    -            this.connection = cf.createConnection();
    -            this.session = connection.createSession(false, 
this.jmsAcknowledgeMode);
    -            MessageConsumer consumer = session.createConsumer(dest);
    -            consumer.setMessageListener(this);
    -            this.connection.start();
    -            if (this.isDurableSubscription() && this.recoveryPeriodMs > 0) 
{
    -                this.recoveryTimer = new Timer();
    -                this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 
RECOVERY_DELAY_MS, this.recoveryPeriodMs);
    -            }
    -
    +            ConnectionFactory cf = jmsProvider.connectionFactory();
    +            Destination dest = jmsProvider.destination();
    +            connection = cf.createConnection();
    +            session = messageHandler.createSession(connection);
    --- End diff --
    
    We may want to consider the case: users provide mode which is not in JMS 
standard, and also setIndividualAck() is not called.
    
    Now the case is handled as same as AUTO_ACKNOWLEDGE because of providing 
default value of messageHandler. Is it intended? We prevented the case with 
IllegalArgumentException.


---

Reply via email to