IGNITE-1314: Moved messaging to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/27a59cf8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/27a59cf8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/27a59cf8 Branch: refs/heads/ignite-1093 Commit: 27a59cf8a18a9f12e42fd0dc54890f6e44d91515 Parents: f4c7107 Author: vozerov-gridgain <[email protected]> Authored: Fri Aug 28 09:54:10 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Aug 28 09:54:10 2015 +0300 ---------------------------------------------------------------------- .../processors/platform/PlatformContext.java | 10 ++ .../messaging/PlatformMessageFilter.java | 109 +++++++++++++ .../messaging/PlatformMessageLocalFilter.java | 102 ++++++++++++ .../platform/messaging/PlatformMessaging.java | 162 +++++++++++++++++++ 4 files changed, 383 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java index 461fb84..68e0e35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.portable.*; import org.apache.ignite.internal.processors.cache.query.continuous.*; import org.apache.ignite.internal.processors.platform.cache.query.*; @@ -154,4 +155,13 @@ public interface PlatformContext { * @return Filter. */ public CacheContinuousQueryFilterEx createContinuousQueryFilter(Object filter); + + /** + * Create remote message filter. + * + * @param filter Native filter. + * @param ptr Pointer of deployed native filter. + * @return Filter. + */ + public GridLifecycleAwareMessageFilter<UUID, Object> createRemoteMessageFilter(Object filter, long ptr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java new file mode 100644 index 0000000..8a433ac --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.messaging; + +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.memory.*; +import org.apache.ignite.internal.processors.platform.utils.*; + +import java.util.*; + +/** + * Interop filter. Delegates apply to native platform. + */ +public class PlatformMessageFilter extends PlatformAbstractPredicate + implements GridLifecycleAwareMessageFilter<UUID, Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Constructor. + */ + public PlatformMessageFilter() + { + super(); + } + + /** + * Constructor. + * + * @param pred .Net portable predicate. + * @param ptr Pointer to predicate in the native platform. + * @param ctx Kernal context. + */ + protected PlatformMessageFilter(Object pred, long ptr, PlatformContext ctx) { + super(pred, ptr, ctx); + } + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Object m) { + if (ptr == 0) + return false; // Destroyed. + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(uuid); + writer.writeObject(m); + + out.synchronize(); + + return ctx.gateway().messagingFilterApply(ptr, mem.pointer()) != 0; + } + } + + /** {@inheritDoc} */ + @Override public void initialize(GridKernalContext kernalCtx) { + if (ptr != 0) + return; + + ctx = PlatformUtils.platformContext(kernalCtx.grid()); + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(pred); + + out.synchronize(); + + ptr = ctx.gateway().messagingFilterCreate(mem.pointer()); + } + } + + /** {@inheritDoc} */ + @Override public void close() { + if (ptr == 0) // Already destroyed or not initialized yet. + return; + + try { + assert ctx != null; + + ctx.gateway().messagingFilterDestroy(ptr); + } + finally { + ptr = 0; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java new file mode 100644 index 0000000..71bb918 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.messaging; + +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.memory.*; + +import java.util.*; + +/** + * Interop local filter. Delegates apply to native platform, uses id to identify native target. + */ +public class PlatformMessageLocalFilter implements GridLifecycleAwareMessageFilter<UUID, Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + protected final long hnd; + + /** */ + protected final PlatformContext platformCtx; + + /** + * Constructor. + * + * @param hnd Handle in the native platform. + * @param ctx Context. + */ + public PlatformMessageLocalFilter(long hnd, PlatformContext ctx) { + assert ctx != null; + assert hnd != 0; + + this.hnd = hnd; + this.platformCtx = ctx; + } + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Object m) { + try (PlatformMemory mem = platformCtx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = platformCtx.writer(out); + + writer.writeObject(uuid); + writer.writeObject(m); + + out.synchronize(); + + int res = platformCtx.gateway().messagingFilterApply(hnd, mem.pointer()); + + return res != 0; + } + } + + /** {@inheritDoc} */ + @Override public void close() { + platformCtx.gateway().messagingFilterDestroy(hnd); + } + + /** {@inheritDoc} */ + @Override public void initialize(GridKernalContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + PlatformMessageLocalFilter filter = (PlatformMessageLocalFilter)o; + + return hnd == filter.hnd; + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(hnd ^ (hnd >>> 32)); + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java new file mode 100644 index 0000000..ffc2ab3 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.messaging; + +import org.apache.ignite.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Interop messaging. + */ +public class PlatformMessaging extends PlatformAbstractTarget { + /** */ + public static final int OP_LOC_LISTEN = 1; + + /** */ + public static final int OP_REMOTE_LISTEN = 2; + + /** */ + public static final int OP_SEND = 3; + + /** */ + public static final int OP_SEND_MULTI = 4; + + /** */ + public static final int OP_SEND_ORDERED = 5; + + /** */ + public static final int OP_STOP_LOC_LISTEN = 6; + + /** */ + public static final int OP_STOP_REMOTE_LISTEN = 7; + + /** */ + private final IgniteMessaging messaging; + + /** + * Ctor. + * + * @param platformCtx Context. + * @param messaging Ignite messaging. + */ + public PlatformMessaging(PlatformContext platformCtx, IgniteMessaging messaging) { + super(platformCtx); + + assert messaging != null; + + this.messaging = messaging; + } + + /** + * Gets messaging with asynchronous mode enabled. + * + * @return Messaging with asynchronous mode enabled. + */ + public PlatformMessaging withAsync() { + if (messaging.isAsync()) + return this; + + return new PlatformMessaging (platformCtx, messaging.withAsync()); + } + + /** {@inheritDoc} */ + @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case OP_SEND: + messaging.send(reader.readObjectDetached(), reader.readObjectDetached()); + + return TRUE; + + case OP_SEND_MULTI: + messaging.send(reader.readObjectDetached(), PlatformUtils.readCollection(reader)); + + return TRUE; + + case OP_SEND_ORDERED: + messaging.sendOrdered(reader.readObjectDetached(), reader.readObjectDetached(), reader.readLong()); + + return TRUE; + + case OP_LOC_LISTEN: { + PlatformMessageLocalFilter filter = new PlatformMessageLocalFilter(reader.readLong(), platformCtx); + + Object topic = reader.readObjectDetached(); + + messaging.localListen(topic, filter); + + return TRUE; + } + + case OP_STOP_LOC_LISTEN: { + PlatformMessageLocalFilter filter = new PlatformMessageLocalFilter(reader.readLong(), platformCtx); + + Object topic = reader.readObjectDetached(); + + messaging.stopLocalListen(topic, filter); + + return TRUE; + } + + case OP_STOP_REMOTE_LISTEN: { + messaging.stopRemoteListen(reader.readUuid()); + + return TRUE; + } + + default: + throw new IgniteCheckedException("Unsupported operation type: " + type); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"}) + @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer, + Object arg) throws IgniteCheckedException { + switch (type) { + case OP_REMOTE_LISTEN:{ + Object nativeFilter = reader.readObjectDetached(); + + long ptr = reader.readLong(); // interop pointer + + Object topic = reader.readObjectDetached(); + + GridLifecycleAwareMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr); + + UUID listenId = messaging.remoteListen(topic, filter); + + writer.writeUuid(listenId); + + break; + } + + default: + throw new IgniteCheckedException("Unsupported operation type: " + type); + } + } + + /** <inheritDoc /> */ + @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { + return messaging.future(); + } +} \ No newline at end of file
