[
https://issues.apache.org/jira/browse/BEAM-5759?focusedWorklogId=155400&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155400
]
ASF GitHub Bot logged work on BEAM-5759:
----------------------------------------
Author: ASF GitHub Bot
Created on: 17/Oct/18 13:51
Start Date: 17/Oct/18 13:51
Worklog Time Spent: 10m
Work Description: jbonofre closed pull request #6702: [BEAM-5759]
Ensuring JmsIO checkpoint state is accessed and modified safely
URL: https://github.com/apache/beam/pull/6702
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:
diff --git
a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
index 3f106ff3442..f33abd8b555 100644
---
a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
+++
b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
@@ -19,12 +19,17 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
import javax.jms.Message;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Checkpoint for an unbounded JmsIO.Read. Consists of JMS destination name,
and the latest message
@@ -33,25 +38,27 @@
@DefaultCoder(AvroCoder.class)
public class JmsCheckpointMark implements UnboundedSource.CheckpointMark {
- private final List<Message> messages = new ArrayList<>();
- private Instant oldestPendingTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ private static final Logger LOG =
LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+ private final State state = new State();
public JmsCheckpointMark() {}
protected List<Message> getMessages() {
- return this.messages;
+ return state.getMessages();
}
protected void addMessage(Message message) throws Exception {
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
- if (currentMessageTimestamp.isBefore(oldestPendingTimestamp)) {
- oldestPendingTimestamp = currentMessageTimestamp;
- }
- messages.add(message);
+ state.atomicWrite(
+ () -> {
+ state.updateOldestPendingTimestampIf(currentMessageTimestamp,
Instant::isBefore);
+ state.addMessage(message);
+ });
}
protected Instant getOldestPendingTimestamp() {
- return oldestPendingTimestamp;
+ return state.getOldestPendingTimestamp();
}
/**
@@ -61,17 +68,117 @@ protected Instant getOldestPendingTimestamp() {
*/
@Override
public void finalizeCheckpoint() {
- for (Message message : messages) {
+ State snapshot = state.snapshot();
+ for (Message message : snapshot.messages) {
try {
message.acknowledge();
Instant currentMessageTimestamp = new
Instant(message.getJMSTimestamp());
- if (currentMessageTimestamp.isAfter(oldestPendingTimestamp)) {
- oldestPendingTimestamp = currentMessageTimestamp;
- }
+ snapshot.updateOldestPendingTimestampIf(currentMessageTimestamp,
Instant::isAfter);
} catch (Exception e) {
- // nothing to do
+ LOG.error("Exception while finalizing message: {}", e);
+ }
+ }
+ state.atomicWrite(
+ () -> {
+ state.removeMessages(snapshot.messages);
+
state.updateOldestPendingTimestampIf(snapshot.oldestPendingTimestamp,
Instant::isAfter);
+ });
+ }
+
+ /**
+ * Encapsulates the state of a checkpoint mark; the list of messages pending
finalisation and the
+ * oldest pending timestamp. Read/write-exclusive access is provided
throughout, and constructs
+ * allowing multiple operations to be performed atomically -- i.e. performed
within the context of
+ * a single lock operation -- are made available.
+ */
+ private class State {
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private final List<Message> messages;
+ private Instant oldestPendingTimestamp;
+
+ public State() {
+ this(new ArrayList<>(), BoundedWindow.TIMESTAMP_MIN_VALUE);
+ }
+
+ private State(List<Message> messages, Instant oldestPendingTimestamp) {
+ this.messages = messages;
+ this.oldestPendingTimestamp = oldestPendingTimestamp;
+ }
+
+ /**
+ * Create and return a copy of the current state.
+ *
+ * @return A new {@code State} instance which is a deep copy of the target
instance at the time
+ * of execution.
+ */
+ public State snapshot() {
+ return atomicRead(() -> new State(new ArrayList<>(messages),
oldestPendingTimestamp));
+ }
+
+ public Instant getOldestPendingTimestamp() {
+ return atomicRead(() -> oldestPendingTimestamp);
+ }
+
+ public List<Message> getMessages() {
+ return atomicRead(() -> messages);
+ }
+
+ public void addMessage(Message message) {
+ atomicWrite(() -> messages.add(message));
+ }
+
+ public void removeMessages(List<Message> messages) {
+ atomicWrite(() -> this.messages.removeAll(messages));
+ }
+
+ /**
+ * Conditionally sets {@code oldestPendingTimestamp} to the value of the
supplied {@code
+ * candidate}, iff the provided {@code check} yields true for the {@code
candidate} when called
+ * with the existing {@code oldestPendingTimestamp} value.
+ *
+ * @param candidate The potential new value.
+ * @param check The comparison method to call on {@code candidate} passing
the existing {@code
+ * oldestPendingTimestamp} value as a parameter.
+ */
+ private void updateOldestPendingTimestampIf(
+ Instant candidate, BiFunction<Instant, Instant, Boolean> check) {
+ atomicWrite(
+ () -> {
+ if (check.apply(candidate, oldestPendingTimestamp)) {
+ oldestPendingTimestamp = candidate;
+ }
+ });
+ }
+
+ /**
+ * Call the provided {@link Supplier} under this State's read lock and
return its result.
+ *
+ * @param operation The code to execute in the context of this State's
read lock.
+ * @param <T> The return type of the provided {@link Supplier}.
+ * @return The value produced by the provided {@link Supplier}.
+ */
+ public <T> T atomicRead(Supplier<T> operation) {
+ lock.readLock().lock();
+ try {
+ return operation.get();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Call the provided {@link Runnable} under this State's write lock.
+ *
+ * @param operation The code to execute in the context of this State's
write lock.
+ */
+ public void atomicWrite(Runnable operation) {
+ lock.writeLock().lock();
+ try {
+ operation.run();
+ } finally {
+ lock.writeLock().unlock();
}
}
- messages.clear();
}
}
diff --git
a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index ee5aa541c37..9dad9e6dde6 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -25,10 +25,13 @@
import static org.junit.Assert.fail;
import com.google.common.base.Throwables;
+import java.io.IOException;
+import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
+import java.util.function.Function;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -41,9 +44,11 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.util.Callback;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -91,7 +96,6 @@ public void startBroker() throws Exception {
// username and password to use to connect to the broker.
// This user has users privilege (able to browse, consume, produce, list
destinations)
users.add(new AuthenticationUser(USERNAME, PASSWORD, "users"));
- users.add(new AuthenticationUser(USERNAME, PASSWORD, "users"));
SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(users);
BrokerPlugin[] plugins = new BrokerPlugin[] {plugin};
broker.setPlugins(plugins);
@@ -329,6 +333,76 @@ public void testCheckpointMark() throws Exception {
assertEquals(0, count(QUEUE));
}
+ @Test
+ public void testCheckpointMarkSafety() throws Exception {
+
+ final int messagesToProcess = 100;
+
+ // we are using no prefetch here
+ // prefetch is an ActiveMQ feature: to make efficient use of network
resources the broker
+ // utilizes a 'push' model to dispatch messages to consumers. However, in
the case of our
+ // test, it means that we can have some latency between the
receiveNoWait() method used by
+ // the consumer and the prefetch buffer populated by the broker. Using a
prefetch to 0 means
+ // that the consumer will poll for message, which is exactly what we want
for the test.
+ // We are also sending message acknowledgements synchronously to ensure
that they are
+ // processed before any subsequent assertions.
+ Connection connection =
+
connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME,
PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ // Fill the queue with messages
+ MessageProducer producer =
session.createProducer(session.createQueue(QUEUE));
+ for (int i = 0; i < messagesToProcess; i++) {
+ producer.send(session.createTextMessage("test " + i));
+ }
+ producer.close();
+ session.close();
+ connection.close();
+
+ // create a JmsIO.Read with a decorated ConnectionFactory which will
introduce a delay in sending
+ // acknowledgements - this should help uncover threading issues around
checkpoint management.
+ JmsIO.Read spec =
+ JmsIO.read()
+ .withConnectionFactory(
+ withSlowAcks(connectionFactoryWithSyncAcksAndWithoutPrefetch,
10))
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE);
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+ JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+ // start the reader and move to the first record
+ assertTrue(reader.start());
+
+ // consume half the messages (NB: start already consumed the first message)
+ for (int i = 0; i < (messagesToProcess / 2) - 1; i++) {
+ assertTrue(reader.advance());
+ }
+
+ // the messages are still pending in the queue (no ACK yet)
+ assertEquals(messagesToProcess, count(QUEUE));
+
+ // we finalize the checkpoint for the already-processed messages while
simultaneously consuming the remainder of
+ // messages from the queue
+ Thread runner =
+ new Thread(
+ () -> {
+ try {
+ for (int i = 0; i < messagesToProcess / 2; i++) {
+ assertTrue(reader.advance());
+ }
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+ runner.start();
+ reader.getCheckpointMark().finalizeCheckpoint();
+
+ // Concurrency issues would cause an exception to be thrown before this
method exits, failing the test
+ runner.join();
+ }
+
private int count(String queue) throws Exception {
Connection connection = connectionFactory.createConnection(USERNAME,
PASSWORD);
connection.start();
@@ -355,4 +429,63 @@ public String mapMessage(Message message) throws Exception
{
return new String(bytes, StandardCharsets.UTF_8);
}
}
+
+ /*
+ * A utility method which replaces a ConnectionFactory with one where
calling receiveNoWait() -- i.e. pulling a
+ * message -- will return a message with its acknowledgement callback
decorated to include a sleep for a specified
+ * duration. This gives the effect of ensuring messages take at least {@code
delay} milliseconds to be processed.
+ */
+ private ConnectionFactory withSlowAcks(ConnectionFactory factory, long
delay) {
+ return proxyMethod(
+ factory,
+ ConnectionFactory.class,
+ "createConnection",
+ (Connection connection) ->
+ proxyMethod(
+ connection,
+ Connection.class,
+ "createSession",
+ (Session session) ->
+ proxyMethod(
+ session,
+ Session.class,
+ "createConsumer",
+ (MessageConsumer consumer) ->
+ proxyMethod(
+ consumer,
+ MessageConsumer.class,
+ "receiveNoWait",
+ (ActiveMQMessage message) -> {
+ final Callback originalCallback =
+ message.getAcknowledgeCallback();
+ message.setAcknowledgeCallback(
+ () -> {
+ Thread.sleep(delay);
+ originalCallback.execute();
+ });
+ return message;
+ }))));
+ }
+
+ /*
+ * A utility method which decorates an existing object with a proxy instance
adhering to a given interface, with the
+ * specified method name having its return value transformed by the provided
function.
+ */
+ private <T, MethodArgT, MethodResultT> T proxyMethod(
+ T target,
+ Class<? super T> proxyInterface,
+ String methodName,
+ Function<MethodArgT, MethodResultT> resultTransformer) {
+ return (T)
+ Proxy.newProxyInstance(
+ this.getClass().getClassLoader(),
+ new Class[] {proxyInterface},
+ (proxy, method, args) -> {
+ Object result = method.invoke(target, args);
+ if (method.getName().equals(methodName)) {
+ result = resultTransformer.apply((MethodArgT) result);
+ }
+ return result;
+ });
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 155400)
Time Spent: 40m (was: 0.5h)
> ConcurrentModificationException on JmsIO checkpoint finalization
> ----------------------------------------------------------------
>
> Key: BEAM-5759
> URL: https://issues.apache.org/jira/browse/BEAM-5759
> Project: Beam
> Issue Type: Bug
> Components: io-java-jms
> Affects Versions: 2.8.0
> Reporter: Andrew Fulton
> Assignee: Andrew Fulton
> Fix For: 2.9.0
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> When reading from a JmsIO source, a ConcurrentModificationException can be
> thrown when checkpoint finalization occurs under heavy load.
> For example:
> {noformat}
> jsonPayload: {
>  exception: "java.util.ConcurrentModificationException
>  at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:903)
>  at java.util.ArrayList$Itr.next(ArrayList.java:853)
>  at org.apache.beam.sdk.io.jms.JmsCheckpointMark.finalizeCheckpoint(JmsCheckpointMark.java:65)
>  at com.google.cloud.dataflow.worker.StreamingModeExecutionContext$1.run(StreamingModeExecutionContext.java:379)
>  at com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:846)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
>  "
>  job: "2018-09-27_08_55_18-6454085774348718625"
>  logger: "com.google.cloud.dataflow.worker.StreamingDataflowWorker"
>  message: "Source checkpoint finalization failed:"
>  thread: "309"
>  work: "<nil>"
>  worker: "test-andrew-092715504-09270855-tkfp-harness-dnmb"
> {noformat}
>
> Looking at the JmsCheckpointMark code, access to the pending-message list
> appears to be unsynchronized. If one thread calls finalizeCheckpoint (which
> iterates over the list) while a separate processing thread adds messages to
> the same list, a ConcurrentModificationException is thrown.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)