Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2639#discussion_r187487737 --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java --- @@ -262,42 +189,26 @@ public void onMessage(Message msg) { * topic/queue. */ @Override - public void open(Map<String, Object> conf, - TopologyContext context, - SpoutOutputCollector collector) { + public void open(final Map<String, Object> conf, + final TopologyContext context, + final SpoutOutputCollector spoutOutputCollector) { - if (this.jmsProvider == null) { - throw new IllegalStateException("JMS provider has not been set."); - } - if (this.tupleProducer == null) { - throw new IllegalStateException("JMS Tuple Producer has not been set."); + if (jmsProvider == null) { + throw new IllegalStateException( + "JMS provider has not been set."); } - // TODO get the default value from storm instead of hard coding 30 secs - Long topologyTimeout = - ((Number) conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue(); - if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > this.recoveryPeriodMs) { - LOG.warn("*** WARNING *** : " - + "Recovery period (" + this.recoveryPeriodMs + " ms.) is less then the configured " - + "'topology.message.timeout.secs' of " + topologyTimeout - + " secs. 
This could lead to a message replay flood!"); + if (tupleProducer == null) { + throw new IllegalStateException( + "JMS Tuple Producer has not been set."); } - this.queue = new LinkedBlockingQueue<Message>(); - this.toCommit = new TreeSet<JmsMessageID>(); - this.pendingMessages = new HashMap<JmsMessageID, Message>(); - this.collector = collector; + collector = spoutOutputCollector; try { - ConnectionFactory cf = this.jmsProvider.connectionFactory(); - Destination dest = this.jmsProvider.destination(); - this.connection = cf.createConnection(); - this.session = connection.createSession(false, this.jmsAcknowledgeMode); - MessageConsumer consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - this.connection.start(); - if (this.isDurableSubscription() && this.recoveryPeriodMs > 0) { - this.recoveryTimer = new Timer(); - this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), RECOVERY_DELAY_MS, this.recoveryPeriodMs); - } - + ConnectionFactory cf = jmsProvider.connectionFactory(); + Destination dest = jmsProvider.destination(); + connection = cf.createConnection(); + session = messageHandler.createSession(connection); --- End diff -- We may want to consider the following case: the user provides an acknowledge mode that is not in the JMS standard, and setIndividualAck() is not called. This case is now handled the same as AUTO_ACKNOWLEDGE, because messageHandler is given a default value. Is that intended? Previously, we prevented this case with an IllegalArgumentException.
---