Minor refactoring around channel identity
Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/9fe2dea2 Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/9fe2dea2 Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/9fe2dea2 Branch: refs/heads/master Commit: 9fe2dea23a44bb384f62956e40760dd4beab3061 Parents: 91885ee Author: Hadrian Zbarcea <[email protected]> Authored: Sun Jan 12 15:31:51 2014 -0500 Committer: Hadrian Zbarcea <[email protected]> Committed: Sun Jan 12 15:31:51 2014 -0500 ---------------------------------------------------------------------- .../org/apache/ode/jacob/oo/ProcessUtil.java | 4 +- .../soup/jackson/ChannelProxySerializer.java | 2 +- .../soup/jackson/JacobTypeResolverBuilder.java | 2 +- .../apache/ode/jacob/vpu/ChannelFactory.java | 2 +- .../ode/jacob/vpu/ExecutionQueueImpl.java | 2 +- .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 31 +++++++------ .../apache/ode/jacob/oo/JacobChannelsTest.java | 46 ++++++++++++++++++++ 7 files changed, 69 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/9fe2dea2/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java index 3d11f77..f32c0f0 100644 --- a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java +++ b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java @@ -43,8 +43,8 @@ public final class ProcessUtil { } public static <T extends Channel> ChannelListener receive(T proxy, T listener) { - // TODO: NOTE: this *only* works when the listnere doesn't need to be Serialiazble really - // because we cannot declare a staic serialVersionUID like this + // TODO: NOTE: this *only* works when the listener doesn't need to be Serializable really + // because we cannot declare a static serialVersionUID like this // once we fix serialization, this can be simplified significantly via a dsl return new ReceiveProcess().setChannel(proxy).setReceiver(listener); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/9fe2dea2/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java index 95948db..8a6725c 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java @@ -72,7 +72,7 @@ public class ChannelProxySerializer extends StdSerializer<ChannelProxy>{ private void serializeContents(ChannelProxy value, JsonGenerator jgen, SerializerProvider provider) throws JsonGenerationException, IOException { - CommChannel commChannel = (CommChannel) ChannelFactory.getBackend((Channel)value); + CommChannel commChannel = ChannelFactory.getBackend((Channel)value); ClassNameIdResolver idResolver = new ClassNameIdResolver(provider.constructType(commChannel.getType()), provider.getTypeFactory()); Integer cid = (Integer)commChannel.getId(); jgen.writeStringField("channelType", idResolver.idFromBaseType()); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/9fe2dea2/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java index 399be8f..0f55403 100644 --- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java +++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java @@ -123,7 +123,7 @@ public class JacobTypeResolverBuilder extends StdTypeResolverBuilder { public String idFromValue(Object value) { if (value instanceof ChannelProxy) { - CommChannel commChannel = (CommChannel) ChannelFactory.getBackend((Channel)value); + CommChannel commChannel = ChannelFactory.getBackend((Channel)value); return commChannel.getType().getName(); } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/9fe2dea2/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java b/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java index ab5743d..6fdbf14 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java +++ b/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java @@ -37,7 +37,7 @@ public class ChannelFactory { } } - public static Object getBackend(Channel channel) { + public static CommChannel getBackend(Channel channel) { ChannelInvocationHandler cih = (ChannelInvocationHandler) Proxy.getInvocationHandler(channel); return cih._backend; } http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/9fe2dea2/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java index bb38f89..7520f90 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java +++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java @@ -662,7 +662,7 @@ public class ExecutionQueueImpl implements ExecutionQueue { } if (obj instanceof org.apache.ode.jacob.oo.ChannelProxy) { - CommChannel commChannel = (CommChannel) ChannelFactory.getBackend((Channel)obj); + CommChannel commChannel = ChannelFactory.getBackend((Channel)obj); _serializedChannels.add(commChannel.getId()); return new ChannelRef(commChannel.getType(), (Integer) commChannel.getId()); } else if (_replacementMap != null && _replacementMap.isReplaceable(obj)) { http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/9fe2dea2/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java index d70afd8..f93d0bd 100644 --- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java +++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java @@ -149,6 +149,18 @@ public final class JacobVPU { ++_statistics.runQueueEntries; } + public Channel newChannel(Class<?> channelType, String description) { + CommChannel chnl = new CommChannel(channelType); + chnl.setDescription(description); + _executionQueue.add(chnl); + + Channel ret = ChannelFactory.createChannel(chnl, channelType); + LOG.trace(">> [{}] : new {}", _cycle, ret); + + _statistics.channelsCreated++; + return ret; + } + /** * Get the active Jacob thread, i.e. the one associated with the current Java thread. */ @@ -265,10 +277,10 @@ public final class JacobVPU { "Channel method '" + method + "' must only return void or an implementation of " + Channel.class.getName()); } replyChannel = newChannel(method.getReturnType(), "Reply Channel"); - replyCommChannel = (CommChannel) ChannelFactory.getBackend((Channel)replyChannel); + replyCommChannel = ChannelFactory.getBackend((Channel)replyChannel); } - CommChannel chnl = (CommChannel) ChannelFactory.getBackend((Channel)channel); + CommChannel chnl = ChannelFactory.getBackend((Channel)channel); Message msg = ClassUtil.createMessage(chnl, ClassUtil.getActionForMethod(method), args, replyCommChannel); sendMessage(msg); @@ -288,15 +300,7 @@ public final class JacobVPU { } public Channel newChannel(Class<?> channelType, String description) { - CommChannel chnl = new CommChannel(channelType); - chnl.setDescription(description); - _executionQueue.add(chnl); - - Channel ret = ChannelFactory.createChannel(chnl, channelType); - LOG.trace(">> [{}] : new {}", _cycle, ret); - - _statistics.channelsCreated++; - return ret; + return JacobVPU.this.newChannel(channelType, description); } public ChannelRef newCommChannel(String description) { @@ -313,7 +317,7 @@ public final class JacobVPU { public String exportChannel(Channel channel) { LOG.trace(">> [{}] : export<{}>", _cycle, channel); - CommChannel chnl = (CommChannel)ChannelFactory.getBackend((Channel)channel); + CommChannel chnl = ChannelFactory.getBackend((Channel)channel); return _executionQueue.createExport(chnl); } @@ -432,8 +436,7 @@ public final class JacobVPU { addCommChannel(group, r); } } else if (receiver instanceof ReceiveProcess) { - CommChannel chnl = (CommChannel)ChannelFactory.getBackend( - ((ReceiveProcess)receiver).getChannel()); + CommChannel chnl = ChannelFactory.getBackend(((ReceiveProcess)receiver).getChannel()); // TODO see below.. // oframe.setDebugInfo(fillDebugInfo()); CommRecv recv = new CommRecv(chnl, receiver); http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/9fe2dea2/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java b/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java new file mode 100644 index 0000000..7bf6506 --- /dev/null +++ b/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jacob.oo; + + +import org.apache.ode.jacob.soup.CommChannel; +import org.apache.ode.jacob.vpu.ChannelFactory; +import org.apache.ode.jacob.vpu.ExecutionQueueImpl; +import org.apache.ode.jacob.vpu.JacobVPU; +import org.junit.Assert; +import org.junit.Test; + + +public class JacobChannelsTest { + + @Test + public void testMultipleSameTypeChannels() { + JacobVPU vpu = new JacobVPU(); + ExecutionQueueImpl queue = new ExecutionQueueImpl(); + vpu.setContext(queue); + + Channel one = vpu.newChannel(Val.class, ""); + Channel two = vpu.newChannel(Val.class, ""); + CommChannel back1 = ChannelFactory.getBackend(one); + CommChannel back2 = ChannelFactory.getBackend(two); + Assert.assertEquals(back1.getType(), back2.getType()); + Assert.assertNotEquals(back1.getId(), back2.getId()); + } + +}
