[ 
https://issues.apache.org/jira/browse/BEAM-5759?focusedWorklogId=155400&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155400
 ]

ASF GitHub Bot logged work on BEAM-5759:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Oct/18 13:51
            Start Date: 17/Oct/18 13:51
    Worklog Time Spent: 10m 
      Work Description: jbonofre closed pull request #6702: [BEAM-5759] Ensuring JmsIO checkpoint state is accessed and modified safely
URL: https://github.com/apache/beam/pull/6702
 
 
   

This PR was merged from a forked repository. Because GitHub does not display
the original diff of a merged pull request from a fork, the diff is reproduced
below for the sake of provenance:

diff --git 
a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
 
b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
index 3f106ff3442..f33abd8b555 100644
--- 
a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
+++ 
b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
@@ -19,12 +19,17 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
 import javax.jms.Message;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Checkpoint for an unbounded JmsIO.Read. Consists of JMS destination name, 
and the latest message
@@ -33,25 +38,27 @@
 @DefaultCoder(AvroCoder.class)
 public class JmsCheckpointMark implements UnboundedSource.CheckpointMark {
 
-  private final List<Message> messages = new ArrayList<>();
-  private Instant oldestPendingTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  private static final Logger LOG = 
LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+  private final State state = new State();
 
   public JmsCheckpointMark() {}
 
   protected List<Message> getMessages() {
-    return this.messages;
+    return state.getMessages();
   }
 
   protected void addMessage(Message message) throws Exception {
     Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-    if (currentMessageTimestamp.isBefore(oldestPendingTimestamp)) {
-      oldestPendingTimestamp = currentMessageTimestamp;
-    }
-    messages.add(message);
+    state.atomicWrite(
+        () -> {
+          state.updateOldestPendingTimestampIf(currentMessageTimestamp, 
Instant::isBefore);
+          state.addMessage(message);
+        });
   }
 
   protected Instant getOldestPendingTimestamp() {
-    return oldestPendingTimestamp;
+    return state.getOldestPendingTimestamp();
   }
 
   /**
@@ -61,17 +68,117 @@ protected Instant getOldestPendingTimestamp() {
    */
   @Override
   public void finalizeCheckpoint() {
-    for (Message message : messages) {
+    State snapshot = state.snapshot();
+    for (Message message : snapshot.messages) {
       try {
         message.acknowledge();
         Instant currentMessageTimestamp = new 
Instant(message.getJMSTimestamp());
-        if (currentMessageTimestamp.isAfter(oldestPendingTimestamp)) {
-          oldestPendingTimestamp = currentMessageTimestamp;
-        }
+        snapshot.updateOldestPendingTimestampIf(currentMessageTimestamp, 
Instant::isAfter);
       } catch (Exception e) {
-        // nothing to do
+        LOG.error("Exception while finalizing message: {}", e);
+      }
+    }
+    state.atomicWrite(
+        () -> {
+          state.removeMessages(snapshot.messages);
+          
state.updateOldestPendingTimestampIf(snapshot.oldestPendingTimestamp, 
Instant::isAfter);
+        });
+  }
+
+  /**
+   * Encapsulates the state of a checkpoint mark; the list of messages pending 
finalisation and the
+   * oldest pending timestamp. Read/write-exclusive access is provided 
throughout, and constructs
+   * allowing multiple operations to be performed atomically -- i.e. performed 
within the context of
+   * a single lock operation -- are made available.
+   */
+  private class State {
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final List<Message> messages;
+    private Instant oldestPendingTimestamp;
+
+    public State() {
+      this(new ArrayList<>(), BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    private State(List<Message> messages, Instant oldestPendingTimestamp) {
+      this.messages = messages;
+      this.oldestPendingTimestamp = oldestPendingTimestamp;
+    }
+
+    /**
+     * Create and return a copy of the current state.
+     *
+     * @return A new {@code State} instance which is a deep copy of the target 
instance at the time
+     *     of execution.
+     */
+    public State snapshot() {
+      return atomicRead(() -> new State(new ArrayList<>(messages), 
oldestPendingTimestamp));
+    }
+
+    public Instant getOldestPendingTimestamp() {
+      return atomicRead(() -> oldestPendingTimestamp);
+    }
+
+    public List<Message> getMessages() {
+      return atomicRead(() -> messages);
+    }
+
+    public void addMessage(Message message) {
+      atomicWrite(() -> messages.add(message));
+    }
+
+    public void removeMessages(List<Message> messages) {
+      atomicWrite(() -> this.messages.removeAll(messages));
+    }
+
+    /**
+     * Conditionally sets {@code oldestPendingTimestamp} to the value of the 
supplied {@code
+     * candidate}, iff the provided {@code check} yields true for the {@code 
candidate} when called
+     * with the existing {@code oldestPendingTimestamp} value.
+     *
+     * @param candidate The potential new value.
+     * @param check The comparison method to call on {@code candidate} passing 
the existing {@code
+     *     oldestPendingTimestamp} value as a parameter.
+     */
+    private void updateOldestPendingTimestampIf(
+        Instant candidate, BiFunction<Instant, Instant, Boolean> check) {
+      atomicWrite(
+          () -> {
+            if (check.apply(candidate, oldestPendingTimestamp)) {
+              oldestPendingTimestamp = candidate;
+            }
+          });
+    }
+
+    /**
+     * Call the provided {@link Supplier} under this State's read lock and 
return its result.
+     *
+     * @param operation The code to execute in the context of this State's 
read lock.
+     * @param <T> The return type of the provided {@link Supplier}.
+     * @return The value produced by the provided {@link Supplier}.
+     */
+    public <T> T atomicRead(Supplier<T> operation) {
+      lock.readLock().lock();
+      try {
+        return operation.get();
+      } finally {
+        lock.readLock().unlock();
+      }
+    }
+
+    /**
+     * Call the provided {@link Runnable} under this State's write lock.
+     *
+     * @param operation The code to execute in the context of this State's 
write lock.
+     */
+    public void atomicWrite(Runnable operation) {
+      lock.writeLock().lock();
+      try {
+        operation.run();
+      } finally {
+        lock.writeLock().unlock();
       }
     }
-    messages.clear();
   }
 }
diff --git 
a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java 
b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index ee5aa541c37..9dad9e6dde6 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -25,10 +25,13 @@
 import static org.junit.Assert.fail;
 
 import com.google.common.base.Throwables;
+import java.io.IOException;
+import java.lang.reflect.Proxy;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.function.Function;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -41,9 +44,11 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.security.AuthenticationUser;
 import org.apache.activemq.security.SimpleAuthenticationPlugin;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.util.Callback;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -91,7 +96,6 @@ public void startBroker() throws Exception {
     // username and password to use to connect to the broker.
     // This user has users privilege (able to browse, consume, produce, list 
destinations)
     users.add(new AuthenticationUser(USERNAME, PASSWORD, "users"));
-    users.add(new AuthenticationUser(USERNAME, PASSWORD, "users"));
     SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(users);
     BrokerPlugin[] plugins = new BrokerPlugin[] {plugin};
     broker.setPlugins(plugins);
@@ -329,6 +333,76 @@ public void testCheckpointMark() throws Exception {
     assertEquals(0, count(QUEUE));
   }
 
+  @Test
+  public void testCheckpointMarkSafety() throws Exception {
+
+    final int messagesToProcess = 100;
+
+    // we are using no prefetch here
+    // prefetch is an ActiveMQ feature: to make efficient use of network 
resources the broker
+    // utilizes a 'push' model to dispatch messages to consumers. However, in 
the case of our
+    // test, it means that we can have some latency between the 
receiveNoWait() method used by
+    // the consumer and the prefetch buffer populated by the broker. Using a 
prefetch to 0 means
+    // that the consumer will poll for message, which is exactly what we want 
for the test.
+    // We are also sending message acknowledgements synchronously to ensure 
that they are
+    // processed before any subsequent assertions.
+    Connection connection =
+        
connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME, 
PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+    // Fill the queue with messages
+    MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE));
+    for (int i = 0; i < messagesToProcess; i++) {
+      producer.send(session.createTextMessage("test " + i));
+    }
+    producer.close();
+    session.close();
+    connection.close();
+
+    // create a JmsIO.Read with a decorated ConnectionFactory which will 
introduce a delay in sending
+    // acknowledgements - this should help uncover threading issues around 
checkpoint management.
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(
+                withSlowAcks(connectionFactoryWithSyncAcksAndWithoutPrefetch, 
10))
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE);
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    // start the reader and move to the first record
+    assertTrue(reader.start());
+
+    // consume half the messages (NB: start already consumed the first message)
+    for (int i = 0; i < (messagesToProcess / 2) - 1; i++) {
+      assertTrue(reader.advance());
+    }
+
+    // the messages are still pending in the queue (no ACK yet)
+    assertEquals(messagesToProcess, count(QUEUE));
+
+    // we finalize the checkpoint for the already-processed messages while 
simultaneously consuming the remainder of
+    // messages from the queue
+    Thread runner =
+        new Thread(
+            () -> {
+              try {
+                for (int i = 0; i < messagesToProcess / 2; i++) {
+                  assertTrue(reader.advance());
+                }
+              } catch (IOException ex) {
+                throw new RuntimeException(ex);
+              }
+            });
+    runner.start();
+    reader.getCheckpointMark().finalizeCheckpoint();
+
+    // Concurrency issues would cause an exception to be thrown before this 
method exits, failing the test
+    runner.join();
+  }
+
   private int count(String queue) throws Exception {
     Connection connection = connectionFactory.createConnection(USERNAME, 
PASSWORD);
     connection.start();
@@ -355,4 +429,63 @@ public String mapMessage(Message message) throws Exception 
{
       return new String(bytes, StandardCharsets.UTF_8);
     }
   }
