Repository: samza
Updated Branches:
  refs/heads/master e87165d40 -> 29ecae891


SAMZA-1537: StreamAppender can deadlock due to locks held by Kafka an…

…d Log4j

Author: Jacob Maes <[email protected]>

Reviewers: Jagadish <[email protected]>,Yi Pan (Data Infrastructure) 
<[email protected]>

Closes #388 from jmakes/async-stream-appender


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/29ecae89
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/29ecae89
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/29ecae89

Branch: refs/heads/master
Commit: 29ecae891a2c9158b59e81d060234504d6844f70
Parents: e87165d
Author: Jacob Maes <[email protected]>
Authored: Thu Dec 21 11:43:09 2017 -0800
Committer: Jacob Maes <--global>
Committed: Thu Dec 21 11:43:09 2017 -0800

----------------------------------------------------------------------
 .../samza/logging/log4j/StreamAppender.java     | 109 ++++++++++++--
 .../logging/log4j/StreamAppenderMetrics.java    |  43 ++++++
 .../samza/logging/log4j/MockSystemProducer.java |  12 +-
 .../samza/logging/log4j/TestStreamAppender.java | 144 +++++++++++++++----
 4 files changed, 270 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/29ecae89/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 8436835..0ea8b68 100644
--- 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -20,9 +20,13 @@
 package org.apache.samza.logging.log4j;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
