IGNITE-1315: Moved events to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a0eeea6f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a0eeea6f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a0eeea6f Branch: refs/heads/ignite-1093 Commit: a0eeea6fb61c203f5a3ec7b7e394839223c27eb3 Parents: 27a59cf Author: vozerov-gridgain <[email protected]> Authored: Fri Aug 28 11:00:44 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Aug 28 11:00:44 2015 +0300 ---------------------------------------------------------------------- .../platform/PlatformAwareEventFilter.java | 4 +- .../processors/platform/PlatformContext.java | 34 ++ .../platform/events/PlatformEventFilter.java | 161 ++++++++ .../platform/events/PlatformEvents.java | 388 +++++++++++++++++++ 4 files changed, 586 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java index f056bbf..b09d889 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java @@ -21,10 +21,12 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.lang.*; +import java.util.*; + /** * Special version of predicate for events with initialize/close callbacks. */ -public interface PlatformAwareEventFilter<E extends Event> extends IgnitePredicate<E> { +public interface PlatformAwareEventFilter<E extends Event> extends IgnitePredicate<E>, IgniteBiPredicate<UUID, E> { /** * Initializes the filter. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java index 68e0e35..82a42d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.portable.*; @@ -164,4 +165,37 @@ public interface PlatformContext { * @return Filter. */ public GridLifecycleAwareMessageFilter<UUID, Object> createRemoteMessageFilter(Object filter, long ptr); + + /** + * Check whether the given event type is supported. + * + * @param evtTyp Event type. + * @return {@code True} if supported. + */ + public boolean isEventTypeSupported(int evtTyp); + + /** + * Write event. + * + * @param writer Writer. + * @param event Event. + */ + public void writeEvent(PortableRawWriterEx writer, EventAdapter event); + + /** + * Create local event filter. + * + * @param hnd Native handle. + * @return Filter. + */ + public <E extends Event> PlatformAwareEventFilter<E> createLocalEventFilter(long hnd); + + /** + * Create remote event filter. + * + * @param pred Native predicate. + * @param types Event types. + * @return Filter. + */ + public <E extends Event> PlatformAwareEventFilter<E> createRemoteEventFilter(Object pred, final int... types); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java new file mode 100644 index 0000000..7255dbb --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.events; + +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.memory.*; +import org.apache.ignite.internal.processors.platform.utils.*; + +import java.util.*; + +/** + * Platform event filter. Delegates apply to native platform. + */ +public class PlatformEventFilter<E extends Event> implements PlatformAwareEventFilter<E>, PlatformLocalEventListener +{ + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object pred; + + /** Event types. */ + private final int[] types; + + /** */ + protected transient long hnd; + + /** */ + private transient PlatformContext ctx; + + /** + * Constructor. + * + * @param hnd Handle in the native platform. + * @param ctx Context. + */ + public PlatformEventFilter(long hnd, PlatformContext ctx) { + assert ctx != null; + assert hnd != 0; + + this.hnd = hnd; + this.ctx = ctx; + + pred = null; + types = null; + } + + /** + * Constructor. + * + * @param pred .Net portable predicate. + */ + public PlatformEventFilter(Object pred, final int... types) { + assert pred != null; + + this.pred = pred; + this.types = types; + } + + /** {@inheritDoc} */ + @Override public boolean apply(E evt) { + return apply0(null, evt); + } + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, E evt) { + return apply0(uuid, evt); + } + + /** + * Apply impl. + * @param uuid Node if. + * @param evt Event. + * @return Result. + */ + private boolean apply0(final UUID uuid, final E evt) { + if (!ctx.isEventTypeSupported(evt.type())) + return false; + + if (types != null) { + boolean match = false; + + for (int type : types) { + if (type == evt.type()) { + match = true; + break; + } + } + + if (!match) + return false; + } + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + ctx.writeEvent(writer, (EventAdapter)evt); + + writer.writeUuid(uuid); + + out.synchronize(); + + int res = ctx.gateway().eventFilterApply(hnd, mem.pointer()); + + return res != 0; + } + } + + /** {@inheritDoc} */ + @Override public void close() { + ctx.gateway().eventFilterDestroy(hnd); + } + + /** {@inheritDoc} */ + @Override public void initialize(GridKernalContext gridCtx) { + ctx = PlatformUtils.platformContext(gridCtx.grid()); + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObjectDetached(pred); + + out.synchronize(); + + hnd = ctx.gateway().eventFilterCreate(mem.pointer()); + } + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return this == o || o != null && o instanceof PlatformEventFilter && + hnd == ((PlatformEventFilter)o).hnd; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(hnd ^ (hnd >>> 32)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java new file mode 100644 index 0000000..befc3bd --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.events; + +import org.apache.ignite.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Interop events. + */ +public class PlatformEvents extends PlatformAbstractTarget { + /** */ + private static final int OP_REMOTE_QUERY = 1; + + /** */ + private static final int OP_REMOTE_LISTEN = 2; + + /** */ + private static final int OP_STOP_REMOTE_LISTEN = 3; + + /** */ + private static final int OP_WAIT_FOR_LOCAL = 4; + + /** */ + private static final int OP_LOCAL_QUERY = 5; + + /** */ + private static final int OP_RECORD_LOCAL = 6; + + /** */ + private static final int OP_ENABLE_LOCAL = 8; + + /** */ + private static final int OP_DISABLE_LOCAL = 9; + + /** */ + private static final int OP_GET_ENABLED_EVENTS = 10; + + /** */ + private final IgniteEvents events; + + /** */ + private final EventResultWriter eventResWriter; + + /** */ + private final EventCollectionResultWriter eventColResWriter; + + /** + * Ctor. + * + * @param platformCtx Context. + * @param events Ignite events. + */ + public PlatformEvents(PlatformContext platformCtx, IgniteEvents events) { + super(platformCtx); + + assert events != null; + + this.events = events; + + eventResWriter = new EventResultWriter(platformCtx); + eventColResWriter = new EventCollectionResultWriter(platformCtx); + } + + /** + * Gets events with asynchronous mode enabled. + * + * @return Events with asynchronous mode enabled. + */ + public PlatformEvents withAsync() { + if (events.isAsync()) + return this; + + return new PlatformEvents(platformCtx, events.withAsync()); + } + + /** + * Adds an event listener for local events. + * + * @param hnd Interop listener handle. + * @param type Event type. + */ + @SuppressWarnings({"unchecked"}) + public void localListen(long hnd, int type) { + events.localListen(localFilter(hnd), type); + } + + /** + * Removes an event listener for local events. + * + * @param hnd Interop listener handle. + */ + @SuppressWarnings({"UnusedDeclaration", "unchecked"}) + public boolean stopLocalListen(long hnd) { + return events.stopLocalListen(localFilter(hnd)); + } + + /** + * Check if event is enabled. + * + * @param type Event type. + * @return {@code True} if event of passed in type is enabled. + */ + @SuppressWarnings("UnusedDeclaration") + public boolean isEnabled(int type) { + return events.isEnabled(type); + } + + /** {@inheritDoc} */ + @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case OP_RECORD_LOCAL: + // TODO: GG-10244 + break; + + case OP_ENABLE_LOCAL: + + events.enableLocal(readEventTypes(reader)); + + return TRUE; + + case OP_DISABLE_LOCAL: + + events.disableLocal(readEventTypes(reader)); + + return TRUE; + + case OP_STOP_REMOTE_LISTEN: + events.stopRemoteListen(reader.readUuid()); + + return TRUE; + } + + throw new IgniteCheckedException("Unsupported operation type: " + type); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"}) + @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer, + Object arg) throws IgniteCheckedException { + switch (type) { + case OP_LOCAL_QUERY: { + Collection<EventAdapter> result = + events.localQuery(F.<EventAdapter>alwaysTrue(), readEventTypes(reader)); + + writer.writeInt(result.size()); + + for (EventAdapter e : result) + platformCtx.writeEvent(writer, e); + + break; + } + + case OP_WAIT_FOR_LOCAL: { + boolean hasFilter = reader.readBoolean(); + + IgnitePredicate pred = hasFilter ? localFilter(reader.readLong()) : null; + + int[] eventTypes = readEventTypes(reader); + + EventAdapter result = (EventAdapter) events.waitForLocal(pred, eventTypes); + + platformCtx.writeEvent(writer, result); + + break; + } + + case OP_REMOTE_LISTEN: { + int bufSize = reader.readInt(); + + long interval = reader.readLong(); + + boolean autoUnsubscribe = reader.readBoolean(); + + boolean hasLocFilter = reader.readBoolean(); + + PlatformAwareEventFilter locFilter = hasLocFilter ? localFilter(reader.readLong()) : null; + + boolean hasRmtFilter = reader.readBoolean(); + + UUID listenId; + + if (hasRmtFilter) { + PlatformAwareEventFilter rmtFilter = platformCtx.createRemoteEventFilter( + reader.readObjectDetached(), readEventTypes(reader)); + + listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, rmtFilter); + } + else + listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, null, + readEventTypes(reader)); + + writer.writeUuid(listenId); + + break; + } + + case OP_REMOTE_QUERY: { + Object pred = reader.readObjectDetached(); + + long timeout = reader.readLong(); + + int[] types = readEventTypes(reader); + + PlatformAwareEventFilter filter = platformCtx.createRemoteEventFilter(pred, types); + + Collection<EventAdapter> result = events.remoteQuery(filter, timeout); + + if (result == null) + writer.writeInt(-1); + else { + writer.writeInt(result.size()); + + for (EventAdapter e : result) + platformCtx.writeEvent(writer, e); + } + + break; + } + + default: + throw new IgniteCheckedException("Unsupported operation type: " + type); + } + } + + /** {@inheritDoc} */ + @Override protected void processOutOp(int type, PortableRawWriterEx writer) throws IgniteCheckedException { + switch (type) { + case OP_GET_ENABLED_EVENTS: + writeEventTypes(events.enabledEvents(), writer); + + break; + + default: + throwUnsupported(type); + } + } + + /** <inheritDoc /> */ + @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { + return events.future(); + } + + /** <inheritDoc /> */ + @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) { + switch (opId) { + case OP_WAIT_FOR_LOCAL: + return eventResWriter; + + case OP_REMOTE_QUERY: + return eventColResWriter; + } + + return null; + } + + /** + * Reads event types array. + * + * @param reader Reader + * @return Event types, or null. + */ + private int[] readEventTypes(PortableRawReaderEx reader) { + return reader.readIntArray(); + } + + /** + * Reads event types array. + * + * @param writer Writer + * @param types Types. + */ + private void writeEventTypes(int[] types, PortableRawWriterEx writer) { + if (types == null) { + writer.writeIntArray(null); + + return; + } + + int[] resultTypes = new int[types.length]; + + int idx = 0; + + for (int t : types) + if (platformCtx.isEventTypeSupported(t)) + resultTypes[idx++] = t; + + writer.writeIntArray(Arrays.copyOf(resultTypes, idx)); + } + + /** + * Creates an interop filter from handle. + * + * @param hnd Handle. + * @return Interop filter. + */ + private PlatformAwareEventFilter localFilter(long hnd) { + return platformCtx.createLocalEventFilter(hnd); + } + + /** + * Writes an EventBase. + */ + private static class EventResultWriter implements PlatformFutureUtils.Writer { + /** */ + private final PlatformContext platformCtx; + + /** + * Constructor. + * + * @param platformCtx Context. + */ + public EventResultWriter(PlatformContext platformCtx) { + assert platformCtx != null; + + this.platformCtx = platformCtx; + } + + /** <inheritDoc /> */ + @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { + platformCtx.writeEvent(writer, (EventAdapter)obj); + } + + /** <inheritDoc /> */ + @Override public boolean canWrite(Object obj, Throwable err) { + return obj instanceof EventAdapter && err == null; + } + } + + /** + * Writes a collection of EventAdapter. + */ + private static class EventCollectionResultWriter implements PlatformFutureUtils.Writer { + /** */ + private final PlatformContext platformCtx; + + /** + * Constructor. + * + * @param platformCtx Context. + */ + public EventCollectionResultWriter(PlatformContext platformCtx) { + assert platformCtx != null; + + this.platformCtx = platformCtx; + } + + /** <inheritDoc /> */ + @SuppressWarnings("unchecked") + @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { + Collection<EventAdapter> events = (Collection<EventAdapter>)obj; + + writer.writeInt(events.size()); + + for (EventAdapter e : events) + platformCtx.writeEvent(writer, e); + } + + /** <inheritDoc /> */ + @Override public boolean canWrite(Object obj, Throwable err) { + return obj instanceof Collection && err == null; + } + } +} +
