http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java new file mode 100644 index 0000000..410e4de --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.query; + +import javax.cache.Cache; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; + +/** + * Interop cursor for regular queries. + */ +public class PlatformQueryCursor extends PlatformAbstractQueryCursor<Cache.Entry> { + /** + * Constructor. + * + * @param platformCtx Context. 
+ * @param cursor Backing cursor. + * @param batchSize Batch size. + */ + public PlatformQueryCursor(PlatformContext platformCtx, QueryCursorEx<Cache.Entry> cursor, int batchSize) { + super(platformCtx, cursor, batchSize); + } + + /** {@inheritDoc} */ + @Override protected void write(PortableRawWriterEx writer, Cache.Entry val) { + writer.writeObjectDetached(val.getKey()); + writer.writeObjectDetached(val.getValue()); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java new file mode 100644 index 0000000..a741f0f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.store; + +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; + +/** + * Platform cache store callback. + */ +public abstract class PlatformCacheStoreCallback { + /** Context. */ + protected final PlatformContext ctx; + + /** + * Constructor. 
+ * + * @param ctx Context. + */ + protected PlatformCacheStoreCallback(PlatformContext ctx) { + this.ctx = ctx; + } + + /** + * Invoke the callback. + * + * @param memPtr Memory pointer. + */ + public void invoke(long memPtr) { + if (memPtr > 0) { + try (PlatformMemory mem = ctx.memory().get(memPtr)) { + PortableRawReaderEx reader = ctx.reader(mem); + + invoke0(reader); + } + } + } + + /** + * Internal invoke routine. + * + * @param reader Reader. + */ + protected abstract void invoke0(PortableRawReaderEx reader); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java new file mode 100644 index 0000000..a1c8516 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cluster; + +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCluster; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.cluster.ClusterGroupEx; +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; + +/** + * Interop projection. + */ +@SuppressWarnings({"UnusedDeclaration"}) +public class PlatformClusterGroup extends PlatformAbstractTarget { + /** */ + private static final int OP_ALL_METADATA = 1; + + /** */ + private static final int OP_FOR_ATTRIBUTE = 2; + + /** */ + private static final int OP_FOR_CACHE = 3; + + /** */ + private static final int OP_FOR_CLIENT = 4; + + /** */ + private static final int OP_FOR_DATA = 5; + + /** */ + private static final int OP_FOR_HOST = 6; + + /** */ + private static final int OP_FOR_NODE_IDS = 7; + + /** */ + private static final int OP_METADATA = 8; + + /** */ + private static final int OP_METRICS = 9; + + /** */ + private static final int OP_METRICS_FILTERED = 10; + + /** */ + private static final int OP_NODE_METRICS = 11; + + /** */ + private static final int OP_NODES = 12; + + /** */ + private static final int OP_PING_NODE = 13; + + /** */ + private static final int OP_TOPOLOGY = 14; + + /** Projection. */ + private final ClusterGroupEx prj; + + /** + * Constructor. + * + * @param platformCtx Context. + * @param prj Projection. 
+ */ + public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) { + super(platformCtx); + + this.prj = prj; + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException { + switch (type) { + case OP_METRICS: + platformCtx.writeClusterMetrics(writer, prj.metrics()); + + break; + + case OP_ALL_METADATA: + platformCtx.writeAllMetadata(writer); + + break; + + default: + super.processOutStream(type, writer); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions", "deprecation"}) + @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer) + throws IgniteCheckedException { + switch (type) { + case OP_METRICS_FILTERED: { + Collection<UUID> ids = PlatformUtils.readCollection(reader); + + platformCtx.writeClusterMetrics(writer, prj.forNodeIds(ids).metrics()); + + break; + } + + case OP_NODES: { + long oldTopVer = reader.readLong(); + + long curTopVer = platformCtx.kernalContext().discovery().topologyVersion(); + + if (curTopVer > oldTopVer) { + writer.writeBoolean(true); + + writer.writeLong(curTopVer); + + // At this moment topology version might have advanced, and due to this race + // we return outdated top ver to the callee. But this race is benign, the only + // possible side effect is that the user will re-request nodes and we will return + // the same set of nodes but with more recent topology version. + Collection<ClusterNode> nodes = prj.nodes(); + + platformCtx.writeNodes(writer, nodes); + } + else + // No discovery events since last invocation. + writer.writeBoolean(false); + + break; + } + + case OP_NODE_METRICS: { + UUID nodeId = reader.readUuid(); + + long lastUpdateTime = reader.readLong(); + + // Ask discovery because node might have been filtered out of current projection. 
+ ClusterNode node = platformCtx.kernalContext().discovery().node(nodeId); + + ClusterMetrics metrics = null; + + if (node != null) { + ClusterMetrics metrics0 = node.metrics(); + + long triggerTime = lastUpdateTime + platformCtx.kernalContext().config().getMetricsUpdateFrequency(); + + metrics = metrics0.getLastUpdateTime() > triggerTime ? metrics0 : null; + } + + platformCtx.writeClusterMetrics(writer, metrics); + + break; + } + + case OP_METADATA: { + int typeId = reader.readInt(); + + platformCtx.writeMetadata(writer, typeId); + + break; + } + + case OP_TOPOLOGY: { + long topVer = reader.readLong(); + + platformCtx.writeNodes(writer, topology(topVer)); + + break; + } + + default: + super.processInStreamOutStream(type, reader, writer); + } + } + + /** {@inheritDoc} */ + @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case OP_PING_NODE: + return pingNode(reader.readUuid()) ? TRUE : FALSE; + + default: + return super.processInStreamOutLong(type, reader); + } + } + + /** {@inheritDoc} */ + @Override protected Object processInStreamOutObject(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case OP_FOR_NODE_IDS: { + Collection<UUID> ids = PlatformUtils.readCollection(reader); + + return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forNodeIds(ids)); + } + + case OP_FOR_ATTRIBUTE: + return new PlatformClusterGroup(platformCtx, + (ClusterGroupEx)prj.forAttribute(reader.readString(), reader.readString())); + + case OP_FOR_CACHE: { + String cacheName = reader.readString(); + + return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forCacheNodes(cacheName)); + } + + case OP_FOR_CLIENT: { + String cacheName = reader.readString(); + + return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forClientNodes(cacheName)); + } + + case OP_FOR_DATA: { + String cacheName = reader.readString(); + + return new 
PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forDataNodes(cacheName)); + } + + case OP_FOR_HOST: { + UUID nodeId = reader.readUuid(); + + ClusterNode node = prj.node(nodeId); + + return new PlatformClusterGroup(platformCtx, (ClusterGroupEx) prj.forHost(node)); + } + + default: + return super.processInStreamOutObject(type, reader); + } + } + + /** + * @param exclude Projection to exclude. + * @return New projection. + */ + public PlatformClusterGroup forOthers(PlatformClusterGroup exclude) { + return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOthers(exclude.prj)); + } + + /** + * @return New projection. + */ + public PlatformClusterGroup forRemotes() { + return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRemotes()); + } + + /** + * @return New projection. + */ + public PlatformClusterGroup forDaemons() { + return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forDaemons()); + } + + /** + * @return New projection. + */ + public PlatformClusterGroup forRandom() { + return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRandom()); + } + + /** + * @return New projection. + */ + public PlatformClusterGroup forOldest() { + return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOldest()); + } + + /** + * @return New projection. + */ + public PlatformClusterGroup forYoungest() { + return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forYoungest()); + } + + /** + * @return Projection. + */ + public ClusterGroupEx projection() { + return prj; + } + + /** + * Resets local I/O, job, and task execution metrics. + */ + public void resetMetrics() { + assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group. + + ((IgniteCluster)prj).resetMetrics(); + } + + /** + * Pings a remote node. + */ + private boolean pingNode(UUID nodeId) { + assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group. 
+ + return ((IgniteCluster)prj).pingNode(nodeId); + } + + /** + * Gets a topology by version. Returns {@code null} if topology history storage doesn't contain + * specified topology version (history currently keeps last {@code 1000} snapshots). + * + * @param topVer Topology version. + * @return Collection of grid nodes which represented by specified topology version, + * if it is present in history storage, {@code null} otherwise. + * @throws UnsupportedOperationException If underlying SPI implementation does not support + * topology history. Currently only {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} + * supports topology history. + */ + private Collection<ClusterNode> topology(long topVer) { + assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group. + + return ((IgniteCluster)prj).topology(topVer); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java new file mode 100644 index 0000000..5ba9a85 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cluster; + +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.resources.IgniteInstanceResource; + +/** + * Interop cluster node filter. + */ +public class PlatformClusterNodeFilterImpl extends PlatformAbstractPredicate implements PlatformClusterNodeFilter { + /** */ + private static final long serialVersionUID = 0L; + + /** + * {@link java.io.Externalizable} support. + */ + public PlatformClusterNodeFilterImpl() { + // No-op. + } + + /** + * Constructor. + * + * @param pred .Net portable predicate. + * @param ctx Kernal context. 
+ */ + public PlatformClusterNodeFilterImpl(Object pred, PlatformContext ctx) { + super(pred, 0, ctx); + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(pred); + ctx.writeNode(writer, clusterNode); + + out.synchronize(); + + return ctx.gateway().clusterNodeFilterApply(mem.pointer()) != 0; + } + } + + /** + * @param ignite Ignite instance. + */ + @SuppressWarnings("UnusedDeclaration") + @IgniteInstanceResource + public void setIgniteInstance(Ignite ignite) { + ctx = PlatformUtils.platformContext(ignite); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java new file mode 100644 index 0000000..bf9d9e4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.compute; + +import java.io.Externalizable; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformProcessor; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.jetbrains.annotations.Nullable; + +/** + * Base interop job. + */ +public abstract class PlatformAbstractJob implements PlatformJob, Externalizable { + /** Marker object denoting the job execution result is stored in native platform. */ + static final Object LOC_JOB_RES = new Object(); + + /** Grid name. */ + @IgniteInstanceResource + protected transient Ignite ignite; + + /** Parent task; present only on local job instance. */ + protected transient PlatformAbstractTask task; + + /** Pointer to job in the native platform. */ + protected transient long ptr; + + /** Job. */ + protected Object job; + + /** + * {@link java.io.Externalizable} support. + */ + protected PlatformAbstractJob() { + // No-op. + } + + /** + * Constructor. + * + * @param task Parent task. + * @param ptr Pointer. 
+ * @param job Job. + */ + protected PlatformAbstractJob(PlatformAbstractTask task, long ptr, Object job) { + this.task = task; + this.ptr = ptr; + this.job = job; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + try { + PlatformProcessor interopProc = PlatformUtils.platformProcessor(ignite); + + interopProc.awaitStart(); + + return execute0(interopProc.context()); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * Internal job execution routine. + * + * @param ctx Interop processor. + * @return Result. + * @throws org.apache.ignite.IgniteCheckedException If failed. + */ + protected abstract Object execute0(PlatformContext ctx) throws IgniteCheckedException; + + /** + * Create job in native platform if needed. + * + * @param ctx Context. + * @return {@code True} if job was created, {@code false} if this is local job and creation is not necessary. + * @throws org.apache.ignite.IgniteCheckedException If failed. + */ + protected boolean createJob(PlatformContext ctx) throws IgniteCheckedException { + if (ptr == 0) { + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(job); + + out.synchronize(); + + ptr = ctx.gateway().computeJobCreate(mem.pointer()); + } + + return true; + } + else + return false; + } + + /** + * Run local job. + * + * @param ctx Context. + * @param cancel Cancel flag. + * @return Result. + */ + protected Object runLocal(PlatformContext ctx, boolean cancel) { + // Local job, must execute it with respect to possible concurrent task completion. + if (task.onJobLock()) { + try { + ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, 0); + + return LOC_JOB_RES; + } + finally { + task.onJobUnlock(); + } + } + else + // Task has completed concurrently, no need to run the job. 
+ return null; + } + + /** {@inheritDoc} */ + @Override public long pointer() { + return ptr; + } + + /** {@inheritDoc} */ + @Override public Object job() { + return job; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java new file mode 100644 index 0000000..b17dd97 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.compute; + +import java.util.List; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.IgniteException; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.compute.ComputeJobResultPolicy; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformNativeException; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.internal.util.typedef.X; +import org.jetbrains.annotations.Nullable; + +/** + * Base class for all interop tasks. + */ +public abstract class PlatformAbstractTask implements ComputeTask<Object, Void> { + /** Platform context. */ + protected final PlatformContext ctx; + + /** Pointer to the task in the native platform. */ + protected final long taskPtr; + + /** Lock for safe access to native pointers. */ + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** Done flag. */ + protected boolean done; + + /** + * Constructor. + * + * @param ctx Platform context. + * @param taskPtr Task pointer. 
+ */ + protected PlatformAbstractTask(PlatformContext ctx, long taskPtr) { + this.ctx = ctx; + this.taskPtr = taskPtr; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"}) + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + assert rcvd.isEmpty() : "Should not cache result in Java for interop task"; + + int plc; + + lock.readLock().lock(); + + try { + assert !done; + + PlatformAbstractJob job = res.getJob(); + + assert job.pointer() != 0; + + Object res0bj = res.getData(); + + if (res0bj == PlatformAbstractJob.LOC_JOB_RES) + // Processing local job execution result. + plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), 0); + else { + // Processing remote job execution result or exception. + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeUuid(res.getNode().id()); + writer.writeBoolean(res.isCancelled()); + + IgniteException err = res.getException(); + + PlatformUtils.writeInvocationResult(writer, res0bj, err); + + out.synchronize(); + + plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), mem.pointer()); + } + } + + ComputeJobResultPolicy plc0 = ComputeJobResultPolicy.fromOrdinal((byte) plc); + + assert plc0 != null : plc; + + return plc0; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) { + assert results.isEmpty() : "Should not cache result in java for interop task"; + + lock.readLock().lock(); + + try { + assert !done; + + ctx.gateway().computeTaskReduce(taskPtr); + } + finally { + lock.readLock().unlock(); + } + + return null; + } + + /** + * Callback invoked when task future is completed and all resources could be safely cleaned up. + * + * @param e If failed. 
+ */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void onDone(Exception e) { + lock.writeLock().lock(); + + try { + assert !done; + + if (e == null) + // Normal completion. + ctx.gateway().computeTaskComplete(taskPtr, 0); + else { + PlatformNativeException e0 = X.cause(e, PlatformNativeException.class); + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + if (e0 == null) { + writer.writeBoolean(false); + writer.writeString(e.getClass().getName()); + writer.writeString(e.getMessage()); + } + else { + writer.writeBoolean(true); + writer.writeObject(e0.cause()); + } + + out.synchronize(); + + ctx.gateway().computeTaskComplete(taskPtr, mem.pointer()); + } + } + } + finally { + // Done flag is set irrespective of any exceptions. + done = true; + + lock.writeLock().unlock(); + } + } + + /** + * Callback invoked by job when it wants to lock the task. + * + * @return {@code} True if task is not completed yet, {@code false} otherwise. + */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") + public boolean onJobLock() { + lock.readLock().lock(); + + if (done) { + lock.readLock().unlock(); + + return false; + } + else + return true; + } + + /** + * Callback invoked by job when task can be unlocked. 
+ */ + public void onJobUnlock() { + assert !done; + + lock.readLock().unlock(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java new file mode 100644 index 0000000..5570586 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.compute; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeLoadBalancer; +import org.apache.ignite.compute.ComputeTaskNoResultCache; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.resources.LoadBalancerResource; +import org.jetbrains.annotations.Nullable; + +/** + * Interop multi-closure task with node balancing: every job is mapped to the node the load balancer returns for it. + */ +@ComputeTaskNoResultCache +public class PlatformBalancingMultiClosureTask extends PlatformAbstractTask { + /** */ + private static final long serialVersionUID = 0L; + + /** Jobs to map; expected to be set through {@link #jobs(Collection)} before mapping. */ + private Collection<PlatformJob> jobs; + + /** Load balancer used to pick a node for every job. */ + @SuppressWarnings("UnusedDeclaration") + @LoadBalancerResource + private ComputeLoadBalancer lb; + + /** + * Constructor. + * + * @param ctx Platform context. + * @param taskPtr Task pointer. + */ + public PlatformBalancingMultiClosureTask(PlatformContext ctx, long taskPtr) { + super(ctx, taskPtr); + } + + /** {@inheritDoc} An empty subgrid yields an empty map. */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) { + assert !F.isEmpty(jobs) : "Jobs emptiness must be checked in native platform."; + + if (!F.isEmpty(subgrid)) { + Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size(), 1); + + for (PlatformJob job : jobs) + map.put(job, lb.getBalancedNode(job, null)); + + return map; + } + else + return Collections.emptyMap(); + } + + /** + * @param jobs Jobs to be mapped by this task. 
+ */ + public void jobs(Collection<PlatformJob> jobs) { + this.jobs = jobs; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java new file mode 100644 index 0000000..a168144 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.compute; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeTaskNoResultCache; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Interop single-closure task with affinity routing: the job is mapped to the node resolved for an affinity key. + */ +@ComputeTaskNoResultCache +public class PlatformBalancingSingleClosureAffinityTask extends PlatformAbstractTask { + /** */ + private static final long serialVersionUID = 0L; + + /** Job. */ + private PlatformJob job; + + /** Node resolved by {@link #affinity(String, Object, GridKernalContext)}; the job is mapped to it. */ + private ClusterNode node; + + /** + * Constructor. + * + * @param ctx Platform context. + * @param taskPtr Task pointer. + */ + public PlatformBalancingSingleClosureAffinityTask(PlatformContext ctx, long taskPtr) { + super(ctx, taskPtr); + } + + /** {@inheritDoc} The subgrid is not consulted: the job always goes to the affinity node. */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) { + assert job != null : "Job null-check must be performed in native platform."; + + return Collections.singletonMap(job, node); + } + + /** + * @param job Job to be mapped by this task. + */ + public void job(PlatformJob job) { + this.job = job; + } + + /** + * Init affinity: resolves the affinity key for the given key and stores the node this key maps to. + * + * @param cacheName Cache name. + * @param affKey Affinity key. + * @param ctx Kernal context. 
+ */ + public void affinity(String cacheName, Object affKey, GridKernalContext ctx) { + try { + final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); + + node = ctx.affinity().mapKeyToNode(cacheName, affKey0); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java new file mode 100644 index 0000000..3f1d66a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.compute; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeLoadBalancer; +import org.apache.ignite.compute.ComputeTaskNoResultCache; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.resources.LoadBalancerResource; +import org.jetbrains.annotations.Nullable; + +/** + * Interop single-closure task with node balancing: the job is mapped to the node the load balancer returns for it. + */ +@ComputeTaskNoResultCache +public class PlatformBalancingSingleClosureTask extends PlatformAbstractTask { + /** */ + private static final long serialVersionUID = 0L; + + /** Job. */ + private PlatformJob job; + + /** Load balancer used to pick a node for the job. */ + @SuppressWarnings("UnusedDeclaration") + @LoadBalancerResource + private ComputeLoadBalancer lb; + + /** + * Constructor. + * + * @param ctx Platform context. + * @param taskPtr Task pointer. + */ + public PlatformBalancingSingleClosureTask(PlatformContext ctx, long taskPtr) { + super(ctx, taskPtr); + } + + /** {@inheritDoc} An empty subgrid yields an empty map. */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) { + assert job != null : "Job null-check must be performed in native platform."; + + if (!F.isEmpty(subgrid)) { + Map<ComputeJob, ClusterNode> map = new HashMap<>(1, 1); + + map.put(job, lb.getBalancedNode(job, null)); + + return map; + } + else + return Collections.emptyMap(); + } + + /** + * @param job Job to be mapped by this task. 
+ */ + public void job(PlatformJob job) { + this.job = job; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java new file mode 100644 index 0000000..d2bd0ac --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.compute; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeTaskNoResultCache; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** + * Interop multi-closure task with broadcast semantics: every job is mapped to every node of the subgrid. + */ +@ComputeTaskNoResultCache +public class PlatformBroadcastingMultiClosureTask extends PlatformAbstractTask { + /** */ + private static final long serialVersionUID = 0L; + + /** Jobs to broadcast; expected to be set through {@link #jobs(Collection)} before mapping. */ + private Collection<PlatformJob> jobs; + + /** + * Constructor. + * + * @param ctx Platform context. + * @param taskPtr Task pointer. + */ + public PlatformBroadcastingMultiClosureTask(PlatformContext ctx, long taskPtr) { + super(ctx, taskPtr); + } + + /** {@inheritDoc} The first node gets the original job, every other node gets a newly created closure job. */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) { + assert !F.isEmpty(jobs) : "Jobs emptiness must be checked in native platform."; + + if (!F.isEmpty(subgrid)) { + Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size() * subgrid.size(), 1); + + for (PlatformJob job : jobs) { + boolean first = true; + + for (ClusterNode node : subgrid) { + if (first) { + map.put(job, node); + + first = false; + } + else + map.put(ctx.createClosureJob(this, job.pointer(), job.job()), node); + } + } + + return map; + } + else + return Collections.emptyMap(); + } + + /** + * @param jobs Jobs to be broadcast by this task. 
+ */ + public void jobs(Collection<PlatformJob> jobs) { + this.jobs = jobs; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java new file mode 100644 index 0000000..0736988 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.compute; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeTaskNoResultCache; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** + * Interop single-closure task with broadcast semantics: the job is mapped to every node of the subgrid. + */ +@ComputeTaskNoResultCache +public class PlatformBroadcastingSingleClosureTask extends PlatformAbstractTask { + /** */ + private static final long serialVersionUID = 0L; + + /** Job. */ + private PlatformJob job; + + /** + * Constructor. + * + * @param ctx Platform context. + * @param taskPtr Task pointer. + */ + public PlatformBroadcastingSingleClosureTask(PlatformContext ctx, long taskPtr) { + super(ctx, taskPtr); + } + + /** {@inheritDoc} The first node gets the original job, every other node gets a newly created closure job. */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) { + assert job != null : "Job null-check must be performed in native platform."; + + if (!F.isEmpty(subgrid)) { + Map<ComputeJob, ClusterNode> map = new HashMap<>(subgrid.size(), 1); + + boolean first = true; + + for (ClusterNode node : subgrid) { + if (first) { + map.put(job, node); + + first = false; + } + else + map.put(ctx.createClosureJob(this, job.pointer(), job.job()), node); + } + + return map; + } + else + return Collections.emptyMap(); + } + + /** + * @param job Job to be broadcast by this task. 
+ */ + public void job(PlatformJob job) { + this.job = job; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java new file mode 100644 index 0000000..9bd7d60 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.compute; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Light-weight interop job. Compared to the regular job, it has simpler logic because it does not have to + * bother with delayed serialization and cancellation. + */ +public class PlatformClosureJob extends PlatformAbstractJob { + /** */ + private static final long serialVersionUID = 0L; + + /** + * {@link java.io.Externalizable} support. + */ + public PlatformClosureJob() { + // No-op. + } + + /** + * Constructor. + * + * @param task Parent task. + * @param ptr Job pointer. + * @param job Job. + */ + public PlatformClosureJob(PlatformAbstractTask task, long ptr, Object job) { + super(task, ptr, job); + } + + /** {@inheritDoc} Without a parent task the job is created, executed and destroyed via the gateway; otherwise it runs locally. */ + @Nullable @Override public Object execute0(PlatformContext ctx) throws IgniteCheckedException { + if (task == null) { + // Remote job execution. + assert ptr == 0; + + createJob(ctx); + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformInputStream in = mem.input(); + + ctx.gateway().computeJobExecute(ptr, 0, mem.pointer()); + + in.synchronize(); + + PortableRawReaderEx reader = ctx.reader(in); + + return PlatformUtils.readInvocationResult(ctx, reader); + } + finally { + ctx.gateway().computeJobDestroy(ptr); + } + } + else { + // Local job execution. + assert ptr != 0; + + return runLocal(ctx, false); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + assert job != null; + + out.writeObject(job); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + job = in.readObject(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java new file mode 100644 index 0000000..638b4b1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.compute; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.internal.IgniteComputeImpl; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.portable.PortableObjectImpl; +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.portable.PortableObject; + +import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; + +/** + * Interop compute: runs closures, native full-fledged tasks and Java tasks on behalf of the native platform. + */ +@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored", "UnusedDeclaration"}) +public class PlatformCompute extends PlatformAbstractTarget { + /** Operation: execute closure with affinity routing. */ + private static final int OP_AFFINITY = 1; + + /** Operation: broadcast closures. */ + private static final int OP_BROADCAST = 2; + + /** Operation: execute Java task synchronously. */ + private static final int OP_EXEC = 3; + + /** Operation: execute Java task asynchronously. */ + private static final int OP_EXEC_ASYNC = 4; + + /** Operation: execute closures with node balancing. */ + private static final int OP_UNICAST = 5; + + /** Compute instance. */ + private final IgniteComputeImpl compute; + + /** Future of the last asynchronous Java task started by the current thread. */ + protected ThreadLocal<IgniteFuture<?>> curFut = new ThreadLocal<>(); + /** + * Constructor. + * + * @param platformCtx Context. + * @param compute Compute instance. 
+ */ + public PlatformCompute(PlatformContext platformCtx, IgniteComputeImpl compute) { + super(platformCtx); + + this.compute = compute; + } + + /** {@inheritDoc} */ + @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case OP_UNICAST: + processClosures(reader.readLong(), reader, false, false); + + return TRUE; + + case OP_BROADCAST: + processClosures(reader.readLong(), reader, true, false); + + return TRUE; + + case OP_AFFINITY: + processClosures(reader.readLong(), reader, false, true); + + return TRUE; + + default: + return super.processInStreamOutLong(type, reader); + } + } + + /** + * Process closure execution request: reads the closures from the reader, wraps them into a task and executes it. + * + * @param taskPtr Task pointer. + * @param reader Reader. + * @param broadcast Broadcast flag. + * @param affinity Affinity flag; only taken into account when a single closure is submitted. + */ + private void processClosures(long taskPtr, PortableRawReaderEx reader, boolean broadcast, boolean affinity) { + PlatformAbstractTask task; + + int size = reader.readInt(); + + if (size == 1) { + if (broadcast) { + PlatformBroadcastingSingleClosureTask task0 = + new PlatformBroadcastingSingleClosureTask(platformCtx, taskPtr); + + task0.job(nextClosureJob(task0, reader)); + + task = task0; + } + else if (affinity) { + PlatformBalancingSingleClosureAffinityTask task0 = + new PlatformBalancingSingleClosureAffinityTask(platformCtx, taskPtr); + + task0.job(nextClosureJob(task0, reader)); + + task0.affinity(reader.readString(), reader.readObjectDetached(), platformCtx.kernalContext()); + + task = task0; + } + else { + PlatformBalancingSingleClosureTask task0 = new PlatformBalancingSingleClosureTask(platformCtx, taskPtr); + + task0.job(nextClosureJob(task0, reader)); + + task = task0; + } + } + else { + if (broadcast) + task = new PlatformBroadcastingMultiClosureTask(platformCtx, taskPtr); + else + task = new PlatformBalancingMultiClosureTask(platformCtx, taskPtr); + + Collection<PlatformJob> jobs = new ArrayList<>(size); + + for (int i = 0; i < size; i++) + 
jobs.add(nextClosureJob(task, reader)); + + if (broadcast) + ((PlatformBroadcastingMultiClosureTask)task).jobs(jobs); + else + ((PlatformBalancingMultiClosureTask)task).jobs(jobs); + } + + platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, compute.clusterGroup().nodes()); + + executeNative0(task); + } + + /** + * Read the next closure job (job pointer followed by the job object) from the reader. + * + * @param task Task. + * @param reader Reader. + * @return Closure job. + */ + private PlatformJob nextClosureJob(PlatformAbstractTask task, PortableRawReaderEx reader) { + return platformCtx.createClosureJob(task, reader.readLong(), reader.readObjectDetached()); + } + + /** {@inheritDoc} */ + @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer) + throws IgniteCheckedException { + switch (type) { + case OP_EXEC: + writer.writeObjectDetached(executeJavaTask(reader, false)); + + break; + + case OP_EXEC_ASYNC: + writer.writeObjectDetached(executeJavaTask(reader, true)); + + break; + + default: + super.processInStreamOutStream(type, reader, writer); + } + } + + /** + * Execute native full-fledged task. + * + * @param taskPtr Pointer to the task. + * @param topVer Topology version. + */ + public void executeNative(long taskPtr, long topVer) { + final PlatformFullTask task = new PlatformFullTask(platformCtx, compute, taskPtr, topVer); + + executeNative0(task); + } + + /** + * Set "withTimeout" state. + * + * @param timeout Timeout (milliseconds). + */ + public void withTimeout(long timeout) { + compute.withTimeout(timeout); + } + + /** + * Set "withNoFailover" state. + */ + public void withNoFailover() { + compute.withNoFailover(); + } + + /** {@inheritDoc} */ + @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { + IgniteFuture<?> fut = curFut.get(); + + if (fut == null) + throw new IllegalStateException("Asynchronous operation not started."); + + return fut; + } + + /** + * Execute task asynchronously and notify it about completion, passing the error if the execution failed. 
+ * + * @param task Task. + */ + private void executeNative0(final PlatformAbstractTask task) { + IgniteInternalFuture fut = compute.executeAsync(task, null); + + fut.listen(new IgniteInClosure<IgniteInternalFuture>() { + private static final long serialVersionUID = 0L; + + @Override public void apply(IgniteInternalFuture fut) { + try { + fut.get(); + + task.onDone(null); + } + catch (IgniteCheckedException e) { + task.onDone(e); + } + } + }); + } + + /** + * Execute task taking arguments from the given reader. + * + * @param reader Reader. + * @param async Whether to execute asynchronously; if so, the future is stored for the current thread. + * @return Task result in portable form, or {@code null} when executed asynchronously. + */ + protected Object executeJavaTask(PortableRawReaderEx reader, boolean async) { + String taskName = reader.readString(); + boolean keepPortable = reader.readBoolean(); + Object arg = reader.readObjectDetached(); + + Collection<UUID> nodeIds = readNodeIds(reader); + + IgniteCompute compute0 = computeForTask(nodeIds); + + if (async) + compute0 = compute0.withAsync(); + + if (!keepPortable && arg instanceof PortableObjectImpl) + arg = ((PortableObject)arg).deserialize(); + + Object res = compute0.execute(taskName, arg); + + if (async) { + curFut.set(compute0.future().chain(new C1<IgniteFuture, Object>() { + private static final long serialVersionUID = 0L; + + @Override public Object apply(IgniteFuture fut) { + return toPortable(fut.get()); + } + })); + + return null; + } + else + return toPortable(res); + } + + /** + * Convert object to portable form. + * + * @param src Source object. + * @return Result. + */ + private Object toPortable(Object src) { + return platformCtx.kernalContext().grid().portables().toPortable(src); + } + + /** + * Read node IDs. + * + * @param reader Reader. + * @return Node IDs, or {@code null} if the stream carries none. 
+ */ + protected Collection<UUID> readNodeIds(PortableRawReaderEx reader) { + if (reader.readBoolean()) { + int len = reader.readInt(); + + List<UUID> res = new ArrayList<>(len); + + for (int i = 0; i < len; i++) + res.add(reader.readUuid()); + + return res; + } + else + return null; + } + + /** + * Get compute object for the given node IDs. + * + * @param nodeIds Node IDs, or {@code null} to use this instance's own compute. + * @return Compute object. + */ + protected IgniteCompute computeForTask(Collection<UUID> nodeIds) { + return nodeIds == null ? compute : + platformCtx.kernalContext().grid().compute(compute.clusterGroup().forNodeIds(nodeIds)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java new file mode 100644 index 0000000..cfed735 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.compute; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformProcessor; +import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Wrapper around job created in native platform. + * <p> + * If the job is expected to be executed locally, it contains only pointer to the corresponding entity in the native + * platform. In case of topology change or failover, job is serialized on demand. + * <p> + * If we know in advance that the job is to be executed on remote node, then it is serialized into byte array right + * away. + * <p> + * This class is not thread safe. + */ +@SuppressWarnings({"FieldCanBeLocal"}) +public class PlatformFullJob extends PlatformAbstractJob { + /** */ + private static final long serialVersionUID = 0L; + + /** Job is initialized. */ + private static final byte STATE_INIT = 0; + + /** Job is running. */ + private static final byte STATE_RUNNING = 1; + + /** Job execution completed. */ + private static final byte STATE_COMPLETED = 2; + + /** Job cancelled. */ + private static final byte STATE_CANCELLED = 3; + + /** Platform context. */ + private transient PlatformContext ctx; + + /** Job state (one of the {@code STATE_*} constants). */ + private transient byte state; + + /** + * {@link java.io.Externalizable} support. + */ + @SuppressWarnings("UnusedDeclaration") + public PlatformFullJob() { + // No-op. 
+ } + + /** + * Constructor. + * + * @param ctx Platform context. + * @param task Parent task. + * @param ptr Job pointer. + * @param job Job. + */ + public PlatformFullJob(PlatformContext ctx, PlatformAbstractTask task, long ptr, Object job) { + super(task, ptr, job); + + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute0(PlatformContext ctx) throws IgniteCheckedException { + boolean cancel = false; + + synchronized (this) { + // 1. Create job if necessary. + if (task == null) { + assert ptr == 0; + + createJob(ctx); + } + else + assert ptr != 0; + + // 2. Set correct state. + if (state == STATE_INIT) + state = STATE_RUNNING; + else { + assert state == STATE_CANCELLED; + + cancel = true; + } + } + + try { + if (task != null) + return runLocal(ctx, cancel); + else { + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformInputStream in = mem.input(); + + ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, mem.pointer()); + + in.synchronize(); + + PortableRawReaderEx reader = ctx.reader(in); + + return PlatformUtils.readInvocationResult(ctx, reader); + } + } + } + finally { + synchronized (this) { + if (task == null) { + assert ptr != 0; + + ctx.gateway().computeJobDestroy(ptr); + } + + if (state == STATE_RUNNING) + state = STATE_COMPLETED; + } + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + PlatformProcessor proc = PlatformUtils.platformProcessor(ignite); + + synchronized (this) { + if (state == STATE_INIT) + state = STATE_CANCELLED; + else if (state == STATE_RUNNING) { + assert ptr != 0; + + try { + proc.context().gateway().computeJobCancel(ptr); + } + finally { + state = STATE_CANCELLED; + } + } + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + if (job == null) { + assert ptr != 0; + + try { + if (task != null) { + if (task.onJobLock()) { + try { + serialize(); + } + finally { + task.onJobUnlock(); + } + } + else + throw new 
IgniteCheckedException("Task already completed: " + task); + } + else + serialize(); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to serialize interop job.", e); + } + } + + assert job != null; + + out.writeObject(job); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + job = in.readObject(); + } + + /** + * Internal job serialization routine: asks the native platform to serialize the job and stores the result. + * + * @throws org.apache.ignite.IgniteCheckedException If failed. + */ + private void serialize() throws IgniteCheckedException { + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformInputStream in = mem.input(); + + boolean res = ctx.gateway().computeJobSerialize(ptr, mem.pointer()) == 1; + + in.synchronize(); + + PortableRawReaderEx reader = ctx.reader(in); + + if (res) + job = reader.readObjectDetached(); + else + throw new IgniteCheckedException(reader.readString()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java new file mode 100644 index 0000000..b96d445 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.compute; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeTaskNoResultCache; +import org.apache.ignite.internal.IgniteComputeImpl; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Interop task which requires full execution cycle. + */ +@ComputeTaskNoResultCache +public final class PlatformFullTask extends PlatformAbstractTask { + /** */ + private static final long serialVersionUID = 0L; + + /** Initial topology version. 
*/ + private final long topVer; + + /** Compute instance. */ + private final IgniteComputeImpl compute; + + /** + * Constructor. + * + * @param ctx Platform context. + * @param compute Target compute instance. + * @param taskPtr Pointer to the task in the native platform. + * @param topVer Initial topology version. + */ + public PlatformFullTask(PlatformContext ctx, IgniteComputeImpl compute, long taskPtr, long topVer) { + super(ctx, taskPtr); + + this.compute = compute; + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) { + assert arg == null; + + lock.readLock().lock(); + + try { + assert !done; + + Collection<ClusterNode> nodes = compute.clusterGroup().nodes(); + + PlatformMemoryManager memMgr = ctx.memory(); + + try (PlatformMemory outMem = memMgr.allocate()) { + PlatformOutputStream out = outMem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + write(writer, nodes, subgrid); + + out.synchronize(); + + try (PlatformMemory inMem = memMgr.allocate()) { + PlatformInputStream in = inMem.input(); + + ctx.gateway().computeTaskMap(taskPtr, outMem.pointer(), inMem.pointer()); + + in.synchronize(); + + PortableRawReaderEx reader = ctx.reader(in); + + return read(reader, nodes); + } + } + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Write topology information. + * + * @param writer Writer. + * @param nodes Current topology nodes. + * @param subgrid Subgrid. + */ + private void write(PortableRawWriterEx writer, Collection<ClusterNode> nodes, List<ClusterNode> subgrid) { + GridDiscoveryManager discoMgr = ctx.kernalContext().discovery(); + + long curTopVer = discoMgr.topologyVersion(); + + if (topVer != curTopVer) { + writer.writeBoolean(true); + + writer.writeLong(curTopVer); + + writer.writeInt(nodes.size()); + + // Write subgrid size for more precise collection allocation on native side. 
+ writer.writeInt(subgrid.size()); + + for (ClusterNode node : nodes) { + ctx.writeNode(writer, node); + writer.writeBoolean(subgrid.contains(node)); + } + } + else + writer.writeBoolean(false); + } + + /** + * Read map result. + * + * @param reader Reader. + * @param nodes Current topology nodes. + * @return Map result. + */ + private Map<ComputeJob, ClusterNode> read(PortableRawReaderEx reader, Collection<ClusterNode> nodes) { + if (reader.readBoolean()) { + if (!reader.readBoolean()) + return null; + + int size = reader.readInt(); + + Map<ComputeJob, ClusterNode> map = U.newHashMap(size); + + for (int i = 0; i < size; i++) { + long ptr = reader.readLong(); + + Object nativeJob = reader.readBoolean() ? reader.readObjectDetached() : null; + + PlatformJob job = ctx.createJob(this, ptr, nativeJob); + + UUID jobNodeId = reader.readUuid(); + + assert jobNodeId != null; + + ClusterNode jobNode = ctx.kernalContext().discovery().node(jobNodeId); + + if (jobNode == null) { + // Special case when node has left the grid at this point. + // We expect task processor to perform necessary failover. 
+ for (ClusterNode node : nodes) { + if (node.id().equals(jobNodeId)) { + jobNode = node; + + break; + } + } + + assert jobNode != null; + } + + map.put(job, jobNode); + } + + return map; + } + else + throw new IgniteException(reader.readString()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java new file mode 100644 index 0000000..d066296 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.cpp; + +import org.apache.ignite.internal.processors.platform.PlatformAbstractBootstrap; +import org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure; + +/** + * Platform .Net bootstrap. + */ +public class PlatformCppBootstrap extends PlatformAbstractBootstrap { + /** {@inheritDoc} */ + @Override protected PlatformAbstractConfigurationClosure closure(long envPtr) { + return new PlatformCppConfigurationClosure(envPtr); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java new file mode 100644 index 0000000..4933713 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cpp; + +import org.apache.ignite.internal.processors.platform.PlatformBootstrap; +import org.apache.ignite.internal.processors.platform.PlatformBootstrapFactory; + +/** + * Platform .Net bootstrap factory. + */ +public class PlatformCppBootstrapFactory implements PlatformBootstrapFactory { + /** Bootstrap ID. */ + public static final int ID = 2; + + /** {@inheritDoc} */ + @Override public int id() { + return ID; + } + + /** {@inheritDoc} */ + @Override public PlatformBootstrap create() { + return new PlatformCppBootstrap(); + } +} \ No newline at end of file