@@ -34,6 +38,7 @@ import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
@@ -54,6 +59,12 @@ public class StreamAppender extends AppenderSkeleton {
   private static final String JAVA_OPTS_CONTAINER_NAME = 
"samza.container.name";
   private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
   private static final String SOURCE = "log4j-log";
+
+  protected static final int DEFAULT_QUEUE_SIZE = 100;
+  private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice
+
+  protected static volatile boolean systemInitialized = false;
+
   private Config config = null;
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
@@ -62,7 +73,12 @@ public class StreamAppender extends AppenderSkeleton {
   private boolean isApplicationMaster = false;
   private Serde<LoggingEvent> serde = null;
   private Logger log = Logger.getLogger(StreamAppender.class);
-  protected static volatile boolean systemInitialized = false;
+  protected StreamAppenderMetrics metrics;
+
+  private final BlockingQueue<byte[]> logQueue = new 
LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
+  protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
+
+  private Thread transferThread;
 
   /**
    * used to detect if this thread is called recursively
@@ -87,6 +103,7 @@ public class StreamAppender extends AppenderSkeleton {
           ". This is used as the key for the log appender, so can't proceed.");
     }
     key = containerName; // use the container name as the key for the logs
+
     // StreamAppender has to wait until the JobCoordinator is up when the log 
is in the AM
     if (isApplicationMaster) {
       systemInitialized = false;
@@ -110,9 +127,32 @@ public class StreamAppender extends AppenderSkeleton {
             log.trace("Waiting for the JobCoordinator to be instantiated...");
           }
         } else {
-          OutgoingMessageEnvelope outgoingMessageEnvelope =
-              new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), 
serde.toBytes(subLog(event)));
-          systemProducer.send(SOURCE, outgoingMessageEnvelope);
+          // Serialize the event before adding to the queue to leverage the 
caller thread
+          // and ensure that the transferThread can keep up.
+          if (!logQueue.offer(serde.toBytes(subLog(event)), queueTimeoutS, 
TimeUnit.SECONDS)) {
+            // Do NOT retry adding to the queue. Dropping the event allows us 
to alleviate the unlikely
+            // possibility of a deadlock, which can arise due to a circular 
dependency between the SystemProducer
+            // which is used for StreamAppender and the log, which uses 
StreamAppender. Any locks held in the callstack
+            // of those two code paths can cause a deadlock. Dropping the 
event allows us to proceed.
+
+            // Scenario:
+            // T1: holds L1 and is waiting for L2
+            // T2: holds L2 and is waiting to produce to BQ1 which is drained 
by T3 (SystemProducer) which is waiting for L1
+
+            // This has happened due to locks in Kafka and log4j (see 
SAMZA-1537), which are both out of our control,
+            // so dropping events in the StreamAppender is our best recourse.
+
+            // Drain the queue instead of dropping one message just to reduce 
the frequency of warn logs above.
+            int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // 
+1 because of the current log event
+            log.warn(String.format("Exceeded timeout %ss while trying to log 
to %s. Dropping %d log messages.",
+                queueTimeoutS,
+                systemStream.toString(),
+                messagesDropped));
+
+            // Emit a metric which can be monitored to ensure it doesn't 
happen often.
+            metrics.logMessagesDropped.inc(messagesDropped);
+          }
+          metrics.bufferFillPct.set(Math.round(100 * logQueue.size() / 
DEFAULT_QUEUE_SIZE));
         }
       } catch (Exception e) {
         System.err.println("[StreamAppender] Error sending log message:");
@@ -120,6 +160,8 @@ public class StreamAppender extends AppenderSkeleton {
       } finally {
         recursiveCall.set(false);
       }
+    } else if (metrics != null) { // setupSystem() may not have been invoked 
yet so metrics can be null here.
+      metrics.recursiveCalls.inc();
     }
   }
 
@@ -142,6 +184,13 @@ public class StreamAppender extends AppenderSkeleton {
     log.info("Shutting down the StreamAppender...");
     if (!this.closed) {
       this.closed = true;
+      transferThread.interrupt();
+      try {
+        transferThread.join();
+      } catch (InterruptedException e) {
+        log.error("Interrupted while waiting for sink thread to finish.", e);
+      }
+
       flushSystemProducer();
       if (systemProducer !=  null) {
         systemProducer.stop();
@@ -169,7 +218,7 @@ public class StreamAppender extends AppenderSkeleton {
    * @return Config the config of this container
    */
   protected Config getConfig() {
-    Config config = null;
+    Config config;
 
     try {
       if (isApplicationMaster) {
@@ -196,6 +245,10 @@ public class StreamAppender extends AppenderSkeleton {
       streamName = getStreamName(log4jSystemConfig.getJobName(), 
log4jSystemConfig.getJobId());
     }
 
+    // TODO we need the ACTUAL metrics registry, or the metrics won't get 
reported by the metric reporters!
+    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
+    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+
     String systemName = log4jSystemConfig.getSystemName();
     String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
     if (systemFactoryName != null) {
@@ -206,13 +259,51 @@ public class StreamAppender extends AppenderSkeleton {
 
     setSerde(log4jSystemConfig, systemName, streamName);
 
-    systemProducer = systemFactory.getProducer(systemName, config, new 
MetricsRegistryMap());
+    systemProducer = systemFactory.getProducer(systemName, config, 
metricsRegistry);
     systemStream = new SystemStream(systemName, streamName);
     systemProducer.register(SOURCE);
     systemProducer.start();
 
     log.info(SOURCE + " has been registered in " + systemName + ". So all the 
logs will be sent to " + streamName
         + " in " + systemName + ". Logs are partitioned by " + key);
+
+    startTransferThread();
+  }
+
+  private void startTransferThread() {
+
+    try {
+      // Serialize the key once, since we will use it for every event.
+      final byte[] keyBytes = key.getBytes("UTF-8");
+
+      Runnable transferFromQueueToSystem = () -> {
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            byte[] serializedLogEvent = logQueue.take();
+
+            OutgoingMessageEnvelope outgoingMessageEnvelope =
+                new OutgoingMessageEnvelope(systemStream, keyBytes, 
serializedLogEvent);
+            systemProducer.send(SOURCE, outgoingMessageEnvelope);
+
+          } catch (InterruptedException e) {
+            // Preserve the interrupted status for the loop condition.
+            Thread.currentThread().interrupt();
+          } catch (Throwable t) {
+            log.error("Error sending StreamAppender event to SystemProducer", 
t);
+          }
+        }
+      };
+
+      transferThread = new Thread(transferFromQueueToSystem);
+      transferThread.setDaemon(true);
+      transferThread.setName("Samza StreamAppender Producer " + 
transferThread.getName());
+      transferThread.start();
+
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException(String.format(
+          "Container name: %s could not be encoded to bytes. StreamAppender 
cannot proceed.", key),
+          e);
+    }
   }
 
   protected static String getStreamName(String jobName, String jobId) {
@@ -228,7 +319,7 @@ public class StreamAppender extends AppenderSkeleton {
 
   /**
    * set the serde for this appender. It looks for the stream serde first, 
then system serde.
-   * If still can not get the serde, throws exceptions. 
+   * If still can not get the serde, throws exceptions.
    *
    * @param log4jSystemConfig log4jSystemConfig for this appender
    * @param systemName name of the system
@@ -254,7 +345,7 @@ public class StreamAppender extends AppenderSkeleton {
 
   /**
    * Returns the serde that is being used for the stream appender.
-   * 
+   *
    * @return The Serde&lt;LoggingEvent&gt; that the appender is using.
    */
   public Serde<LoggingEvent> getSerde() {

http://git-wip-us.apache.org/repos/asf/samza/blob/29ecae89/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppenderMetrics.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppenderMetrics.java
 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppenderMetrics.java
new file mode 100644
index 0000000..0273cb5
--- /dev/null
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppenderMetrics.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.logging.log4j;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+public class StreamAppenderMetrics extends MetricsBase {
+  /** The percentage of the log queue capacity that is currently filled with 
messages from 0 to 100. */
+  public final Gauge<Integer> bufferFillPct;
+
+  /** The number of recursive calls to the StreamAppender. These events will 
not be logged. */
+  public final Counter recursiveCalls;
+
+  /** The number of log messages dropped e.g. because of buffer overflow. Does 
not include recursive calls. */
+  public final Counter logMessagesDropped;
+
+  public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
+    super(prefix, registry);
+    bufferFillPct = newGauge("buffer-fill-percent", 0);
+    recursiveCalls = newCounter("recursive-calls");
+    logMessagesDropped = newCounter("log-messages-dropped");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/29ecae89/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java
 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java
index 8d99094..fc8de0b 100644
--- 
a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java
+++ 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java
@@ -21,13 +21,15 @@ package org.apache.samza.logging.log4j;
 
 import java.util.ArrayList;
 
+import java.util.List;
 import org.apache.log4j.Logger;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 
 public class MockSystemProducer implements SystemProducer {
-  static public ArrayList<Object> messagesReceived = new ArrayList<Object>();
-  static private Logger log = Logger.getLogger(MockSystemProducer.class);
+  public static ArrayList<Object> messagesReceived = new ArrayList<>();
+  private static Logger log = Logger.getLogger(MockSystemProducer.class);
+  public static List<MockSystemProducerListener> listeners = new ArrayList<>();
 
   @Override
   public void start() {
@@ -45,9 +47,15 @@ public class MockSystemProducer implements SystemProducer {
   @Override
   public void send(String source, OutgoingMessageEnvelope envelope) {
     messagesReceived.add(envelope.getMessage());
+
+    listeners.forEach((listener) -> listener.onSend(source, envelope));
   }
 
   @Override
   public void flush(String source) {
   }
+
+  public interface MockSystemProducerListener {
+    void onSend(String source, OutgoingMessageEnvelope envelope);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/29ecae89/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index 59669b7..d93c5d1 100644
--- 
a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -21,9 +21,14 @@ package org.apache.samza.logging.log4j;
 
 import static org.junit.Assert.*;
 
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 import org.apache.samza.config.Config;
@@ -31,12 +36,20 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde;
 import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde;
 import 
org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
+import org.junit.After;
 import org.junit.Test;
 
 public class TestStreamAppender {
 
   static Logger log = Logger.getLogger(TestStreamAppender.class);
 
+  @After
+  public void tearDown() {
+    log.removeAllAppenders();
+    MockSystemProducer.listeners.clear();
+    MockSystemProducer.messagesReceived.clear();
+  }
+
   @Test
   public void testDefaultSerde() {
     System.setProperty("samza.container.name", "samza-container-1");
@@ -70,7 +83,7 @@ public class TestStreamAppender {
   }
 
   @Test
-  public void testSystemProducerAppenderInContainer() {
+  public void testSystemProducerAppenderInContainer() throws 
InterruptedException {
     System.setProperty("samza.container.name", "samza-container-1");
 
     MockSystemProducerAppender systemProducerAppender = new 
MockSystemProducerAppender();
@@ -79,22 +92,13 @@ public class TestStreamAppender {
     systemProducerAppender.setLayout(layout);
     systemProducerAppender.activateOptions();
     log.addAppender(systemProducerAppender);
-    log.info("testing");
-    log.info("testing2");
-
-    systemProducerAppender.flushSystemProducer();
-
-    assertEquals(2, MockSystemProducer.messagesReceived.size());
-    assertTrue(new String((byte[]) 
MockSystemProducer.messagesReceived.get(0)).contains("\"message\":\"testing\""));
-    assertTrue(new String((byte[]) 
MockSystemProducer.messagesReceived.get(1)).contains("\"message\":\"testing2\""));
 
-    // reset
-    log.removeAllAppenders();
-    MockSystemProducer.messagesReceived.clear();
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    logAndVerifyMessages(messages);
   }
 
   @Test
-  public void testSystemProducerAppenderInAM() {
+  public void testSystemProducerAppenderInAM() throws InterruptedException {
     System.setProperty("samza.container.name", "samza-job-coordinator");
 
     MockSystemProducerAppender systemProducerAppender = new 
MockSystemProducerAppender();
@@ -104,26 +108,112 @@ public class TestStreamAppender {
     systemProducerAppender.activateOptions();
     log.addAppender(systemProducerAppender);
 
-    log.info("no-received");
-    systemProducerAppender.flushSystemProducer();
-    // it should not receive anything because the system is not setup
-    assertEquals(0, MockSystemProducer.messagesReceived.size());
+    log.info("no-received"); // System isn't initialized yet, so this message 
should be dropped
 
     systemProducerAppender.setupSystem();
     MockSystemProducerAppender.systemInitialized = true;
 
-    log.info("testing3");
-    log.info("testing4");
-    systemProducerAppender.flushSystemProducer();
+    List<String> messages = Lists.newArrayList("testing3", "testing4");
+    logAndVerifyMessages(messages);
+  }
 
-    // be able to received msgs now
-    assertEquals(2, MockSystemProducer.messagesReceived.size());
-    assertTrue(new String((byte[]) 
MockSystemProducer.messagesReceived.get(0)).contains("\"message\":\"testing3\""));
-    assertTrue(new String((byte[]) 
MockSystemProducer.messagesReceived.get(1)).contains("\"message\":\"testing4\""));
+  @Test
+  public void testExceptionsDoNotKillTransferThread() throws 
InterruptedException {
+    System.setProperty("samza.container.name", "samza-container-1");
 
-    // reset
-    log.removeAllAppenders();
-    MockSystemProducer.messagesReceived.clear();
+    MockSystemProducerAppender systemProducerAppender = new 
MockSystemProducerAppender();
+    PatternLayout layout = new PatternLayout();
+    layout.setConversionPattern("%m");
+    systemProducerAppender.setLayout(layout);
+    systemProducerAppender.activateOptions();
+    log.addAppender(systemProducerAppender);
+
+    List<String> messages = Lists.newArrayList("testing5", "testing6", 
"testing7");
+
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> {
+        allMessagesSent.countDown();
+        if (allMessagesSent.getCount() == messages.size() - 1) {
+          throw new RuntimeException(); // Throw on the first message
+        }
+      });
+
+    // Log the messages
+    messages.forEach((message) -> log.info(message));
+
+    // Wait for messages
+    assertTrue("Thread did not send all messages. Count: " + 
allMessagesSent.getCount(),
+        allMessagesSent.await(60, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testQueueTimeout() throws InterruptedException {
+    System.setProperty("samza.container.name", "samza-container-1");
+
+    MockSystemProducerAppender systemProducerAppender = new 
MockSystemProducerAppender();
+    systemProducerAppender.queueTimeoutS = 1;
+    PatternLayout layout = new PatternLayout();
+    layout.setConversionPattern("%m");
+    systemProducerAppender.setLayout(layout);
+    systemProducerAppender.activateOptions();
+    log.addAppender(systemProducerAppender);
+
+    int extraMessageCount = 5;
+    int expectedMessagesSent = extraMessageCount - 1; // -1 because when the 
queue is drained there is one additional message that couldn't be added
+    List<String> messages = new ArrayList<>(StreamAppender.DEFAULT_QUEUE_SIZE 
+ extraMessageCount);
+    for (int i = 0; i < StreamAppender.DEFAULT_QUEUE_SIZE + extraMessageCount; 
i++) {
+      messages.add(String.valueOf(i));
+    }
+
+    // Set up latch
+    final CountDownLatch allMessagesSent = new 
CountDownLatch(expectedMessagesSent); // We expect to drop all but the extra 
messages
+    final CountDownLatch waitForTimeout = new CountDownLatch(1);
+    MockSystemProducer.listeners.add((source, envelope) -> {
+        allMessagesSent.countDown();
+        try {
+          waitForTimeout.await();
+        } catch (InterruptedException e) {
+          fail("Test could not run properly because of a thread interrupt.");
+        }
+      });
+
+    // Log the messages. This is where the timeout will happen!
+    messages.forEach((message) -> log.info(message));
+
+    assertEquals(messages.size() - expectedMessagesSent, 
systemProducerAppender.metrics.logMessagesDropped.getCount());
+
+    // Allow all the rest of the messages to send.
+    waitForTimeout.countDown();
+
+    // Wait for messages
+    assertTrue("Thread did not send all messages. Count: " + 
allMessagesSent.getCount(),
+        allMessagesSent.await(60, TimeUnit.SECONDS));
+    assertEquals(expectedMessagesSent, 
MockSystemProducer.messagesReceived.size());
+  }
+
+  private void logAndVerifyMessages(List<String> messages) throws 
InterruptedException {
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> 
allMessagesSent.countDown());
+
+    // Log the messages
+    messages.forEach((message) -> log.info(message));
+
+    // Wait for messages
+    assertTrue("Timeout while waiting for StreamAppender to send all messages. 
Count: " + allMessagesSent.getCount(),
+        allMessagesSent.await(60, TimeUnit.SECONDS));
+
+    // Verify
+    assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
+    for (int i = 0; i < messages.size(); i++) {
+      assertTrue("Message mismatch at index " + i,
+          new String((byte[]) 
MockSystemProducer.messagesReceived.get(i)).contains(asJsonMessageSegment(messages.get(i))));
+    }
+  }
+
+  private String asJsonMessageSegment(String message) {
+    return String.format("\"message\":\"%s\"", message);
   }
 
   /**

Reply via email to