IGNITE-1318: Moved platform data streamer to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16c095a9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16c095a9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16c095a9 Branch: refs/heads/ignite-1093 Commit: 16c095a9e9a30db630caa8a6ecec98ac5256962e Parents: 207b682 Author: vozerov-gridgain <[email protected]> Authored: Fri Aug 28 15:01:06 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Aug 28 15:01:06 2015 +0300 ---------------------------------------------------------------------- .../processors/platform/PlatformContext.java | 11 + .../datastreamer/PlatformDataStreamer.java | 222 +++++++++++++++++++ .../datastreamer/PlatformStreamReceiver.java | 114 ++++++++++ 3 files changed, 347 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/16c095a9/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java index cbcc91b..9b4a891 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.platform.cache.query.*; import org.apache.ignite.internal.processors.platform.callback.*; import org.apache.ignite.internal.processors.platform.compute.*; import org.apache.ignite.internal.processors.platform.memory.*; +import org.apache.ignite.stream.*; import org.jetbrains.annotations.*; import java.util.*; @@ -249,4 +250,14 @@ public interface PlatformContext { * @return Entry filter. */ public PlatformCacheEntryFilter createCacheEntryFilter(Object filter, long ptr); + + /** + * Create stream receiver. + * + * @param rcv Native receiver. + * @param ptr Pointer. + * @param keepPortable Keep portable flag. + * @return Stream receiver. + */ + public StreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable); } http://git-wip-us.apache.org/repos/asf/ignite/blob/16c095a9/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java new file mode 100644 index 0000000..fc9f535 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.datastreamer; + +import org.apache.ignite.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.datastreamer.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.internal.util.lang.*; + +import java.util.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * Interop data streamer wrapper. + */ +@SuppressWarnings({"UnusedDeclaration", "unchecked"}) +public class PlatformDataStreamer extends PlatformAbstractTarget { + /** Policy: continue. */ + private static final int PLC_CONTINUE = 0; + + /** Policy: close. */ + private static final int PLC_CLOSE = 1; + + /** Policy: cancel and close. */ + private static final int PLC_CANCEL_CLOSE = 2; + + /** Policy: do flush. */ + private static final int PLC_FLUSH = 3; + + /** */ + private static final int OP_UPDATE = 1; + + /** */ + private static final int OP_RECEIVER = 2; + + /** Cache name. */ + private final String cacheName; + + /** Data streamer. */ + private final DataStreamerImpl ldr; + + /** Portable flag. */ + private final boolean keepPortable; + + /** Topology update event listener. */ + private volatile GridLocalEventListener lsnr; + + /** + * Constructor. + * + * @param platformCtx Context. + * @param ldr Data streamer. + */ + public PlatformDataStreamer(PlatformContext platformCtx, String cacheName, DataStreamerImpl ldr, + boolean keepPortable) { + super(platformCtx); + + this.cacheName = cacheName; + this.ldr = ldr; + this.keepPortable = keepPortable; + } + + /** {@inheritDoc} */ + @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case OP_UPDATE: + int plc = reader.readInt(); + + if (plc == PLC_CANCEL_CLOSE) { + // Close with cancel. + platformCtx.kernalContext().event().removeLocalEventListener(lsnr); + + ldr.close(true); + } + else { + final long futPtr = reader.readLong(); + + int valsCnt = reader.readInt(); + + if (valsCnt > 0) { + Collection<GridMapEntry> vals = new ArrayList<>(valsCnt); + + for (int i = 0; i < valsCnt; i++) + vals.add(new GridMapEntry(reader.readObjectDetached(), reader.readObjectDetached())); + + PlatformFutureUtils.listen(platformCtx, ldr.addData(vals), futPtr, + PlatformFutureUtils.TYP_OBJ); + } + + if (plc == PLC_CLOSE) { + platformCtx.kernalContext().event().removeLocalEventListener(lsnr); + + ldr.close(false); + } + else if (plc == PLC_FLUSH) + ldr.tryFlush(); + else + assert plc == PLC_CONTINUE; + } + + return TRUE; + + case OP_RECEIVER: + long ptr = reader.readLong(); + + Object rec = reader.readObjectDetached(); + + ldr.receiver(platformCtx.createStreamReceiver(rec, ptr, keepPortable)); + + return TRUE; + + default: + return throwUnsupported(type); + } + } + + /** + * Listen topology changes. + * + * @param ptr Pointer. + */ + public void listenTopology(final long ptr) { + lsnr = new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + long topVer = discoEvt.topologyVersion(); + int topSize = platformCtx.kernalContext().discovery().cacheNodes( + cacheName, new AffinityTopologyVersion(topVer)).size(); + + platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize); + } + }; + + platformCtx.kernalContext().event().addLocalEventListener(lsnr, EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT); + + GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery(); + + long topVer = discoMgr.topologyVersion(); + int topSize = discoMgr.cacheNodes(cacheName, new AffinityTopologyVersion(topVer)).size(); + + platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize); + } + + /** + * @return Allow-overwrite flag. + */ + public boolean allowOverwrite() { + return ldr.allowOverwrite(); + } + + /** + * @param val Allow-overwrite flag. + */ + public void allowOverwrite(boolean val) { + ldr.allowOverwrite(val); + } + + /** + * @return Skip store flag. + */ + public boolean skipStore() { + return ldr.skipStore(); + } + + /** + * @param skipStore Skip store flag. + */ + public void skipStore(boolean skipStore) { + ldr.skipStore(skipStore); + } + + /** + * @return Per-node buffer size. + */ + public int perNodeBufferSize() { + return ldr.perNodeBufferSize(); + } + + /** + * @param val Per-node buffer size. + */ + public void perNodeBufferSize(int val) { + ldr.perNodeBufferSize(val); + } + + /** + * @return Per-node parallel load operations. + */ + public int perNodeParallelOperations() { + return ldr.perNodeParallelOperations(); + } + + /** + * @param val Per-node parallel load operations. + */ + public void perNodeParallelOperations(int val) { + ldr.perNodeParallelOperations(val); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/16c095a9/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java new file mode 100644 index 0000000..70bfb6b --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.datastreamer; + +import org.apache.ignite.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.cache.*; +import org.apache.ignite.internal.processors.platform.memory.*; +import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.stream.*; + +import java.io.*; +import java.util.*; + +/** + * Interop receiver. + */ +public class PlatformStreamReceiver<K, V> extends PlatformAbstractPredicate implements StreamReceiver<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private boolean keepPortable; + + /** + * Constructor. + */ + public PlatformStreamReceiver() + { + super(); + } + + /** + * Constructor. + * + * @param pred .Net portable receiver. + * @param ptr Pointer to receiver in the native platform. + * @param ctx Kernal context. + */ + public PlatformStreamReceiver(Object pred, long ptr, boolean keepPortable, PlatformContext ctx) { + super(pred, ptr, ctx); + + assert pred != null; + + this.keepPortable = keepPortable; + } + + /** {@inheritDoc} */ + @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> collection) + throws IgniteException { + assert ctx != null; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(pred); + + writer.writeInt(collection.size()); + + for (Map.Entry<K, V> e : collection) { + writer.writeObject(e.getKey()); + writer.writeObject(e.getValue()); + } + + out.synchronize(); + + ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, + new PlatformCache(ctx, cache, keepPortable), mem.pointer(), keepPortable); + } + } + + /** + * @param ignite Ignite instance. + */ + @SuppressWarnings("UnusedDeclaration") + @IgniteInstanceResource + public void setIgniteInstance(Ignite ignite) { + ctx = PlatformUtils.platformContext(ignite); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + out.writeBoolean(keepPortable); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + keepPortable = in.readBoolean(); + } + +}