+
+  /*
+   * A utility method which replaces a ConnectionFactory with one where 
calling receiveNoWait() -- i.e. pulling a
+   * message -- will return a message with its acknowledgement callback 
decorated to include a sleep for a specified
+   * duration. This gives the effect of ensuring messages take at least {@code 
delay} milliseconds to be processed.
+   */
+  private ConnectionFactory withSlowAcks(ConnectionFactory factory, long 
delay) {
+    return proxyMethod(
+        factory,
+        ConnectionFactory.class,
+        "createConnection",
+        (Connection connection) ->
+            proxyMethod(
+                connection,
+                Connection.class,
+                "createSession",
+                (Session session) ->
+                    proxyMethod(
+                        session,
+                        Session.class,
+                        "createConsumer",
+                        (MessageConsumer consumer) ->
+                            proxyMethod(
+                                consumer,
+                                MessageConsumer.class,
+                                "receiveNoWait",
+                                (ActiveMQMessage message) -> {
+                                  final Callback originalCallback =
+                                      message.getAcknowledgeCallback();
+                                  message.setAcknowledgeCallback(
+                                      () -> {
+                                        Thread.sleep(delay);
+                                        originalCallback.execute();
+                                      });
+                                  return message;
+                                }))));
+  }
+
+  /*
+   * A utility method which decorates an existing object with a proxy instance 
adhering to a given interface, with the
+   * specified method name having its return value transformed by the provided 
function.
+   */
+  private <T, MethodArgT, MethodResultT> T proxyMethod(
+      T target,
+      Class<? super T> proxyInterface,
+      String methodName,
+      Function<MethodArgT, MethodResultT> resultTransformer) {
+    return (T)
+        Proxy.newProxyInstance(
+            this.getClass().getClassLoader(),
+            new Class[] {proxyInterface},
+            (proxy, method, args) -> {
+              Object result = method.invoke(target, args);
+              if (method.getName().equals(methodName)) {
+                result = resultTransformer.apply((MethodArgT) result);
+              }
+              return result;
+            });
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 155400)
    Time Spent: 40m  (was: 0.5h)

