[JACOB-6] Add dsl support for synchronizing and sequencing processes
Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/2cf09b89 Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/2cf09b89 Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/2cf09b89 Branch: refs/heads/master Commit: 2cf09b89275609cd45c78081ae6eca9401166440 Parents: 0c2a708 Author: Hadrian Zbarcea <[email protected]> Authored: Thu Jan 16 09:32:48 2014 -0500 Committer: Hadrian Zbarcea <[email protected]> Committed: Thu Jan 16 09:32:48 2014 -0500 ---------------------------------------------------------------------- src/main/java/org/apache/ode/jacob/Jacob.java | 2 +- .../org/apache/ode/jacob/oo/ProcessUtil.java | 186 +++++++++++++ .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 3 +- .../jacob/examples/helloworld/HelloWorld.java | 2 +- .../ode/jacob/examples/sequence/Sequence.java | 2 +- .../apache/ode/jacob/oo/JacobChannelsTest.java | 5 +- .../ode/jacob/oo/SequentialProcessingTest.java | 269 +++++++++++++++++++ .../apache/ode/jacob/vpu/ChannelRefTest.java | 140 +++++----- src/test/resources/log4j.properties | 5 +- 9 files changed, 537 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/main/java/org/apache/ode/jacob/Jacob.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/Jacob.java b/src/main/java/org/apache/ode/jacob/Jacob.java index cc7570c..51967c0 100644 --- a/src/main/java/org/apache/ode/jacob/Jacob.java +++ b/src/main/java/org/apache/ode/jacob/Jacob.java @@ -41,7 +41,7 @@ public class Jacob { * * @param concretion the concretion of a process template */ - public static void instance(Runnable concretion) { + public static void instance(RunnableProcess concretion) { JacobVPU.activeJacobThread().instance(concretion); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java index b7e61a6..5b9fabd 100644 --- a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java +++ b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java @@ -18,10 +18,22 @@ */ package org.apache.ode.jacob.oo; +import java.util.Arrays; + +import org.apache.ode.jacob.RunnableProcess; import org.apache.ode.jacob.vpu.JacobVPU; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.ode.jacob.Jacob.instance; +import static org.apache.ode.jacob.Jacob.newChannel; +import static org.apache.ode.jacob.Jacob.object; public final class ProcessUtil { + // TODO: add more logging at TRACE level + private static final Logger LOG = LoggerFactory.getLogger(ProcessUtil.class); + private ProcessUtil() { // Utility class } @@ -49,4 +61,178 @@ public final class ProcessUtil { return new ReceiveProcess().setChannel(proxy).setReceiver(listener); } + /** + * + * @return A noop RunnableProcess + */ + public static RunnableProcess nil() { + return new Nil(); + } + + /** + * + * @param callback + * @return a RunnableProcess that wraps a return notification in a separate process + * + */ + public static RunnableProcess terminator(final Synch callback) { + return callback != null ? new Terminator(callback) : null; + } + + /** + * + * @param callback + * @param process + * @return + * + * Returns a synchronized process embedding the runnable process. Once the process finishes it + * will notify that on the callback return channel + */ + public static Synchronized sync(final Synch callback, final RunnableProcess process) { + return new SynchronizedWrapper(callback, process); + } + + /** + * + * @param callback + * @param process + * @return + * + * Intercepts the execution of a synchronized process and executes an interceptor before the + * termination of the process is actually signaled + */ + public static Synchronized intercept(final Synchronized process, final RunnableProcess interceptor) { + if (interceptor == null) { + return process; + } + Synch callback = newChannel(Synch.class, ""); + object(receive(callback, new InterceptorSynch(process.getCallback(), interceptor))); + process.setCallback(callback); + return process; + } + + /** + * + * @param processes + * @return a Synchronized process + * + * Ensures the sequential execution of processes + */ + public static Synchronized sequence(final RunnableProcess... processes) { + return sequence(null, processes); + } + + /** + * + * @param callback + * @param processes + * @return + * + * Ensures the sequential execution of processes. After the execution is complete a + * notification is sent to the callback channel + */ + public static Synchronized sequence(final Synch callback, final RunnableProcess... processes) { + return new SequenceProcess(callback, processes); + } + + // Helpers Process composers + + /** + * TODO: Document me + */ + public static class Nil extends RunnableProcess { + private static final long serialVersionUID = 1L; + public void run() { + // do nothing + } + } + + /** + * TODO: Document me + */ + public static class Terminator extends RunnableProcess { + private static final long serialVersionUID = 1L; + protected Synch callback; + public Terminator(final Synch callback) { + this.callback = callback; + } + public Synch getCallback() { + return callback; + } + public void run() { + callback.ret(); + } + } + + public static abstract class Synchronized extends RunnableProcess { + private static final long serialVersionUID = 1L; + protected Synch callback; + + public abstract void execute(); + + public Synchronized(final Synch callback) { + setCallback(callback); + } + public Synch getCallback() { + return callback; + } + public void setCallback(final Synch callback) { + this.callback = callback; + } + public void run() { + execute(); + if (callback != null) { + callback.ret(); + } + } + } + + public static class SynchronizedWrapper extends Synchronized { + private static final long serialVersionUID = 1L; + protected final RunnableProcess process; + public SynchronizedWrapper(final Synch callback, final RunnableProcess process) { + super(callback); + this.process = process; + } + public void execute() { + process.run(); + } + } + + public static final class InterceptorSynch implements Synch { + private static final long serialVersionUID = 1L; + protected final RunnableProcess interceptor; + private final Synch target; + public InterceptorSynch(final Synch target, final RunnableProcess interceptor) { + this.target = target; + this.interceptor = interceptor; + } + public void ret() { + instance(sync(target, interceptor)); + } + } + + public static final class SequenceProcess extends Synchronized { + private static final long serialVersionUID = 1L; + private final RunnableProcess[] processes; + + public SequenceProcess(final Synch callback, final RunnableProcess[] processes) { + super(callback); + this.processes = processes; + } + + public void execute() { + // can only sequence synchronized processes + final Synchronized current = ensureSynchronized(processes[0]); + instance(intercept(current, processes.length > 1 ? + sequence(this.callback, Arrays.copyOfRange(processes, 1, processes.length)) : + terminator(this.callback))); + this.callback = null; + } + + public Synchronized ensureSynchronized(RunnableProcess process) { + return process instanceof Synchronized ? (Synchronized)process : sync(null, process); + } + } + } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java index c709a95..72e33ce 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java +++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java @@ -27,6 +27,7 @@ import org.apache.ode.jacob.ChannelRef; import org.apache.ode.jacob.JacobThread; import org.apache.ode.jacob.Message; import org.apache.ode.jacob.MessageListener; +import org.apache.ode.jacob.RunnableProcess; import org.apache.ode.jacob.oo.Channel; import org.apache.ode.jacob.oo.ChannelListener; import org.apache.ode.jacob.oo.ClassUtil; @@ -175,7 +176,7 @@ public final class JacobVPU { * the injected process. This method is equivalent to the parallel operator, * but is intended to be used from outside of an active {@link JacobThread}. */ - public void inject(Runnable concretion) { + public void inject(RunnableProcess concretion) { LOG.debug("injecting {}", concretion); addReaction(concretion, ClassUtil.RUN_METHOD_ACTION, new Class[]{}, (LOG.isInfoEnabled() ? concretion.toString() : null)); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java index 4dac54a..5165f8b 100644 --- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java +++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java @@ -296,7 +296,7 @@ public class HelloWorld extends RunnableProcess { } @Override - protected Runnable doStep(int step, Synch done) { + protected RunnableProcess doStep(int step, Synch done) { return new SequenceItemEmitter(greetings[step], done, out); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java b/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java index 8ea152c..c7b7e6c 100644 --- a/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java +++ b/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java @@ -68,7 +68,7 @@ public abstract class Sequence extends RunnableProcess { * @param done notification after step completion * @return runnable process */ - protected abstract Runnable doStep(int step, Synch done); + protected abstract RunnableProcess doStep(int step, Synch done); public static class SequenceData { public int _steps; http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java b/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java index c68c7f4..e11a517 100644 --- a/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java +++ b/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java @@ -25,6 +25,7 @@ import static org.apache.ode.jacob.oo.ProcessUtil.receive; import java.util.ArrayList; import java.util.List; +import org.apache.ode.jacob.RunnableProcess; import org.apache.ode.jacob.soup.CommChannel; import org.apache.ode.jacob.vpu.ChannelFactory; import org.apache.ode.jacob.vpu.ExecutionQueueImpl; @@ -55,7 +56,7 @@ public class JacobChannelsTest { vpu.setContext(new ExecutionQueueImpl()); final List<String> result = new ArrayList<String>(); - vpu.inject(new Runnable() { + vpu.inject(new RunnableProcess() { public void run() { Val v = (Val)vpu.newChannel(Val.class, ""); object(receive(v, new Val() { @@ -88,7 +89,7 @@ public class JacobChannelsTest { vpu.setContext(new ExecutionQueueImpl()); final List<String> result = new ArrayList<String>(); - vpu.inject(new Runnable() { + vpu.inject(new RunnableProcess() { public void run() { Val v = (Val)vpu.newChannel(Val.class, ""); object(ProcessUtil.compose(receive(v, new Val() { http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java b/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java new file mode 100644 index 0000000..1bd938c --- /dev/null +++ b/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.oo; + + +import org.apache.ode.jacob.Jacob; +import org.apache.ode.jacob.RunnableProcess; +import org.apache.ode.jacob.oo.ProcessUtil.Synchronized; +import org.apache.ode.jacob.vpu.ExecutionQueueImpl; +import org.apache.ode.jacob.vpu.JacobVPU; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.ode.jacob.Jacob.instance; +import static org.apache.ode.jacob.Jacob.object; +import static org.apache.ode.jacob.oo.ProcessUtil.intercept; +import static org.apache.ode.jacob.oo.ProcessUtil.receive; +import static org.apache.ode.jacob.oo.ProcessUtil.sequence; +import static org.apache.ode.jacob.oo.ProcessUtil.sync; + + +public class SequentialProcessingTest { + + @SuppressWarnings("serial") + @Test + public void testParallelProcesses() { + final StringBuffer out = new StringBuffer(); + executeProcess(new RunnableProcess() { + public void run() { + instance(intercept(sync(null, atomicProcess("A", out)), atomicProcess("0", out))); + instance(intercept(sync(null, atomicProcess("B", out)), atomicProcess("1", out))); + } + }); + // parallelism is proven by process "B" being executed before "A0" and "A1" + Assert.assertEquals("AB01", out.toString()); + } + + @SuppressWarnings("serial") + @Test + public void testSynchronizeProcess() { + final StringBuffer out = new StringBuffer(); + executeProcess(new RunnableProcess() { + public void run() { + Synch c1 = Jacob.newChannel(Synch.class, ""); + object(receive(c1, new Synch() { + public void ret() { + out.append("x"); + } + })); + instance(sync(c1, atomicProcess("A", out))); + } + }); + // Return hook "x" is executed after process "A" + Assert.assertEquals("Ax", out.toString()); + } + + @SuppressWarnings("serial") + @Test + public void testSynchronizeSynchronizedProcess() { + final StringBuffer out = new StringBuffer(); + executeProcess(new RunnableProcess() { + public void run() { + Synch c1 = Jacob.newChannel(Synch.class, ""); + object(receive(c1, new Synch() { + public void ret() { + out.append("x"); + } + })); + Synch c2 = Jacob.newChannel(Synch.class, ""); + object(receive(c2, new Synch() { + public void ret() { + out.append("y"); + } + })); + Synchronized process = synchronizedProcess(c1, "S", out); + instance(sync(c2, process)); + } + }); + // Both return hooks "x" and "y" are executed after synchronized process "S" + Assert.assertEquals("Sxy", out.toString()); + } + + @SuppressWarnings("serial") + @Test + public void testInterceptProcess() { + final StringBuffer out = new StringBuffer(); + executeProcess(new RunnableProcess() { + public void run() { + Synch c1 = Jacob.newChannel(Synch.class, ""); + object(receive(c1, new Synch() { + public void ret() { + out.append("x"); + } + })); + instance(intercept(sync(c1, atomicProcess("A", out)), atomicProcess("B", out))); + } + }); + // Return interceptor "B" is executed after process "A", but before the hook "x" + Assert.assertEquals("ABx", out.toString()); + } + + @SuppressWarnings("serial") + @Test + public void testInterceptSynchronizedProcess() { + final StringBuffer out = new StringBuffer(); + executeProcess(new RunnableProcess() { + public void run() { + Synch c1 = Jacob.newChannel(Synch.class, ""); + object(receive(c1, new Synch() { + public void ret() { + out.append("x"); + } + })); + Synch c2 = Jacob.newChannel(Synch.class, ""); + object(receive(c2, new Synch() { + public void ret() { + out.append("y"); + } + })); + instance(intercept(sync(c1, atomicProcess("A", out)), sync(c2, atomicProcess("B", out)))); + } + }); + // Return interceptor "B" is executed after process "A", but before the hook "x" + Assert.assertEquals("AByx", out.toString()); + } + + @SuppressWarnings("serial") + @Test + public void testSimpleSequence() { + final StringBuffer out = new StringBuffer(); + executeProcess(new RunnableProcess() { + public void run() { + Jacob.instance(sequence( + atomicProcess("A", out), + atomicProcess("B", out), + atomicProcess("C", out))); + } + }); + // TODO: explain + Assert.assertEquals("ABC", out.toString()); + } + + @SuppressWarnings("serial") + @Test + public void testSynchronizedSequence() { + final StringBuffer out = new StringBuffer(); + executeProcess(new RunnableProcess() { + public void run() { + Synch c1 = Jacob.newChannel(Synch.class, ""); + object(receive(c1, new Synch() { + public void ret() { + out.append("x"); + } + })); + Jacob.instance(sequence(c1, + atomicProcess("A", out), + atomicProcess("B", out), + atomicProcess("C", out), + atomicProcess("D", out))); + } + }); + // TODO: explain + Assert.assertEquals("ABCDx", out.toString()); + } + + @SuppressWarnings("serial") + @Test + public void testTransitiveSequence() { + final StringBuffer out = new StringBuffer(); + executeProcess(new RunnableProcess() { + public void run() { + Synch c1 = Jacob.newChannel(Synch.class, ""); + object(receive(c1, new Synch() { + public void ret() { + out.append("x"); + } + })); + Jacob.instance(sequence(c1, + atomicProcess("A", out), + sequence(atomicProcess("B", out), atomicProcess("C", out)), + atomicProcess("D", out))); + } + }); + // TODO: explain + Assert.assertEquals("ABCDx", out.toString()); + } + + @SuppressWarnings("serial") + @Test + public void testSequenceComposition() { + final StringBuffer out = new StringBuffer(); + executeProcess(new RunnableProcess() { + public void run() { + Synch c1 = Jacob.newChannel(Synch.class, ""); + object(receive(c1, new Synch() { + public void ret() { + out.append("x"); + } + })); + Synch c2 = Jacob.newChannel(Synch.class, ""); + object(receive(c2, new Synch() { + public void ret() { + out.append("y"); + } + })); + // just test a more complex scenario once + Jacob.instance(sequence(c1, + sequence( + sequence( + atomicProcess("A", out), + atomicProcess("B", out), + atomicProcess("C", out)), + atomicProcess("D", out)), + atomicProcess("E", out), + sequence(c2, + atomicProcess("F", out), + sequence( + atomicProcess("G", out), + atomicProcess("H", out))))); + } + }); + // TODO: explain + Assert.assertEquals("ABCDEFGHxy", out.toString()); + } + + @SuppressWarnings("serial") + protected RunnableProcess atomicProcess(final String id, final StringBuffer out) { + return new RunnableProcess() { + public void run() { + out.append(id); + } + }; + } + + @SuppressWarnings("serial") + protected Synchronized synchronizedProcess(final Synch callback, final String id, final StringBuffer out) { + return new Synchronized(callback) { + public void execute() { + out.append(id); + } + }; + } + + protected void executeProcess(final RunnableProcess process) { + final JacobVPU vpu = new JacobVPU(); + vpu.setContext(new ExecutionQueueImpl()); + vpu.inject(process); + while (vpu.execute()) { + // keep doing it... + } + } + +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java b/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java index b50e45b..50ccad9 100644 --- a/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java +++ b/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java @@ -1,69 +1,71 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ode.jacob.vpu; - -import static org.apache.ode.jacob.Jacob.newCommChannel; -import static org.junit.Assert.*; -import org.apache.ode.jacob.ChannelRef; -import org.apache.ode.jacob.oo.Synch; -import org.apache.ode.jacob.oo.Val; -import org.apache.ode.jacob.soup.CommChannel; - -import org.junit.Test; - -public class ChannelRefTest { - - @Test - public void testConnectWithInterface() { - JacobVPU vpu = new JacobVPU(); - vpu.setContext(new ExecutionQueueImpl()); - - vpu.inject(new Runnable() { - @Override - public void run() { - ChannelRef cref = newCommChannel("unbound channel"); - CommChannel cchannel = cref.getEndpoint(CommChannel.class); - assertNotNull(cchannel); - assertNull(cchannel.getType()); - - // now connect it to Val.class - Val val = cref.getEndpoint(Val.class); - assertNotNull(val); - assertEquals(Val.class, cchannel.getType()); - - // now try to associate it with a different channel interface - try { - cref.getEndpoint(Synch.class); - fail("we should get an IllegalStateException"); - } catch (IllegalStateException e) { - assertEquals("ChannelRef is already associated with a channel of a different type", e.getMessage()); - } - - // now try to associate with the same channel - Val val2 = cref.getEndpoint(Val.class); - assertNotNull(val2); - assertSame(val, val2); - } - }); - - assertEquals(true, vpu.getContext().hasReactions()); - vpu.execute(); - assertEquals(false, vpu.getContext().hasReactions()); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.vpu; + +import static org.apache.ode.jacob.Jacob.newCommChannel; +import static org.junit.Assert.*; + +import org.apache.ode.jacob.ChannelRef; +import org.apache.ode.jacob.RunnableProcess; +import org.apache.ode.jacob.oo.Synch; +import org.apache.ode.jacob.oo.Val; +import org.apache.ode.jacob.soup.CommChannel; +import org.junit.Test; + +public class ChannelRefTest { + + @Test + public void testConnectWithInterface() { + JacobVPU vpu = new JacobVPU(); + vpu.setContext(new ExecutionQueueImpl()); + + vpu.inject(new RunnableProcess() { + @Override + public void run() { + ChannelRef cref = newCommChannel("unbound channel"); + CommChannel cchannel = cref.getEndpoint(CommChannel.class); + assertNotNull(cchannel); + assertNull(cchannel.getType()); + + // now connect it to Val.class + Val val = cref.getEndpoint(Val.class); + assertNotNull(val); + assertEquals(Val.class, cchannel.getType()); + + // now try to associate it with a different channel interface + try { + cref.getEndpoint(Synch.class); + fail("we should get an IllegalStateException"); + } catch (IllegalStateException e) { + assertEquals("ChannelRef is already associated with a channel of a different type", e.getMessage()); + } + + // now try to associate with the same channel + Val val2 = cref.getEndpoint(Val.class); + assertNotNull(val2); + assertSame(val, val2); + } + }); + + assertEquals(true, vpu.getContext().hasReactions()); + vpu.execute(); + assertEquals(false, vpu.getContext().hasReactions()); + } + +} http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties index 91bd688..c1d9d01 100644 --- a/src/test/resources/log4j.properties +++ b/src/test/resources/log4j.properties @@ -16,10 +16,11 @@ # # Set root logger level to WARN and its only appender to CONSOLE -log4j.rootLogger=TRACE, file +log4j.rootLogger=INFO, file # log4j properties to work with command line tools. -log4j.category.org.apache.ode=TRACE +log4j.category.org.apache.ode=DEBUG +#log4j.category.org.apache.ode.jacob.oo=TRACE # Console appender log4j.appender.stdout=org.apache.log4j.ConsoleAppender
