Improving platform interfaces.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/32579d45 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/32579d45 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/32579d45 Branch: refs/heads/ignite-1093-2 Commit: 32579d450377f177fc6078c4d9686de88dd21220 Parents: 0e25f55 Author: vozerov-gridgain <[email protected]> Authored: Tue Sep 1 12:49:16 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Sep 1 12:49:16 2015 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 15 +- .../managers/communication/GridIoManager.java | 50 +++--- .../GridLifecycleAwareMessageFilter.java | 38 ----- .../eventstorage/GridEventStorageManager.java | 16 +- .../platform/PlatformAwareEventFilter.java | 39 ----- .../processors/platform/PlatformContext.java | 15 +- .../platform/PlatformEventFilterListener.java | 39 +++++ .../platform/PlatformLocalEventListener.java | 28 ---- .../platform/message/PlatformMessageFilter.java | 40 +++++ .../platform/events/PlatformEventFilter.java | 164 ------------------- .../events/PlatformEventFilterListenerImpl.java | 163 ++++++++++++++++++ .../platform/events/PlatformEvents.java | 15 +- .../messaging/PlatformMessageFilter.java | 110 ------------- .../messaging/PlatformMessageFilterImpl.java | 110 +++++++++++++ .../messaging/PlatformMessageLocalFilter.java | 9 +- .../platform/messaging/PlatformMessaging.java | 7 +- 16 files changed, 416 insertions(+), 442 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 93e01e5..599d301 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -38,8 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; -import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter; -import org.apache.ignite.internal.processors.platform.PlatformLocalEventListener; +import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.T3; @@ -139,8 +138,8 @@ class GridEventConsumeHandler implements GridContinuousHandler { if (filter != null) ctx.resource().injectGeneric(filter); - if (filter instanceof PlatformAwareEventFilter) - ((PlatformAwareEventFilter)filter).initialize(ctx); + if (filter instanceof PlatformEventFilterListener) + ((PlatformEventFilterListener)filter).initialize(ctx); final boolean loc = nodeId.equals(ctx.localNodeId()); @@ -260,16 +259,16 @@ class GridEventConsumeHandler implements GridContinuousHandler { RuntimeException err = null; try { - if (filter instanceof PlatformAwareEventFilter) - ((PlatformAwareEventFilter)filter).close(); + if (filter instanceof PlatformEventFilterListener) + ((PlatformEventFilterListener)filter).onClose(); } catch(RuntimeException ex) { err = ex; } try { - if (cb instanceof PlatformLocalEventListener) - ((PlatformLocalEventListener)cb).close(); + if (cb instanceof PlatformEventFilterListener) + ((PlatformEventFilterListener)cb).onClose(); } catch (RuntimeException ex) { if (err == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index aa73296..b8af8da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -17,26 +17,6 @@ package org.apache.ignite.internal.managers.communication; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -54,6 +34,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.GridSpinReadWriteLock; @@ -83,6 +64,27 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -1457,8 +1459,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) { if (p != null) { try { - if (p instanceof GridLifecycleAwareMessageFilter) - ((GridLifecycleAwareMessageFilter)p).initialize(ctx); + if (p instanceof PlatformMessageFilter) + ((PlatformMessageFilter)p).initialize(ctx); else ctx.resource().injectGeneric(p); @@ -1795,8 +1797,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (lsnr instanceof GridUserMessageListener) { GridUserMessageListener userLsnr = (GridUserMessageListener)lsnr; - if (userLsnr.predLsnr instanceof GridLifecycleAwareMessageFilter) - ((GridLifecycleAwareMessageFilter)userLsnr.predLsnr).close(); + if (userLsnr.predLsnr instanceof PlatformMessageFilter) + ((PlatformMessageFilter)userLsnr.predLsnr).onClose(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java deleted file mode 100644 index 2d33a65..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.communication; - -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.lang.IgniteBiPredicate; - -/** - * Special version of bi-predicate for messaging with initialize/close callbacks. - */ -public interface GridLifecycleAwareMessageFilter<K, V> extends IgniteBiPredicate<K, V> { - /** - * Initializes the filter. - * - * @param ctx Kernal context. - */ - public void initialize(GridKernalContext ctx); - - /** - * Closes the filter. - */ - public void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 7b8c759..ea01e52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -47,8 +47,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; -import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter; -import org.apache.ignite.internal.processors.platform.PlatformLocalEventListener; +import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; @@ -681,8 +680,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> { IgnitePredicate p = ((UserListenerWrapper)lsnr).listener(); - if (p instanceof PlatformLocalEventListener) - ((PlatformLocalEventListener)p).close(); + if (p instanceof PlatformEventFilterListener) + ((PlatformEventFilterListener)p).onClose(); } return found; @@ -784,19 +783,20 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> * @param p Grid event predicate. * @return Collection of grid events. */ + @SuppressWarnings("unchecked") public <T extends Event> Collection<T> localEvents(IgnitePredicate<T> p) { assert p != null; - if (p instanceof PlatformAwareEventFilter) { - PlatformAwareEventFilter p0 = (PlatformAwareEventFilter)p; + if (p instanceof PlatformEventFilterListener) { + PlatformEventFilterListener p0 = (PlatformEventFilterListener)p; p0.initialize(ctx); try { - return getSpi().localEvents(p0); + return (Collection<T>)getSpi().localEvents(p0); } finally { - p0.close(); + p0.onClose(); } } else http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java deleted file mode 100644 index a423578..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform; - -import java.util.UUID; -import org.apache.ignite.events.Event; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.lang.IgniteBiPredicate; -import org.apache.ignite.lang.IgnitePredicate; - -/** - * Special version of predicate for events with initialize/close callbacks. - */ -public interface PlatformAwareEventFilter<E extends Event> extends IgnitePredicate<E>, IgniteBiPredicate<UUID, E> { - /** - * Initializes the filter. - */ - public void initialize(GridKernalContext ctx); - - /** - * Closes the filter. - */ - public void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java index cea8326..1febf07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java @@ -18,13 +18,10 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventAdapter; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter; import org.apache.ignite.internal.portable.PortableRawReaderEx; import org.apache.ignite.internal.portable.PortableRawWriterEx; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilterEx; @@ -39,10 +36,10 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; import org.jetbrains.annotations.Nullable; import java.util.Collection; -import java.util.UUID; /** * Platform context. Acts as an entry point for platform operations. @@ -178,7 +175,7 @@ public interface PlatformContext { * @param ptr Pointer of deployed native filter. * @return Filter. */ - public GridLifecycleAwareMessageFilter<UUID, Object> createRemoteMessageFilter(Object filter, long ptr); + public PlatformMessageFilter createRemoteMessageFilter(Object filter, long ptr); /** * Check whether the given event type is supported. @@ -192,9 +189,9 @@ public interface PlatformContext { * Write event. * * @param writer Writer. - * @param event Event. + * @param evt Event. */ - public void writeEvent(PortableRawWriterEx writer, EventAdapter event); + public void writeEvent(PortableRawWriterEx writer, Event evt); /** * Create local event filter. @@ -202,7 +199,7 @@ public interface PlatformContext { * @param hnd Native handle. * @return Filter. */ - public <E extends Event> PlatformAwareEventFilter<E> createLocalEventFilter(long hnd); + public PlatformEventFilterListener createLocalEventFilter(long hnd); /** * Create remote event filter. @@ -211,7 +208,7 @@ public interface PlatformContext { * @param types Event types. * @return Filter. */ - public <E extends Event> PlatformAwareEventFilter<E> createRemoteEventFilter(Object pred, final int... types); + public PlatformEventFilterListener createRemoteEventFilter(Object pred, final int... types); /** * Create native exception. http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java new file mode 100644 index 0000000..77f0ac8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import java.util.UUID; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; + +/** + * Platform event filter and listener. + */ +public interface PlatformEventFilterListener extends IgnitePredicate<Event>, IgniteBiPredicate<UUID, Event> { + /** + * Initializes the filter. + */ + public void initialize(GridKernalContext ctx); + + /** + * Callback invoked when filter is closed. + */ + public void onClose(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java deleted file mode 100644 index f38d8e0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform; - -/** - * Special version of listener for events with close callbacks. - */ -public interface PlatformLocalEventListener { - /** - * Closes the listener. - */ - public void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java new file mode 100644 index 0000000..e5cec0a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.message; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.lang.IgniteBiPredicate; + +import java.util.UUID; + +/** + * Platform message filter. + */ +public interface PlatformMessageFilter extends IgniteBiPredicate<UUID, Object> { + /** + * Initializes the filter. + * + * @param ctx Kernal context. + */ + public void initialize(GridKernalContext ctx); + + /** + * Closes the filter. + */ + public void onClose(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java deleted file mode 100644 index 32daa1c..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform.events; - -import java.util.UUID; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventAdapter; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.portable.PortableRawWriterEx; -import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter; -import org.apache.ignite.internal.processors.platform.PlatformContext; -import org.apache.ignite.internal.processors.platform.PlatformLocalEventListener; -import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; -import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; - -/** - * Platform event filter. Delegates apply to native platform. - */ -public class PlatformEventFilter<E extends Event> implements PlatformAwareEventFilter<E>, PlatformLocalEventListener -{ - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final Object pred; - - /** Event types. */ - private final int[] types; - - /** */ - protected transient long hnd; - - /** */ - private transient PlatformContext ctx; - - /** - * Constructor. - * - * @param hnd Handle in the native platform. - * @param ctx Context. - */ - public PlatformEventFilter(long hnd, PlatformContext ctx) { - assert ctx != null; - assert hnd != 0; - - this.hnd = hnd; - this.ctx = ctx; - - pred = null; - types = null; - } - - /** - * Constructor. - * - * @param pred .Net portable predicate. - */ - public PlatformEventFilter(Object pred, final int... types) { - assert pred != null; - - this.pred = pred; - this.types = types; - } - - /** {@inheritDoc} */ - @Override public boolean apply(E evt) { - return apply0(null, evt); - } - - /** {@inheritDoc} */ - @Override public boolean apply(UUID uuid, E evt) { - return apply0(uuid, evt); - } - - /** - * Apply impl. - * @param uuid Node if. - * @param evt Event. - * @return Result. - */ - private boolean apply0(final UUID uuid, final E evt) { - if (!ctx.isEventTypeSupported(evt.type())) - return false; - - if (types != null) { - boolean match = false; - - for (int type : types) { - if (type == evt.type()) { - match = true; - break; - } - } - - if (!match) - return false; - } - - try (PlatformMemory mem = ctx.memory().allocate()) { - PlatformOutputStream out = mem.output(); - - PortableRawWriterEx writer = ctx.writer(out); - - ctx.writeEvent(writer, (EventAdapter)evt); - - writer.writeUuid(uuid); - - out.synchronize(); - - int res = ctx.gateway().eventFilterApply(hnd, mem.pointer()); - - return res != 0; - } - } - - /** {@inheritDoc} */ - @Override public void close() { - ctx.gateway().eventFilterDestroy(hnd); - } - - /** {@inheritDoc} */ - @Override public void initialize(GridKernalContext gridCtx) { - ctx = PlatformUtils.platformContext(gridCtx.grid()); - - try (PlatformMemory mem = ctx.memory().allocate()) { - PlatformOutputStream out = mem.output(); - - PortableRawWriterEx writer = ctx.writer(out); - - writer.writeObjectDetached(pred); - - out.synchronize(); - - hnd = ctx.gateway().eventFilterCreate(mem.pointer()); - } - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - return this == o || o != null && o instanceof PlatformEventFilter && - hnd == ((PlatformEventFilter)o).hnd; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return (int)(hnd ^ (hnd >>> 32)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java new file mode 100644 index 0000000..b2dfd1c --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.events; + +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; + +import java.util.UUID; + +/** + * Platform event filter. Delegates apply to native platform. + */ +public class PlatformEventFilterListenerImpl implements PlatformEventFilterListener +{ + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object pred; + + /** Event types. */ + private final int[] types; + + /** */ + protected transient long hnd; + + /** */ + private transient PlatformContext ctx; + + /** + * Constructor. + * + * @param hnd Handle in the native platform. + * @param ctx Context. + */ + public PlatformEventFilterListenerImpl(long hnd, PlatformContext ctx) { + assert ctx != null; + assert hnd != 0; + + this.hnd = hnd; + this.ctx = ctx; + + pred = null; + types = null; + } + + /** + * Constructor. + * + * @param pred .Net portable predicate. + */ + public PlatformEventFilterListenerImpl(Object pred, final int... types) { + assert pred != null; + + this.pred = pred; + this.types = types; + } + + /** {@inheritDoc} */ + @Override public boolean apply(Event evt) { + return apply0(null, evt); + } + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Event evt) { + return apply0(uuid, evt); + } + + /** + * Apply impl. + * @param uuid Node if. + * @param evt Event. + * @return Result. + */ + private boolean apply0(final UUID uuid, final Event evt) { + if (!ctx.isEventTypeSupported(evt.type())) + return false; + + if (types != null) { + boolean match = false; + + for (int type : types) { + if (type == evt.type()) { + match = true; + break; + } + } + + if (!match) + return false; + } + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + ctx.writeEvent(writer, evt); + + writer.writeUuid(uuid); + + out.synchronize(); + + int res = ctx.gateway().eventFilterApply(hnd, mem.pointer()); + + return res != 0; + } + } + + /** {@inheritDoc} */ + @Override public void onClose() { + ctx.gateway().eventFilterDestroy(hnd); + } + + /** {@inheritDoc} */ + @Override public void initialize(GridKernalContext gridCtx) { + ctx = PlatformUtils.platformContext(gridCtx.grid()); + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObjectDetached(pred); + + out.synchronize(); + + hnd = ctx.gateway().eventFilterCreate(mem.pointer()); + } + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return this == o || o != null && o instanceof PlatformEventFilterListenerImpl && + hnd == ((PlatformEventFilterListenerImpl)o).hnd; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(hnd ^ (hnd >>> 32)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java index fde6be5..997c019 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java @@ -22,11 +22,12 @@ import java.util.Collection; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteEvents; +import org.apache.ignite.events.Event; import org.apache.ignite.events.EventAdapter; import org.apache.ignite.internal.portable.PortableRawReaderEx; import org.apache.ignite.internal.portable.PortableRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; -import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter; +import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.util.typedef.F; @@ -205,14 +206,14 @@ public class PlatformEvents extends PlatformAbstractTarget { boolean hasLocFilter = reader.readBoolean(); - PlatformAwareEventFilter locFilter = hasLocFilter ? localFilter(reader.readLong()) : null; + PlatformEventFilterListener locFilter = hasLocFilter ? localFilter(reader.readLong()) : null; boolean hasRmtFilter = reader.readBoolean(); UUID listenId; if (hasRmtFilter) { - PlatformAwareEventFilter rmtFilter = platformCtx.createRemoteEventFilter( + PlatformEventFilterListener rmtFilter = platformCtx.createRemoteEventFilter( reader.readObjectDetached(), readEventTypes(reader)); listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, rmtFilter); @@ -233,16 +234,16 @@ public class PlatformEvents extends PlatformAbstractTarget { int[] types = readEventTypes(reader); - PlatformAwareEventFilter filter = platformCtx.createRemoteEventFilter(pred, types); + PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types); - Collection<EventAdapter> result = events.remoteQuery(filter, timeout); + Collection<Event> result = events.remoteQuery(filter, timeout); if (result == null) writer.writeInt(-1); else { writer.writeInt(result.size()); - for (EventAdapter e : result) + for (Event e : result) platformCtx.writeEvent(writer, e); } @@ -325,7 +326,7 @@ public class PlatformEvents extends PlatformAbstractTarget { * @param hnd Handle. * @return Interop filter. */ - private PlatformAwareEventFilter localFilter(long hnd) { + private PlatformEventFilterListener localFilter(long hnd) { return platformCtx.createLocalEventFilter(hnd); } http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java deleted file mode 100644 index 4237665..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform.messaging; - -import java.util.UUID; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter; -import org.apache.ignite.internal.portable.PortableRawWriterEx; -import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; -import org.apache.ignite.internal.processors.platform.PlatformContext; -import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; -import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; - -/** - * Interop filter. Delegates apply to native platform. - */ -public class PlatformMessageFilter extends PlatformAbstractPredicate - implements GridLifecycleAwareMessageFilter<UUID, Object> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Constructor. - */ - public PlatformMessageFilter() - { - super(); - } - - /** - * Constructor. - * - * @param pred .Net portable predicate. - * @param ptr Pointer to predicate in the native platform. - * @param ctx Kernal context. - */ - protected PlatformMessageFilter(Object pred, long ptr, PlatformContext ctx) { - super(pred, ptr, ctx); - } - - /** {@inheritDoc} */ - @Override public boolean apply(UUID uuid, Object m) { - if (ptr == 0) - return false; // Destroyed. - - try (PlatformMemory mem = ctx.memory().allocate()) { - PlatformOutputStream out = mem.output(); - - PortableRawWriterEx writer = ctx.writer(out); - - writer.writeObject(uuid); - writer.writeObject(m); - - out.synchronize(); - - return ctx.gateway().messagingFilterApply(ptr, mem.pointer()) != 0; - } - } - - /** {@inheritDoc} */ - @Override public void initialize(GridKernalContext kernalCtx) { - if (ptr != 0) - return; - - ctx = PlatformUtils.platformContext(kernalCtx.grid()); - - try (PlatformMemory mem = ctx.memory().allocate()) { - PlatformOutputStream out = mem.output(); - - PortableRawWriterEx writer = ctx.writer(out); - - writer.writeObject(pred); - - out.synchronize(); - - ptr = ctx.gateway().messagingFilterCreate(mem.pointer()); - } - } - - /** {@inheritDoc} */ - @Override public void close() { - if (ptr == 0) // Already destroyed or not initialized yet. - return; - - try { - assert ctx != null; - - ctx.gateway().messagingFilterDestroy(ptr); - } - finally { - ptr = 0; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java new file mode 100644 index 0000000..1e42914 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.messaging; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; + +import java.util.UUID; + +/** + * Platform message filter. Delegates apply to native platform. + */ +public class PlatformMessageFilterImpl extends PlatformAbstractPredicate implements PlatformMessageFilter { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Constructor. + */ + public PlatformMessageFilterImpl() + { + super(); + } + + /** + * Constructor. + * + * @param pred .Net portable predicate. + * @param ptr Pointer to predicate in the native platform. + * @param ctx Kernal context. + */ + protected PlatformMessageFilterImpl(Object pred, long ptr, PlatformContext ctx) { + super(pred, ptr, ctx); + } + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Object m) { + if (ptr == 0) + return false; // Destroyed. + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(uuid); + writer.writeObject(m); + + out.synchronize(); + + return ctx.gateway().messagingFilterApply(ptr, mem.pointer()) != 0; + } + } + + /** {@inheritDoc} */ + @Override public void initialize(GridKernalContext kernalCtx) { + if (ptr != 0) + return; + + ctx = PlatformUtils.platformContext(kernalCtx.grid()); + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(pred); + + out.synchronize(); + + ptr = ctx.gateway().messagingFilterCreate(mem.pointer()); + } + } + + /** {@inheritDoc} */ + @Override public void onClose() { + if (ptr == 0) // Already destroyed or not initialized yet. + return; + + try { + assert ctx != null; + + ctx.gateway().messagingFilterDestroy(ptr); + } + finally { + ptr = 0; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java index 8a27508..50643e1 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java @@ -17,18 +17,19 @@ package org.apache.ignite.internal.processors.platform.messaging; -import java.util.UUID; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter; import org.apache.ignite.internal.portable.PortableRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; + +import java.util.UUID; /** * Interop local filter. Delegates apply to native platform, uses id to identify native target. */ -public class PlatformMessageLocalFilter implements GridLifecycleAwareMessageFilter<UUID, Object> { +public class PlatformMessageLocalFilter implements PlatformMessageFilter { /** */ private static final long serialVersionUID = 0L; @@ -71,7 +72,7 @@ public class PlatformMessageLocalFilter implements GridLifecycleAwareMessageFilt } /** {@inheritDoc} */ - @Override public void close() { + @Override public void onClose() { platformCtx.gateway().messagingFilterDestroy(hnd); } http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java index 968edd5..6dfd570 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java @@ -17,17 +17,18 @@ package org.apache.ignite.internal.processors.platform.messaging; -import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteMessaging; -import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter; import org.apache.ignite.internal.portable.PortableRawReaderEx; import org.apache.ignite.internal.portable.PortableRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.lang.IgniteFuture; +import java.util.UUID; + /** * Interop messaging. */ @@ -144,7 +145,7 @@ public class PlatformMessaging extends PlatformAbstractTarget { Object topic = reader.readObjectDetached(); - GridLifecycleAwareMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr); + PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr); UUID listenId = messaging.remoteListen(topic, filter);