> ConcurrentModificationException on JmsIO checkpoint finalization
> ----------------------------------------------------------------
>
>                 Key: BEAM-5759
>                 URL: https://issues.apache.org/jira/browse/BEAM-5759
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-jms
>    Affects Versions: 2.8.0
>            Reporter: Andrew Fulton
>            Assignee: Andrew Fulton
>             Fix For: 2.9.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> When reading from a JmsIO source, a ConcurrentModificationException can be 
> thrown when checkpoint finalization occurs under heavy load.
> For example:
> {noformat}
> jsonPayload: {
>   exception: "java.util.ConcurrentModificationException
>     at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:903)
>     at java.util.ArrayList$Itr.next(ArrayList.java:853)
>     at org.apache.beam.sdk.io.jms.JmsCheckpointMark.finalizeCheckpoint(JmsCheckpointMark.java:65)
>     at com.google.cloud.dataflow.worker.StreamingModeExecutionContext$1.run(StreamingModeExecutionContext.java:379)
>     at com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:846)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>   "
>   job: "2018-09-27_08_55_18-6454085774348718625"
>   logger: "com.google.cloud.dataflow.worker.StreamingDataflowWorker"
>   message: "Source checkpoint finalization failed:"
>   thread: "309"
>   work: "<nil>"
>   worker: "test-andrew-092715504-09270855-tkfp-harness-dnmb"
> {noformat}
>  
>  
> Looking at the JmsCheckpointMark code, access to the pending message list
> appears to be unsynchronized. If one thread calls finalizeCheckpoint, which
> iterates over that list, while a separate processing thread adds messages to
> it, the iteration fails with a ConcurrentModificationException.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to