[JACOB-6] Add dsl support for synchronizing and sequencing processes

Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/2cf09b89
Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/2cf09b89
Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/2cf09b89

Branch: refs/heads/master
Commit: 2cf09b89275609cd45c78081ae6eca9401166440
Parents: 0c2a708
Author: Hadrian Zbarcea <[email protected]>
Authored: Thu Jan 16 09:32:48 2014 -0500
Committer: Hadrian Zbarcea <[email protected]>
Committed: Thu Jan 16 09:32:48 2014 -0500

----------------------------------------------------------------------
 src/main/java/org/apache/ode/jacob/Jacob.java   |   2 +-
 .../org/apache/ode/jacob/oo/ProcessUtil.java    | 186 +++++++++++++
 .../java/org/apache/ode/jacob/vpu/JacobVPU.java |   3 +-
 .../jacob/examples/helloworld/HelloWorld.java   |   2 +-
 .../ode/jacob/examples/sequence/Sequence.java   |   2 +-
 .../apache/ode/jacob/oo/JacobChannelsTest.java  |   5 +-
 .../ode/jacob/oo/SequentialProcessingTest.java  | 269 +++++++++++++++++++
 .../apache/ode/jacob/vpu/ChannelRefTest.java    | 140 +++++-----
 src/test/resources/log4j.properties             |   5 +-
 9 files changed, 537 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/main/java/org/apache/ode/jacob/Jacob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/Jacob.java 
b/src/main/java/org/apache/ode/jacob/Jacob.java
index cc7570c..51967c0 100644
--- a/src/main/java/org/apache/ode/jacob/Jacob.java
+++ b/src/main/java/org/apache/ode/jacob/Jacob.java
@@ -41,7 +41,7 @@ public class Jacob {
      *
      * @param concretion the concretion of a process template
      */
-    public static void instance(Runnable concretion) {
+    public static void instance(RunnableProcess concretion) {
         JacobVPU.activeJacobThread().instance(concretion);
     }
 

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java 
b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java
index b7e61a6..5b9fabd 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java
@@ -18,10 +18,22 @@
  */
 package org.apache.ode.jacob.oo;
 
+import java.util.Arrays;
+
+import org.apache.ode.jacob.RunnableProcess;
 import org.apache.ode.jacob.vpu.JacobVPU;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ode.jacob.Jacob.instance;
+import static org.apache.ode.jacob.Jacob.newChannel;
+import static org.apache.ode.jacob.Jacob.object;
 
 
 public final class ProcessUtil {
+       // TODO: add more logging at TRACE level
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProcessUtil.class);
+
     private ProcessUtil() {
         // Utility class
     }
@@ -49,4 +61,178 @@ public final class ProcessUtil {
        return new ReceiveProcess().setChannel(proxy).setReceiver(listener);
     }
 
+       /**
+        * 
+        * @return      A noop RunnableProcess
+        */
+       public static RunnableProcess nil() {
+               return new Nil();
+       }
+
+       /**
+        * 
+        * @param callback the channel to notify; may be null
+        * @return a RunnableProcess that wraps a return notification in a 
separate process, or null if callback is null
+        * 
+        */
+       public static RunnableProcess terminator(final Synch callback) {
+               return callback != null ? new Terminator(callback) : null;
+       }
+
+       /**
+        * 
+        * @param callback
+        * @param process
+        * @return
+        * 
+        * Returns a synchronized process embedding the runnable process. Once 
the process finishes it 
+        * will notify that on the callback return channel
+        */
+    public static Synchronized sync(final Synch callback, final 
RunnableProcess process) {
+       return new SynchronizedWrapper(callback, process);
+    }
+
+       /**
+        * 
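+        * @param process the synchronized process to intercept
+        * @param interceptor the process to run before termination is signaled; may be null
+        * @return the given process (unchanged if interceptor is null)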
+        * 
+        * Intercepts the execution of a synchronized process and executes an 
interceptor before the 
+        * termination of the process is actually signaled
+        */
+       public static Synchronized intercept(final Synchronized process, final 
RunnableProcess interceptor) {
+               if (interceptor == null) {
+                       return process;
+               }
+               Synch callback = newChannel(Synch.class, "");
+               object(receive(callback, new 
InterceptorSynch(process.getCallback(), interceptor)));
+               process.setCallback(callback);
+               return process;
+    }
+
+       /**
+        * 
+        * @param processes
+        * @return a Synchronized process
+        * 
+        * Ensures the sequential execution of processes
+        */
+       public static Synchronized sequence(final RunnableProcess... processes) 
{
+       return sequence(null, processes);
+    }
+
+       /**
+        * 
+        * @param callback
+        * @param processes
+        * @return
+        * 
+        * Ensures the sequential execution of processes. After the execution 
is complete a 
+        * notification is sent to the callback channel
+        */
+    public static Synchronized sequence(final Synch callback, final 
RunnableProcess... processes) {
+       return new SequenceProcess(callback, processes);
+    }
+
+    // Helper process composers
+
+    /**
+     * A process that does nothing when run.
+     */
+    public static class Nil extends RunnableProcess {
+               private static final long serialVersionUID = 1L;
+               public void run() {
+                       // do nothing
+               }
+    }
+
+    /**
+     * A process that notifies its callback channel (calls ret()) when run.
+     */
+    public static class Terminator extends RunnableProcess {
+               private static final long serialVersionUID = 1L;
+               protected Synch callback;
+               public Terminator(final Synch callback) {
+                       this.callback = callback;
+               }
+               public Synch getCallback() {
+                       return callback;
+               }
+               public void run() {
+                       callback.ret();
+               }
+    }
+
+    public static abstract class Synchronized extends RunnableProcess {
+               private static final long serialVersionUID = 1L;
+               protected Synch callback;
+
+               public abstract void execute();
+
+               public Synchronized(final Synch callback) {
+                       setCallback(callback);
+               }
+               public Synch getCallback() {
+                       return callback;
+               }
+               public void setCallback(final Synch callback) {
+                       this.callback = callback;
+               }
+               public void run() {
+                       execute();
+                       if (callback != null) {
+                               callback.ret();
+                       }
+               }
+    }
+
+    public static class SynchronizedWrapper extends Synchronized {
+               private static final long serialVersionUID = 1L;
+               protected final RunnableProcess process;
+               public SynchronizedWrapper(final Synch callback, final 
RunnableProcess process) {
+                       super(callback);
+                       this.process = process;
+               }
+               public void execute() {
+                       process.run();
+               }
+    }
+
+    public static final class InterceptorSynch implements Synch {
+               private static final long serialVersionUID = 1L;
+               protected final RunnableProcess interceptor;
+               private final Synch target;
+               public InterceptorSynch(final Synch target, final 
RunnableProcess interceptor) {
+                       this.target = target;
+                       this.interceptor = interceptor;
+               }
+               public void ret() {
+                       instance(sync(target, interceptor));
+               }
+    }
+
+    public static final class SequenceProcess extends Synchronized {
+               private static final long serialVersionUID = 1L;
+               private final RunnableProcess[] processes;
+
+               public SequenceProcess(final Synch callback, final 
RunnableProcess[] processes) {
+                       super(callback);
+                       this.processes = processes;
+               }
+
+               public void execute() {
+                       // can only sequence synchronized processes
+                       final Synchronized current = 
ensureSynchronized(processes[0]);
+                       instance(intercept(current, processes.length > 1 ? 
+                               sequence(this.callback, 
Arrays.copyOfRange(processes, 1, processes.length)) :
+                               terminator(this.callback)));
+                       this.callback = null;
+               }
+               
+               public Synchronized ensureSynchronized(RunnableProcess process) 
{
+                       return process instanceof Synchronized ? 
(Synchronized)process : sync(null, process);
+               }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java 
b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index c709a95..72e33ce 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -27,6 +27,7 @@ import org.apache.ode.jacob.ChannelRef;
 import org.apache.ode.jacob.JacobThread;
 import org.apache.ode.jacob.Message;
 import org.apache.ode.jacob.MessageListener;
+import org.apache.ode.jacob.RunnableProcess;
 import org.apache.ode.jacob.oo.Channel;
 import org.apache.ode.jacob.oo.ChannelListener;
 import org.apache.ode.jacob.oo.ClassUtil;
@@ -175,7 +176,7 @@ public final class JacobVPU {
      * the injected process. This method is equivalent to the parallel 
operator,
      * but is intended to be used from outside of an active {@link 
JacobThread}.
      */
-    public void inject(Runnable concretion) {
+    public void inject(RunnableProcess concretion) {
         LOG.debug("injecting {}", concretion);
         addReaction(concretion, ClassUtil.RUN_METHOD_ACTION, new Class[]{},
             (LOG.isInfoEnabled() ? concretion.toString() : null));

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java 
b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
index 4dac54a..5165f8b 100644
--- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
+++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
@@ -296,7 +296,7 @@ public class HelloWorld extends RunnableProcess {
                }
 
                @Override
-               protected Runnable doStep(int step, Synch done) {
+               protected RunnableProcess doStep(int step, Synch done) {
                        return new SequenceItemEmitter(greetings[step], done, 
out);
         }
 

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java 
b/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java
index 8ea152c..c7b7e6c 100644
--- a/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java
+++ b/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java
@@ -68,7 +68,7 @@ public abstract class Sequence extends RunnableProcess {
      * @param done notification after step completion
      * @return runnable process
      */
-    protected abstract Runnable doStep(int step, Synch done);
+    protected abstract RunnableProcess doStep(int step, Synch done);
 
     public static class SequenceData {
         public int _steps;

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java 
b/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java
index c68c7f4..e11a517 100644
--- a/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java
+++ b/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java
@@ -25,6 +25,7 @@ import static org.apache.ode.jacob.oo.ProcessUtil.receive;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.ode.jacob.RunnableProcess;
 import org.apache.ode.jacob.soup.CommChannel;
 import org.apache.ode.jacob.vpu.ChannelFactory;
 import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
@@ -55,7 +56,7 @@ public class JacobChannelsTest {
         vpu.setContext(new ExecutionQueueImpl());
 
         final List<String> result = new ArrayList<String>();
-        vpu.inject(new Runnable() {
+        vpu.inject(new RunnableProcess() {
             public void run() {
                 Val v = (Val)vpu.newChannel(Val.class, "");
                 object(receive(v, new Val() {
@@ -88,7 +89,7 @@ public class JacobChannelsTest {
         vpu.setContext(new ExecutionQueueImpl());
 
         final List<String> result = new ArrayList<String>();
-        vpu.inject(new Runnable() {
+        vpu.inject(new RunnableProcess() {
             public void run() {
                 Val v = (Val)vpu.newChannel(Val.class, "");
                 object(ProcessUtil.compose(receive(v, new Val() {

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java 
b/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java
new file mode 100644
index 0000000..1bd938c
--- /dev/null
+++ b/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.oo;
+
+
+import org.apache.ode.jacob.Jacob;
+import org.apache.ode.jacob.RunnableProcess;
+import org.apache.ode.jacob.oo.ProcessUtil.Synchronized;
+import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
+import org.apache.ode.jacob.vpu.JacobVPU;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.ode.jacob.Jacob.instance;
+import static org.apache.ode.jacob.Jacob.object;
+import static org.apache.ode.jacob.oo.ProcessUtil.intercept;
+import static org.apache.ode.jacob.oo.ProcessUtil.receive;
+import static org.apache.ode.jacob.oo.ProcessUtil.sequence;
+import static org.apache.ode.jacob.oo.ProcessUtil.sync;
+
+
+public class SequentialProcessingTest {
+
+    @SuppressWarnings("serial")
+       @Test
+    public void testParallelProcesses() {
+       final StringBuffer out = new StringBuffer();
+       executeProcess(new RunnableProcess() {
+                       public void run() {
+                               instance(intercept(sync(null, 
atomicProcess("A", out)), atomicProcess("0", out)));
+                               instance(intercept(sync(null, 
atomicProcess("B", out)), atomicProcess("1", out)));
+                       }
+        });
+       // parallelism is proven by process "B" being executed before the 
interceptors "0" and "1"
+       Assert.assertEquals("AB01", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+       @Test
+    public void testSynchronizeProcess() {
+       final StringBuffer out = new StringBuffer();
+       executeProcess(new RunnableProcess() {
+                       public void run() {
+                       Synch c1 = Jacob.newChannel(Synch.class, "");
+                       object(receive(c1, new Synch() {
+                                       public void ret() {
+                                               out.append("x");
+                                       }
+                               }));
+                               instance(sync(c1, atomicProcess("A", out)));
+                       }
+        });
+       // Return hook "x" is executed after process "A" 
+       Assert.assertEquals("Ax", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+       @Test
+    public void testSynchronizeSynchronizedProcess() {
+       final StringBuffer out = new StringBuffer();
+       executeProcess(new RunnableProcess() {
+                       public void run() {
+                       Synch c1 = Jacob.newChannel(Synch.class, "");
+                       object(receive(c1, new Synch() {
+                                       public void ret() {
+                                               out.append("x");
+                                       }
+                               }));
+                       Synch c2 = Jacob.newChannel(Synch.class, "");
+                       object(receive(c2, new Synch() {
+                                       public void ret() {
+                                               out.append("y");
+                                       }
+                               }));
+                       Synchronized process = synchronizedProcess(c1, "S", 
out);
+                               instance(sync(c2, process));
+                       }
+        });
+       // Both return hooks "x" and "y" are executed after synchronized 
process "S" 
+       Assert.assertEquals("Sxy", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+       @Test
+    public void testInterceptProcess() {
+       final StringBuffer out = new StringBuffer();
+       executeProcess(new RunnableProcess() {
+                       public void run() {
+                       Synch c1 = Jacob.newChannel(Synch.class, "");
+                       object(receive(c1, new Synch() {
+                                       public void ret() {
+                                               out.append("x");
+                                       }
+                               }));
+                               instance(intercept(sync(c1, atomicProcess("A", 
out)), atomicProcess("B", out)));
+                       }
+        });
+       // Return interceptor "B" is executed after process "A", but before the 
hook "x" 
+       Assert.assertEquals("ABx", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+       @Test
+    public void testInterceptSynchronizedProcess() {
+       final StringBuffer out = new StringBuffer();
+       executeProcess(new RunnableProcess() {
+                       public void run() {
+                       Synch c1 = Jacob.newChannel(Synch.class, "");
+                       object(receive(c1, new Synch() {
+                                       public void ret() {
+                                               out.append("x");
+                                       }
+                               }));
+                       Synch c2 = Jacob.newChannel(Synch.class, "");
+                       object(receive(c2, new Synch() {
+                                       public void ret() {
+                                               out.append("y");
+                                       }
+                               }));
+                               instance(intercept(sync(c1, atomicProcess("A", 
out)), sync(c2, atomicProcess("B", out))));
+                       }
+        });
+       // Interceptor "B" is executed after process "A"; its own hook "y" 
fires before the hook "x" of "A"
+       Assert.assertEquals("AByx", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testSimpleSequence() {
+       final StringBuffer out = new StringBuffer();
+       executeProcess(new RunnableProcess() {
+                       public void run() {
+                               Jacob.instance(sequence(
+                               atomicProcess("A", out), 
+                               atomicProcess("B", out), 
+                               atomicProcess("C", out)));
+                       }
+        });
+       // Processes in a sequence are executed in declaration order
+       Assert.assertEquals("ABC", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testSynchronizedSequence() {
+       final StringBuffer out = new StringBuffer();
+       executeProcess(new RunnableProcess() {
+                       public void run() {
+                       Synch c1 = Jacob.newChannel(Synch.class, "");
+                       object(receive(c1, new Synch() {
+                                       public void ret() {
+                                               out.append("x");
+                                       }
+                               }));
+                               Jacob.instance(sequence(c1,
+                               atomicProcess("A", out), 
+                               atomicProcess("B", out), 
+                               atomicProcess("C", out), 
+                               atomicProcess("D", out)));
+                       }
+        });
+       // Hook "x" is notified only after the last process "D" of the sequence
+       Assert.assertEquals("ABCDx", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testTransitiveSequence() {
+       final StringBuffer out = new StringBuffer();
+       executeProcess(new RunnableProcess() {
+                       public void run() {
+                       Synch c1 = Jacob.newChannel(Synch.class, "");
+                       object(receive(c1, new Synch() {
+                                       public void ret() {
+                                               out.append("x");
+                                       }
+                               }));
+                               Jacob.instance(sequence(c1,
+                               atomicProcess("A", out), 
+                               sequence(atomicProcess("B", out), 
atomicProcess("C", out)), 
+                               atomicProcess("D", out)));
+                       }
+        });
+       // The nested sequence runs in place: "B" and "C" between "A" and "D", then hook "x"
+       Assert.assertEquals("ABCDx", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testSequenceComposition() {
+       final StringBuffer out = new StringBuffer();
+       executeProcess(new RunnableProcess() {
+                       public void run() {
+                       Synch c1 = Jacob.newChannel(Synch.class, "");
+                       object(receive(c1, new Synch() {
+                                       public void ret() {
+                                               out.append("x");
+                                       }
+                               }));
+                       Synch c2 = Jacob.newChannel(Synch.class, "");
+                       object(receive(c2, new Synch() {
+                                       public void ret() {
+                                               out.append("y");
+                                       }
+                               }));
+                       // just test a more complex scenario once
+                               Jacob.instance(sequence(c1,
+                                   sequence(
+                                       sequence(
+                                               atomicProcess("A", out),
+                                               atomicProcess("B", out),
+                                               atomicProcess("C", out)),
+                                       atomicProcess("D", out)), 
+                                   atomicProcess("E", out), 
+                               sequence(c2,
+                                       atomicProcess("F", out), 
+                                   sequence(
+                                       atomicProcess("G", out),
+                               atomicProcess("H", out)))));
+                       }
+        });
+       // All processes run in declaration order; outer hook "x" is signaled before nested hook "y"
+       Assert.assertEquals("ABCDEFGHxy", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+       protected RunnableProcess atomicProcess(final String id, final 
StringBuffer out) {
+       return new RunnableProcess() {
+                       public void run() {
+                               out.append(id);
+                       }
+       };
+    }
+
+    @SuppressWarnings("serial")
+       protected Synchronized synchronizedProcess(final Synch callback, final 
String id, final StringBuffer out) {
+       return new Synchronized(callback) {
+                       public void execute() {
+                               out.append(id);
+                       }
+       };
+    }
+    
+    protected void executeProcess(final RunnableProcess process) {
+        final JacobVPU vpu = new JacobVPU();
+        vpu.setContext(new ExecutionQueueImpl());
+        vpu.inject(process);
+        while (vpu.execute()) {
+            // keep doing it...
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java 
b/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java
index b50e45b..50ccad9 100644
--- a/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java
+++ b/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java
@@ -1,69 +1,71 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ode.jacob.vpu;
-
-import static org.apache.ode.jacob.Jacob.newCommChannel;
-import static org.junit.Assert.*;
-import org.apache.ode.jacob.ChannelRef;
-import org.apache.ode.jacob.oo.Synch;
-import org.apache.ode.jacob.oo.Val;
-import org.apache.ode.jacob.soup.CommChannel;
-
-import org.junit.Test;
-
-public class ChannelRefTest {
-
-    @Test
-    public void testConnectWithInterface() {
-        JacobVPU vpu = new JacobVPU();
-        vpu.setContext(new ExecutionQueueImpl());
-        
-        vpu.inject(new Runnable() {
-            @Override
-            public void run() {
-                ChannelRef cref = newCommChannel("unbound channel");
-                CommChannel cchannel = cref.getEndpoint(CommChannel.class);
-                assertNotNull(cchannel);
-                assertNull(cchannel.getType());
-                
-                // now connect it to Val.class
-                Val val = cref.getEndpoint(Val.class);
-                assertNotNull(val);
-                assertEquals(Val.class, cchannel.getType());
-                
-                // now try to associate it with a different channel interface
-                try {
-                    cref.getEndpoint(Synch.class);
-                    fail("we should get an IllegalStateException");
-                } catch (IllegalStateException e) {
-                    assertEquals("ChannelRef is already associated with a 
channel of a different type", e.getMessage());
-                }
-                
-                // now try to associate with the same channel
-                Val val2 = cref.getEndpoint(Val.class);
-                assertNotNull(val2);
-                assertSame(val, val2);
-            }
-        });
-        
-        assertEquals(true, vpu.getContext().hasReactions());
-        vpu.execute();
-        assertEquals(false, vpu.getContext().hasReactions());
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.vpu;
+
+import static org.apache.ode.jacob.Jacob.newCommChannel;
+import static org.junit.Assert.*;
+
+import org.apache.ode.jacob.ChannelRef;
+import org.apache.ode.jacob.RunnableProcess;
+import org.apache.ode.jacob.oo.Synch;
+import org.apache.ode.jacob.oo.Val;
+import org.apache.ode.jacob.soup.CommChannel;
+import org.junit.Test;
+
+public class ChannelRefTest {
+
+    @Test
+    public void testConnectWithInterface() {
+        JacobVPU vpu = new JacobVPU();
+        vpu.setContext(new ExecutionQueueImpl());
+
+        vpu.inject(new RunnableProcess() {
+            @Override
+            public void run() {
+                ChannelRef cref = newCommChannel("unbound channel");
+                CommChannel cchannel = cref.getEndpoint(CommChannel.class);
+                assertNotNull(cchannel);
+                assertNull(cchannel.getType());
+
+                // now connect it to Val.class
+                Val val = cref.getEndpoint(Val.class);
+                assertNotNull(val);
+                assertEquals(Val.class, cchannel.getType());
+
+                // now try to associate it with a different channel interface
+                try {
+                    cref.getEndpoint(Synch.class);
+                    fail("we should get an IllegalStateException");
+                } catch (IllegalStateException e) {
+                    assertEquals("ChannelRef is already associated with a 
channel of a different type", e.getMessage());
+                }
+
+                // now try to associate with the same channel
+                Val val2 = cref.getEndpoint(Val.class);
+                assertNotNull(val2);
+                assertSame(val, val2);
+            }
+        });
+
+        assertEquals(true, vpu.getContext().hasReactions());
+        vpu.execute();
+        assertEquals(false, vpu.getContext().hasReactions());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/2cf09b89/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/src/test/resources/log4j.properties 
b/src/test/resources/log4j.properties
index 91bd688..c1d9d01 100644
--- a/src/test/resources/log4j.properties
+++ b/src/test/resources/log4j.properties
@@ -16,10 +16,11 @@
 #
 
 # Set root logger level to WARN and its only appender to CONSOLE
-log4j.rootLogger=TRACE, file
+log4j.rootLogger=INFO, file
 
 # log4j properties to work with command line tools.
-log4j.category.org.apache.ode=TRACE
+log4j.category.org.apache.ode=DEBUG
+#log4j.category.org.apache.ode.jacob.oo=TRACE
 
 # Console appender
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender

Reply via email to