Repository: apex-core
Updated Branches:
  refs/heads/master eaf041931 -> ce74fe78e


APEXCORE-510 enforce emit() on the operator thread


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/ce74fe78
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/ce74fe78
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/ce74fe78

Branch: refs/heads/master
Commit: ce74fe78eb0dae5250bccbf1bb4ada755c3c2ebe
Parents: eaf0419
Author: Sanjay Pujare <[email protected]>
Authored: Thu Sep 29 05:53:06 2016 +0530
Committer: Vlad Rozov <[email protected]>
Committed: Fri Oct 21 14:01:51 2016 -0700

----------------------------------------------------------------------
 .../com/datatorrent/api/DefaultOutputPort.java  |  17 +++
 .../datatorrent/api/DefaultOutputPortTest.java  | 136 +++++++++++++++++++
 2 files changed, 153 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/ce74fe78/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
index 078a372..71be22c 100644
--- a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
+++ b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
@@ -18,6 +18,9 @@
  */
 package com.datatorrent.api;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.Operator.Unifier;
 
@@ -31,7 +34,11 @@ import com.datatorrent.api.Operator.Unifier;
  */
 public class DefaultOutputPort<T> implements Operator.OutputPort<T>
 {
+  public static final String THREAD_AFFINITY_DISABLE_CHECK = "com.datatorrent.api.DefaultOutputPort.thread.check.disable";
+  private static final Logger logger = LoggerFactory.getLogger(DefaultOutputPort.class);
+
   private transient Sink<Object> sink;
+  private transient Thread operatorThread;
 
   /**
    * <p>Constructor for DefaultOutputPort.</p>
@@ -48,6 +55,12 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
    */
   public void emit(T tuple)
   {
+    // operatorThread could be null if setup() never got called.
+    if (operatorThread != null && Thread.currentThread() != operatorThread) {
+      // only under certain modes: enforce this
+      throw new IllegalStateException("Current thread " + Thread.currentThread().getName() +
+          " is different from the operator thread " + operatorThread.getName());
+    }
     sink.put(tuple);
   }
 
@@ -88,6 +101,10 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
   @Override
   public void setup(PortContext context)
   {
+    if (Boolean.getBoolean(THREAD_AFFINITY_DISABLE_CHECK) == false) {
+      operatorThread = Thread.currentThread();
+      logger.debug("Enforcing emit on {}", operatorThread.getName());
+    }
   }
 
   /** {@inheritDoc} */


http://git-wip-us.apache.org/repos/asf/apex-core/blob/ce74fe78/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java
----------------------------------------------------------------------
diff --git a/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java b/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java
new file mode 100644
index 0000000..43d04e5
--- /dev/null
+++ b/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.api;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultOutputPortTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(DefaultOutputPortTest.class);
+
+  private DefaultOutputPort<Object> port;
+  private Sink<Object> sink;
+
+  @Before
+  public void setupTest()
+  {
+    port = new DefaultOutputPort<>();
+    sink = new Sink<Object>()
+    {
+      private volatile int count = 0;
+
+      @Override
+      public void put(Object tuple)
+      {
+        count++;
+      }
+
+      @Override
+      public int getCount(boolean reset)
+      {
+        return count;
+      }
+    };
+    port.setSink(sink);
+  }
+
+  /*
+   * Same thread for setup() and emit()
+   */
+  @Test
+  public void testSameThreadForSetupAndEmit()
+  {
+    port.setup(null);
+    port.emit(null);
+    Assert.assertEquals(1, sink.getCount(false));
+    // if it comes here it passes
+  }
+
+  /*
+   * setup() not called : null thread object should not cause exception
+   */
+  @Test
+  public void testSetupNotCalledAndEmit()
+  {
+    port.emit(null);
+    Assert.assertEquals(1, sink.getCount(false));
+    // if it comes here it passes
+  }
+
+  volatile boolean pass = false;
+
+  /*
+   * Different thread for setup() and emit()
+   */
+  @Test
+  public void testDifferentThreadForSetupAndEmit() throws InterruptedException
+  {
+    System.clearProperty(DefaultOutputPort.THREAD_AFFINITY_DISABLE_CHECK); // do not suppress the check
+    pass = false;
+    port.setup(null);
+    Thread thread = new Thread("test-thread-xyz")
+    {
+      @Override
+      public void run()
+      {
+        try {
+          port.emit(null);
+        } catch (IllegalStateException ise) {
+          pass = ise.getMessage().startsWith("Current thread test-thread-xyz is different from the operator thread ");
+        }
+      }
+    };
+    thread.start();
+    thread.join();
+    Assert.assertTrue("same thread check didn't take place!", pass);
+    Assert.assertEquals(0, sink.getCount(false)); // no put() on sink
+  }
+
+  /*
+   * Different thread for setup() and emit() but suppress check property set
+   */
+  @Test
+  public void testDifferentThreadForSetupAndEmit_CheckSuppressed() throws InterruptedException
+  {
+    System.setProperty(DefaultOutputPort.THREAD_AFFINITY_DISABLE_CHECK, "true"); // suppress the check
+    port.setup(null);
+    pass = true;
+    Thread thread = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        try {
+          port.emit(null);
+        } catch (IllegalStateException ise) {
+          pass = false;
+        }
+      }
+    };
+    thread.start();
+    thread.join();
+    Assert.assertEquals("same thread check was not suppressed!", 1, sink.getCount(false));
+    Assert.assertTrue("Exception was thrown!", pass);
+    System.clearProperty(DefaultOutputPort.THREAD_AFFINITY_DISABLE_CHECK);
+  }
+}
