MLHR-1734 #resolve Added idempotency changes for RabbitMQ input and output operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/13a3fbea Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/13a3fbea Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/13a3fbea Branch: refs/heads/feature-AppData Commit: 13a3fbea74b7deaf674c5f42dd945ebd01e17f65 Parents: 8e94665 Author: ishark <[email protected]> Authored: Mon Jun 29 16:14:14 2015 -0700 Committer: ishark <[email protected]> Committed: Mon Aug 10 19:00:55 2015 -0700 ---------------------------------------------------------------------- contrib/pom.xml | 6 + .../rabbitmq/AbstractRabbitMQInputOperator.java | 141 +++++++++++++++++-- .../AbstractRabbitMQOutputOperator.java | 65 ++++++++- ...bstractSinglePortRabbitMQOutputOperator.java | 5 +- .../rabbitmq/RabbitMQInputOperatorTest.java | 86 ++++++++--- .../rabbitmq/RabbitMQOutputOperatorTest.java | 28 ++-- 6 files changed, 283 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 9776e2f..76e8144 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -565,5 +565,11 @@ <version>${dt.framework.version}</version> <type>jar</type> </dependency> + <dependency> + <groupId>com.datatorrent</groupId> + <artifactId>dt-engine</artifactId> + <version>${dt.framework.version}</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java index 06b3b88..e408f5e 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java @@ -16,12 +16,22 @@ package com.datatorrent.contrib.rabbitmq; import com.datatorrent.api.*; -import com.datatorrent.api.InputOperator; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.netlet.util.DTThrowable; import com.rabbitmq.client.*; + import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; + import javax.validation.constraints.NotNull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,9 +71,9 @@ import org.slf4j.LoggerFactory; * * @since 0.3.2 */ -public abstract class AbstractRabbitMQInputOperator<T> - implements InputOperator, -Operator.ActivationListener<OperatorContext> +public abstract class AbstractRabbitMQInputOperator<T> implements + InputOperator, Operator.ActivationListener<OperatorContext>, + Operator.CheckpointListener { private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class); @NotNull @@ -87,8 +97,23 @@ Operator.ActivationListener<OperatorContext> protected transient Channel channel; protected transient TracingConsumer tracingConsumer; protected transient String cTag; - protected transient ArrayBlockingQueue<byte[]> holdingBuffer; + + protected transient ArrayBlockingQueue<KeyValPair<Long,byte[]>> holdingBuffer; + private IdempotentStorageManager idempotentStorageManager; + protected final transient Map<Long, byte[]> currentWindowRecoveryState; + private transient final Set<Long> pendingAck; + private transient final Set<Long> recoveredTags; + private transient long currentWindowId; + private transient int operatorContextId; + + public AbstractRabbitMQInputOperator() + { + currentWindowRecoveryState = new HashMap<Long, byte[]>(); + pendingAck = new HashSet<Long>(); + recoveredTags = new HashSet<Long>(); + } + /** * define a consumer which can asynchronously receive data, * and added to holdingBuffer @@ -124,8 +149,19 @@ Operator.ActivationListener<OperatorContext> @Override public void handleDelivery(String consumer_Tag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - holdingBuffer.add(body); -// logger.debug("Received Async message:" + new String(body)+" buffersize:"+holdingBuffer.size()); + long tag = envelope.getDeliveryTag(); + if(envelope.isRedeliver() && (recoveredTags.contains(tag) || pendingAck.contains(tag))) + { + if(recoveredTags.contains(tag)) { + pendingAck.add(tag); + } + return; + } + + // Acknowledgements are sent at the end of the window after adding to idempotency manager + pendingAck.add(tag); + holdingBuffer.add(new KeyValPair<Long, byte[]>(tag, body)); + logger.debug("Received Async message: {} buffersize: {} ", new String(body), holdingBuffer.size()); } } @@ -137,7 +173,9 @@ Operator.ActivationListener<OperatorContext> ntuples = holdingBuffer.size(); } for (int i = ntuples; i-- > 0;) { - emitTuple(holdingBuffer.poll()); + KeyValPair<Long, byte[]> message = holdingBuffer.poll(); + currentWindowRecoveryState.put(message.getKey(), message.getValue()); + emitTuple(message.getValue()); } } @@ -146,22 +184,72 @@ Operator.ActivationListener<OperatorContext> @Override public void beginWindow(long windowId) { + currentWindowId = windowId; + if (windowId <= this.idempotentStorageManager.getLargestRecoveryWindow()) { + replay(windowId); + } } + @SuppressWarnings("unchecked") + private void replay(long windowId) { + Map<Long, byte[]> recoveredData; + try { + recoveredData = (Map<Long, byte[]>) this.idempotentStorageManager.load(operatorContextId, windowId); + if (recoveredData == null) { + return; + } + for (Entry<Long, byte[]> recoveredEntry : recoveredData.entrySet()) { + recoveredTags.add(recoveredEntry.getKey()); + emitTuple(recoveredEntry.getValue()); + } + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + + @Override public void endWindow() { + //No more messages can be consumed now. so we will call emit tuples once more + //so that any pending messages can be emitted. + KeyValPair<Long, byte[]> message; + while ((message = holdingBuffer.poll()) != null) { + currentWindowRecoveryState.put(message.getKey(), message.getValue()); + emitTuple(message.getValue()); + } + + try { + this.idempotentStorageManager.save(currentWindowRecoveryState, operatorContextId, currentWindowId); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + + currentWindowRecoveryState.clear(); + + for (Long deliveryTag : pendingAck) { + try { + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + + pendingAck.clear(); } @Override public void setup(OperatorContext context) { - holdingBuffer = new ArrayBlockingQueue<byte[]>(bufferSize); + this.operatorContextId = context.getId(); + holdingBuffer = new ArrayBlockingQueue<KeyValPair<Long, byte[]>>(bufferSize); + this.idempotentStorageManager.setup(context); } @Override public void teardown() { + this.idempotentStorageManager.teardown(); } @Override @@ -178,10 +266,12 @@ Operator.ActivationListener<OperatorContext> channel = connection.createChannel(); channel.exchangeDeclare(exchange, exchangeType); + boolean resetQueueName = false; if (queueName == null){ // unique queuename is generated // used in case of fanout exchange queueName = channel.queueDeclare().getQueue(); + resetQueueName = true; } else { // user supplied name // used in case of direct exchange @@ -193,7 +283,11 @@ Operator.ActivationListener<OperatorContext> // consumer = new QueueingConsumer(channel); // channel.basicConsume(queueName, true, consumer); tracingConsumer = new TracingConsumer(channel); - cTag = channel.basicConsume(queueName, true, tracingConsumer); + cTag = channel.basicConsume(queueName, false, tracingConsumer); + if(resetQueueName) + { + queueName = null; + } } catch (IOException ex) { throw new RuntimeException("Connection Failure", ex); @@ -211,6 +305,23 @@ Operator.ActivationListener<OperatorContext> logger.debug(ex.toString()); } } + + @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + try { + idempotentStorageManager.deleteUpTo(operatorContextId, windowId); + } + catch (IOException e) { + throw new RuntimeException("committing", e); + } + } + public void setTupleBlast(int i) { this.tuple_blast = i; @@ -275,5 +386,15 @@ Operator.ActivationListener<OperatorContext> { this.routingKey = routingKey; } + + public IdempotentStorageManager getIdempotentStorageManager() { + return idempotentStorageManager; + } + + public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) { + this.idempotentStorageManager = idempotentStorageManager; + } + + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java index a78febb..cc6b7db 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java @@ -16,12 +16,16 @@ package com.datatorrent.contrib.rabbitmq; import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.api.Context.OperatorContext; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; + import java.io.IOException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,21 +70,70 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator transient Channel channel = null; transient String exchange = "testEx"; transient String queueName="testQ"; + + private IdempotentStorageManager idempotentStorageManager; + private transient long currentWindowId; + private transient long largestRecoveryWindowId; + private transient int operatorContextId; + protected transient boolean skipProcessingTuple = false; + private transient OperatorContext context; + @Override public void setup(OperatorContext context) { + // Needed to setup idempotency storage manager in setter + this.context = context; + this.operatorContextId = context.getId(); + try { connFactory.setHost("localhost"); connection = connFactory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(exchange, "fanout"); -// channel.queueDeclare(queueName, false, false, false, null); + + this.idempotentStorageManager.setup(context); + } catch (IOException ex) { logger.debug(ex.toString()); + DTThrowable.rethrow(ex); + } + } + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + largestRecoveryWindowId = idempotentStorageManager.getLargestRecoveryWindow(); + if (windowId <= largestRecoveryWindowId) { + // Do not resend already sent tuples + skipProcessingTuple = true; + } + else + { + skipProcessingTuple = false; } } + + /** + * {@inheritDoc} + */ + @Override + public void endWindow() + { + if(currentWindowId < largestRecoveryWindowId) + { + // ignore + return; + } + try { + idempotentStorageManager.save("processedWindow", operatorContextId, currentWindowId); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + public void setQueueName(String queueName) { this.queueName = queueName; @@ -95,9 +148,19 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator try { channel.close(); connection.close(); + this.idempotentStorageManager.teardown(); } catch (IOException ex) { logger.debug(ex.toString()); } } + + public IdempotentStorageManager getIdempotentStorageManager() { + return idempotentStorageManager; + } + + public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) { + this.idempotentStorageManager = idempotentStorageManager; + } + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java index c16f70f..8e6ff39 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java @@ -60,7 +60,10 @@ public abstract class AbstractSinglePortRabbitMQOutputOperator<T> extends Abstra @Override public void process(T tuple) { - processTuple(tuple); // This is an abstract call + if(!skipProcessingTuple) + { + processTuple(tuple); // This is an abstract call + } } }; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java index 041e362..a14f4e7 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java @@ -25,17 +25,21 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datatorrent.contrib.helper.CollectorModule; import com.datatorrent.contrib.helper.MessageQueueTestHelper; - +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Attribute; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.LocalMode; - +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.netlet.util.DTThrowable; /** @@ -44,7 +48,7 @@ import com.datatorrent.netlet.util.DTThrowable; public class RabbitMQInputOperatorTest { private static Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class); - + public static final class TestStringRabbitMQInputOperator extends AbstractSinglePortRabbitMQInputOperator<String> { @Override @@ -75,7 +79,6 @@ public class RabbitMQInputOperatorTest connection = connFactory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(exchange, "fanout"); -// channel.queueDeclare(queueName, false, false, false, null); } public void setQueueName(String queueName) @@ -86,9 +89,7 @@ public class RabbitMQInputOperatorTest public void process(Object message) throws IOException { String msg = message.toString(); -// logger.debug("publish:" + msg); channel.basicPublish(exchange, "", null, msg.getBytes()); -// channel.basicPublish("", queueName, null, msg.getBytes()); } public void teardown() throws IOException @@ -100,12 +101,11 @@ public class RabbitMQInputOperatorTest public void generateMessages(int msgCount) throws InterruptedException, IOException { for (int i = 0; i < msgCount; i++) { - - ArrayList<HashMap<String, Integer>> dataMaps = MessageQueueTestHelper.getMessages(); - for(int j =0; j < dataMaps.size(); j++) - { - process(dataMaps.get(j)); - } + + ArrayList<HashMap<String, Integer>> dataMaps = MessageQueueTestHelper.getMessages(); + for (int j = 0; j < dataMaps.size(); j++) { + process(dataMaps.get(j)); + } } } @@ -124,6 +124,8 @@ public class RabbitMQInputOperatorTest LocalMode lma = LocalMode.newInstance(); DAG dag = lma.getDAG(); RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class); + consumer.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + final CollectorModule<byte[]> collector = dag.addOperator("Collector", new CollectorModule<byte[]>()); consumer.setHost("localhost"); @@ -144,7 +146,7 @@ public class RabbitMQInputOperatorTest public void run() { long startTms = System.currentTimeMillis(); - long timeout = 10000L; + long timeout = 100000L; try { while (!collector.inputPort.collections.containsKey("collector") && System.currentTimeMillis() - startTms < timeout) { Thread.sleep(500); @@ -153,16 +155,14 @@ public class RabbitMQInputOperatorTest startTms = System.currentTimeMillis(); while (System.currentTimeMillis() - startTms < timeout) { List<?> list = collector.inputPort.collections.get("collector"); - + if (list.size() < testNum * 3) { Thread.sleep(10); - } - else { + } else { break; } } - } - catch (IOException ex) { + } catch (IOException ex) { logger.error(ex.getMessage(), ex); DTThrowable.rethrow(ex); } catch (InterruptedException ex) { @@ -179,5 +179,53 @@ public class RabbitMQInputOperatorTest logger.debug("collection size: {} {}", collector.inputPort.collections.size(), collector.inputPort.collections); MessageQueueTestHelper.validateResults(testNum, collector.inputPort.collections); - } + } + + @Test + public void testRecoveryAndIdempotency() throws Exception + { + RabbitMQInputOperator operator = new RabbitMQInputOperator(); + operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + operator.setHost("localhost"); + operator.setExchange("testEx"); + operator.setExchangeType("fanout"); + + Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + CollectorTestSink<Object> sink = new CollectorTestSink<Object>(); + + operator.outputPort.setSink(sink); + OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap); + + operator.setup(context); + operator.activate(context); + + final RabbitMQMessageGenerator publisher = new RabbitMQMessageGenerator(); + publisher.setup(); + publisher.generateMessages(5); + + Thread.sleep(10000); + + operator.beginWindow(1); + operator.emitTuples(); + operator.endWindow(); + + operator.deactivate(); + Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size()); + + // failure and then re-deployment of operator + sink.collectedTuples.clear(); + operator.setup(context); + operator.activate(context); + + Assert.assertEquals("largest recovery window", 1, operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + operator.beginWindow(1); + operator.endWindow(); + Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size()); + sink.collectedTuples.clear(); + + operator.deactivate(); + operator.teardown(); + operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 1); + publisher.teardown(); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java index a170a0e..27213c3 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java @@ -27,7 +27,7 @@ import org.junit.Test; import org.slf4j.LoggerFactory; import com.datatorrent.contrib.helper.SourceModule; - +import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.LocalMode; @@ -45,7 +45,7 @@ public class RabbitMQOutputOperatorTest public int count = 0; private final String host = "localhost"; ConnectionFactory connFactory = new ConnectionFactory(); -// QueueingConsumer consumer = null; + // QueueingConsumer consumer = null; Connection connection = null; Channel channel = null; TracingConsumer tracingConsumer = null; @@ -64,8 +64,6 @@ public class RabbitMQOutputOperatorTest queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchange, ""); -// consumer = new QueueingConsumer(channel); -// channel.basicConsume(queueName, true, consumer); tracingConsumer = new TracingConsumer(channel); cTag = channel.basicConsume(queueName, true, tracingConsumer); } @@ -125,7 +123,6 @@ public class RabbitMQOutputOperatorTest } } - @Test public void testDag() throws InterruptedException, MalformedURLException, IOException, Exception { @@ -133,7 +130,7 @@ public class RabbitMQOutputOperatorTest runTest(testNum); logger.debug("end of test"); } - + protected void runTest(int testNum) throws IOException { RabbitMQMessageReceiver receiver = new RabbitMQMessageReceiver(); @@ -144,23 +141,22 @@ public class RabbitMQOutputOperatorTest SourceModule source = dag.addOperator("source", new SourceModule()); source.setTestNum(testNum); RabbitMQOutputOperator collector = dag.addOperator("generator", new RabbitMQOutputOperator()); - + collector.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + collector.setExchange("testEx"); dag.addStream("Stream", source.outPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL); final LocalMode.Controller lc = lma.getController(); lc.setHeartbeatMonitoringEnabled(false); lc.runAsync(); - try { + try { Thread.sleep(1000); long timeout = 10000L; long startTms = System.currentTimeMillis(); - while((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout)) - { + while ((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout)) { Thread.sleep(100); - } - } - catch (InterruptedException ex) { + } + } catch (InterruptedException ex) { Assert.fail(ex.getMessage()); } finally { lc.shutdown(); @@ -170,11 +166,9 @@ public class RabbitMQOutputOperatorTest for (Map.Entry<String, Integer> e : receiver.dataMap.entrySet()) { if (e.getKey().equals("a")) { Assert.assertEquals("emitted value for 'a' was ", new Integer(2), e.getValue()); - } - else if (e.getKey().equals("b")) { + } else if (e.getKey().equals("b")) { Assert.assertEquals("emitted value for 'b' was ", new Integer(20), e.getValue()); - } - else if (e.getKey().equals("c")) { + } else if (e.getKey().equals("c")) { Assert.assertEquals("emitted value for 'c' was ", new Integer(1000), e.getValue()); } }
