This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 5a0c853 AMQ-7353 - fix visibility of marshalledProperties to ensure
competing threads don't see partial objects in error. Little test case that
demonstrates the problem in isolation
5a0c853 is described below
commit 5a0c853ba0e2ade0b29fa4319857bd8a557995e4
Author: gtully <[email protected]>
AuthorDate: Tue Nov 26 16:58:30 2019 +0000
AMQ-7353 - fix visibility of marshalledProperties to ensure competing
threads don't see partial objects in error. Little test case that demonstrates
the problem in isolation
---
.../java/org/apache/activemq/ActiveMQSession.java | 2 +-
.../java/org/apache/activemq/command/Message.java | 2 +-
.../apache/activemq/command/VisibilityTest.java | 214 +++++++++++++++++++++
3 files changed, 216 insertions(+), 2 deletions(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 9b18d89..5910634 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -746,8 +746,8 @@ public class ActiveMQSession implements Session,
QueueSession, TopicSession, Sta
} finally {
connection.removeSession(this);
- this.transactionContext = null;
closed = true;
+ this.transactionContext = null;
}
}
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/command/Message.java
b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
index fca3b46..65b560d 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
@@ -79,7 +79,7 @@ public abstract class Message extends BaseCommand implements
MarshallAware, Mess
protected String userID;
protected ByteSequence content;
- protected ByteSequence marshalledProperties;
+ protected volatile ByteSequence marshalledProperties;
protected DataStructure dataStructure;
protected int redeliveryCounter;
diff --git
a/activemq-client/src/test/java/org/apache/activemq/command/VisibilityTest.java
b/activemq-client/src/test/java/org/apache/activemq/command/VisibilityTest.java
new file mode 100644
index 0000000..d0b362d
--- /dev/null
+++
b/activemq-client/src/test/java/org/apache/activemq/command/VisibilityTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.command;
+
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertFalse;
+
+// https://issues.apache.org/jira/browse/AMQ-7353
+
+public class VisibilityTest {
+
+ // seems to reproduce easily with a direct reference to byteSequence
+ // a simpler message LocalMessage with less logic, reproduces ok.
+ // however adding more logic to
org.apache.activemq.command.VisibilityTest.LocalMessage.beforeMarshall will
throw it off.
+ // It could be down to cache lines, it is a brittle test - but it does
demonstrate the problem in theory
+ // an allocation in one thread may not be fully visible in another even
after the init has completed!
+ // I wanted to prove the need for the volatile to avoid the NPE; doing the
extra work when it is not visible is fine,
+ // but the NPE is a real problem when it happens.
+ static ActiveMQBytesMessage bytesMessage;
+ static ByteSequence byteSequence;
+ static LocalMessage localMessage;
+
+ static class LocalMessage {
+ public HashMap<String, Object> properties = new HashMap<>();
+ public /* the fix */ volatile ByteSequence marshalledProperties;
+ public int total;
+ public void setBooleanProperty(String name, boolean v) {
+ properties.put(name, v);
+ }
+
+ public void beforeMarshall(WireFormat ignored) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ // with the following (real work) put in, it won't reproduce
+ //DataOutputStream os = new DataOutputStream(baos);
+ //MarshallingSupport.marshalPrimitiveMap(properties, os);
+ //os.close();
+ total += properties.size();
+ marshalledProperties = baos.toByteSequence();
+ }
+
+ public ByteSequence getMarshalledProperties() {
+ return marshalledProperties;
+ }
+ }
+
+ public static int checkNull() {
+ ByteSequence local = byteSequence;
+ if (local != null) {
+ // if local is non null, the internal buffer may not be visible!
+ return local.getData().length;
+ }
+ return 0;
+ }
+
+ public static int checkNullReference() {
+ ActiveMQBytesMessage message = bytesMessage;
+ if (message != null) {
+ ByteSequence local = message.getMarshalledProperties();
+ if (local != null) {
+ // if local is non null, the internal buffer may not be
visible!
+ return local.getData().length;
+ }
+ }
+ return 0;
+ }
+
+ public static int checkNullReferenceOnLocalMessage() {
+ LocalMessage message = localMessage;
+ if (message != null) {
+ ByteSequence local = message.getMarshalledProperties();
+ if (local != null) {
+ // if local is non null, the internal buffer may not be
visible!
+ return local.getData().length;
+ }
+ }
+ return 0;
+ }
+
+ @Ignore
+ public void doTestNested() throws Exception {
+ final AtomicBoolean gotError = new AtomicBoolean();
+ final Thread tryingToMarshall = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ long total = 0;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ total += checkNullReference();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ gotError.set(true);
+ }
+ }
+ System.out.println("from other thread " + total);
+ }
+ });
+ long len = 0;
+ tryingToMarshall.start();
+ for (int t = 0; t < 10; t++) {
+ for (int i = 0; i < 1000_000; i++) {
+ // real world
+ ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+ // needs non null properties to init marshalledProperties
+ message.setBooleanProperty("B", true);
+ message.beforeMarshall(null);
+ bytesMessage = message;
+ // local access after publish
+ len += message.getMarshalledProperties().getData().length;
+ }
+ }
+ tryingToMarshall.interrupt();
+ tryingToMarshall.join();
+ System.out.println(len);
+ assertFalse("no errors, no npe!", gotError.get());
+ }
+
+
+ @Test
+ public void doTestNestedLocalMessage() throws Exception {
+ final AtomicBoolean gotError = new AtomicBoolean();
+ final Thread tryingToMarshall = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ long total = 0;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ total += checkNullReferenceOnLocalMessage();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ gotError.set(true);
+ }
+ }
+ System.out.println("from other thread " + total);
+ }
+ });
+ long len = 0;
+ tryingToMarshall.start();
+ for (int t = 0; t < 10; t++) {
+ for (int i = 0; i < 1000_000; i++) {
+ // real world
+ LocalMessage message = new LocalMessage();
+ // needs non null properties to init marshalledProperties
+ message.setBooleanProperty("B", true);
+ message.beforeMarshall(null);
+ localMessage = message;
+ // local access after publish
+ len += message.getMarshalledProperties().getData().length;
+ }
+ }
+ tryingToMarshall.interrupt();
+ tryingToMarshall.join();
+ System.out.println(len);
+ assertFalse("no errors, no npe!", gotError.get());
+ }
+
+ @Ignore
+ public void doTestDirect() throws Exception {
+ final AtomicBoolean gotError = new AtomicBoolean();
+ final Thread tryingToMarshall = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ long total = 0;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ total += checkNull();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ gotError.set(true);
+ }
+ }
+ System.out.println("from other thread " + total);
+ }
+ });
+ long len = 0;
+ tryingToMarshall.start();
+ for (int t = 0; t < 10; t++) {
+ for (int i = 0; i < 1000_000; i++) {
+ ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ byteSequence = byteArrayOutputStream.toByteSequence();
+ // local access after publish
+ len += byteSequence.getData().length;
+ }
+ }
+ tryingToMarshall.interrupt();
+ tryingToMarshall.join();
+ System.out.println(len);
+ assertFalse("no errors, no npe!", gotError.get());
+ }
+
+}
\ No newline at end of file