IGNITE-1319: Moved platform services to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e2f522ba Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e2f522ba Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e2f522ba Branch: refs/heads/ignite-1093 Commit: e2f522ba0c450e5045393824cc66bceeba716628 Parents: 02f2465 Author: vozerov-gridgain <[email protected]> Authored: Fri Aug 28 16:23:15 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Aug 28 16:23:15 2015 +0300 ---------------------------------------------------------------------- .../processors/platform/PlatformContext.java | 9 + .../platform/services/PlatformService.java | 44 ++++ .../platform/dotnet/PlatformDotNetService.java | 27 ++ .../dotnet/PlatformDotNetServiceImpl.java | 47 ++++ .../services/PlatformAbstractService.java | 223 ++++++++++++++++ .../platform/services/PlatformServices.java | 252 +++++++++++++++++++ 6 files changed, 602 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f522ba/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java index 9b4a891..064cd91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.platform.cache.query.*; import org.apache.ignite.internal.processors.platform.callback.*; import org.apache.ignite.internal.processors.platform.compute.*; import 
org.apache.ignite.internal.processors.platform.memory.*; +import org.apache.ignite.lang.*; import org.apache.ignite.stream.*; import org.jetbrains.annotations.*; @@ -260,4 +261,12 @@ public interface PlatformContext { * @return Stream receiver. */ public StreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable); + + /** + * Create cluster node filter. + * + * @param filter Native filter. + * @return Cluster node filter. + */ + public IgnitePredicate<ClusterNode> createClusterNodeFilter(Object filter); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f522ba/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformService.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformService.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformService.java new file mode 100644 index 0000000..7b1daad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformService.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.services; + +import org.apache.ignite.*; +import org.apache.ignite.services.*; + +/** + * Base interface for all platform services. + */ +public interface PlatformService extends Service { + /** + * Invokes native service method. + * + * @param mthdName Method name. + * @param srvKeepPortable Server keep portable flag. + * @param args Arguments. + * @return Resulting data. + * @throws org.apache.ignite.IgniteCheckedException If failed. + */ + public Object invokeMethod(String mthdName, boolean srvKeepPortable, Object[] args) throws IgniteCheckedException; + + /** + * Gets native pointer. + * + * @return Native pointer. + */ + public long pointer(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f522ba/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java new file mode 100644 index 0000000..7c61cf8 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.dotnet; + +import org.apache.ignite.internal.processors.platform.services.*; + +/** + * Marker interface denoting a service implemented on the .Net platform; declares no members beyond {@link PlatformService}. + */ +public interface PlatformDotNetService extends PlatformService { + // No-op. +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f522ba/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java new file mode 100644 index 0000000..74e143d --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.dotnet; + +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.services.*; + +/** + * Interop .Net service: {@link PlatformAbstractService} subclass tagged with the {@link PlatformDotNetService} marker. + */ +public class PlatformDotNetServiceImpl extends PlatformAbstractService implements PlatformDotNetService { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** + * Default constructor for serialization. + */ + public PlatformDotNetServiceImpl() { + // No-op. + } + + /** + * Constructor; passes all arguments to the {@link PlatformAbstractService} constructor. + * + * @param svc Service. + * @param ctx Context. + * @param srvKeepPortable Whether to keep objects portable on server if possible. 
+ */ + public PlatformDotNetServiceImpl(Object svc, PlatformContext ctx, boolean srvKeepPortable) { + super(svc, ctx, srvKeepPortable); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f522ba/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java new file mode 100644 index 0000000..d53b9b5 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.services; + +import org.apache.ignite.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.memory.*; +import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.services.*; + +import java.io.*; + +/** + * Base platform service implementation that forwards init, execute, cancel and method invocation to the platform gateway. + */ +public abstract class PlatformAbstractService implements PlatformService, Externalizable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** .Net portable service. */ + protected Object svc; + + /** Whether to keep objects portable on server if possible. */ + protected boolean srvKeepPortable; + + /** Pointer to deployed service; transient, assigned in {@link #init(ServiceContext)} from the gateway's serviceInit result. */ + protected transient long ptr; + + /** Platform context; transient, set by the constructor or by {@link #setIgniteInstance(Ignite)}. */ + protected transient PlatformContext platformCtx; + + /** + * Default constructor for serialization. + */ + public PlatformAbstractService() { + // No-op. + } + + /** + * Constructor. + * + * @param svc Service. + * @param ctx Context. + * @param srvKeepPortable Whether to keep objects portable on server if possible. 
+ */ + public PlatformAbstractService(Object svc, PlatformContext ctx, boolean srvKeepPortable) { + assert svc != null; + assert ctx != null; + + this.svc = svc; + this.platformCtx = ctx; + this.srvKeepPortable = srvKeepPortable; + } + + /** {@inheritDoc} */ + @Override public void init(ServiceContext ctx) throws Exception { + assert ptr == 0; + assert platformCtx != null; + + try (PlatformMemory mem = platformCtx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = platformCtx.writer(out); + + writer.writeBoolean(srvKeepPortable); + writer.writeObject(svc); + + writeServiceContext(ctx, writer); + + out.synchronize(); + + ptr = platformCtx.gateway().serviceInit(mem.pointer()); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void execute(ServiceContext ctx) throws Exception { + assert ptr != 0; + assert platformCtx != null; + + try (PlatformMemory mem = platformCtx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = platformCtx.writer(out); + + writer.writeBoolean(srvKeepPortable); + + writeServiceContext(ctx, writer); + + out.synchronize(); + + platformCtx.gateway().serviceExecute(ptr, mem.pointer()); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void cancel(ServiceContext ctx) { + assert ptr != 0; + assert platformCtx != null; + + try (PlatformMemory mem = platformCtx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = platformCtx.writer(out); + + writer.writeBoolean(srvKeepPortable); + + writeServiceContext(ctx, writer); + + out.synchronize(); + + platformCtx.gateway().serviceCancel(ptr, mem.pointer()); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * Writes service context fields in order: name, execution ID, cancelled flag, cache name, affinity key. + * + * @param ctx Context. + * @param writer Writer. 
+ */ + private void writeServiceContext(ServiceContext ctx, PortableRawWriterEx writer) { + writer.writeString(ctx.name()); + writer.writeUuid(ctx.executionId()); + writer.writeBoolean(ctx.isCancelled()); + writer.writeString(ctx.cacheName()); + writer.writeObject(ctx.affinityKey()); + } + + /** {@inheritDoc} */ + @Override public long pointer() { + assert ptr != 0; + + return ptr; + } + + /** {@inheritDoc} */ + @Override public Object invokeMethod(String mthdName, boolean srvKeepPortable, Object[] args) + throws IgniteCheckedException { + assert ptr != 0; + assert platformCtx != null; + + try (PlatformMemory outMem = platformCtx.memory().allocate()) { + PlatformOutputStream out = outMem.output(); + PortableRawWriterEx writer = platformCtx.writer(out); + + writer.writeBoolean(srvKeepPortable); + writer.writeString(mthdName); + + if (args == null) + writer.writeBoolean(false); + else { + writer.writeBoolean(true); + writer.writeInt(args.length); + + for (Object arg : args) + writer.writeObjectDetached(arg); + } + + out.synchronize(); + + try (PlatformMemory inMem = platformCtx.memory().allocate()) { + PlatformInputStream in = inMem.input(); + + PortableRawReaderEx reader = platformCtx.reader(in); + + platformCtx.gateway().serviceInvokeMethod(ptr, outMem.pointer(), inMem.pointer()); + + in.synchronize(); + + return PlatformUtils.readInvocationResult(platformCtx, reader); + } + } + } + + /** + * @param ignite Ignite instance from which the platform context is resolved. 
+ */ + @SuppressWarnings("UnusedDeclaration") + @IgniteInstanceResource + public void setIgniteInstance(Ignite ignite) { + platformCtx = PlatformUtils.platformContext(ignite); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + svc = in.readObject(); + srvKeepPortable = in.readBoolean(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(svc); + out.writeBoolean(srvKeepPortable); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f522ba/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java new file mode 100644 index 0000000..d0956f9 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.services; + +import org.apache.ignite.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.dotnet.*; +import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.services.*; + +import java.util.*; + +/** + * Interop services. + */ +@SuppressWarnings({"UnusedDeclaration"}) +public class PlatformServices extends PlatformAbstractTarget { + /** Operation code: deploy a .Net service from a full service configuration. */ + private static final int OP_DOTNET_DEPLOY = 1; + + /** Operation code: deploy multiple instances of a .Net service. */ + private static final int OP_DOTNET_DEPLOY_MULTIPLE = 2; + + /** Operation code: write native pointers of .Net services found by name. */ + private static final int OP_DOTNET_SERVICES = 3; + + /** Operation code: invoke a method on a .Net service. */ + private static final int OP_DOTNET_INVOKE = 4; + + /** Operation code: write service descriptors. */ + private static final int OP_DESCRIPTORS = 5; + + /** Services facade. */ + private final IgniteServices services; + + /** Server keep portable flag. */ + private final boolean srvKeepPortable; + + /** + * Ctor. + * + * @param platformCtx Context. + * @param services Services facade. + * @param srvKeepPortable Server keep portable flag. + */ + public PlatformServices(PlatformContext platformCtx, IgniteServices services, boolean srvKeepPortable) { + super(platformCtx); + + assert services != null; + + this.services = services; + this.srvKeepPortable = srvKeepPortable; + } + + /** + * Gets services with asynchronous mode enabled. + * + * @return Services with asynchronous mode enabled. + */ + public PlatformServices withAsync() { + if (services.isAsync()) + return this; + + return new PlatformServices(platformCtx, services.withAsync(), srvKeepPortable); + } + + /** + * Gets services with server "keep portable" mode enabled. + * + * @return Services with server "keep portable" mode enabled. 
+ */ + public PlatformServices withServerKeepPortable() { + return srvKeepPortable ? this : new PlatformServices(platformCtx, services, true); + } + + /** + * Cancels service deployment. + * + * @param name Name of service to cancel. + */ + public void cancel(String name) { + services.cancel(name); + } + + /** + * Cancels all deployed services. + */ + public void cancelAll() { + services.cancelAll(); + } + + /** + * Gets a remote handle on the service. + * + * @param name Service name. + * @param sticky Whether or not Ignite should always contact the same remote service. + * @return Either proxy over remote service or local service if it is deployed locally. + */ + public Object dotNetServiceProxy(String name, boolean sticky) { + return services.serviceProxy(name, PlatformDotNetService.class, sticky); + } + + /** {@inheritDoc} */ + @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case OP_DOTNET_DEPLOY: { + ServiceConfiguration cfg = new ServiceConfiguration(); + + cfg.setName(reader.readString()); + cfg.setService(new PlatformDotNetServiceImpl(reader.readObjectDetached(), platformCtx, srvKeepPortable)); + cfg.setTotalCount(reader.readInt()); + cfg.setMaxPerNodeCount(reader.readInt()); + cfg.setCacheName(reader.readString()); + cfg.setAffinityKey(reader.readObjectDetached()); + + Object filter = reader.readObjectDetached(); + + if (filter != null) + cfg.setNodeFilter(platformCtx.createClusterNodeFilter(filter)); + + services.deploy(cfg); + + return TRUE; + } + + case OP_DOTNET_DEPLOY_MULTIPLE: { + String name = reader.readString(); + Object svc = reader.readObjectDetached(); + int totalCnt = reader.readInt(); + int maxPerNodeCnt = reader.readInt(); + + services.deployMultiple(name, new PlatformDotNetServiceImpl(svc, platformCtx, srvKeepPortable), + totalCnt, maxPerNodeCnt); + + return TRUE; + } + } + + return super.processInOp(type, reader); + } + + /** {@inheritDoc} */ + @Override 
protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer, + Object arg) throws IgniteCheckedException { + switch (type) { + case OP_DOTNET_SERVICES: { + Collection<Service> svcs = services.services(reader.readString()); + + PlatformUtils.writeNullableCollection(writer, svcs, + new PlatformWriterClosure<Service>() { + @Override public void write(PortableRawWriterEx writer, Service svc) { + writer.writeLong(((PlatformService) svc).pointer()); + } + }, + new IgnitePredicate<Service>() { + @Override public boolean apply(Service svc) { + return svc instanceof PlatformDotNetService; + } + } + ); + + return; + } + + case OP_DOTNET_INVOKE: { + assert arg != null; + assert arg instanceof PlatformDotNetService; + + String mthdName = reader.readString(); + + Object[] args; + + if (reader.readBoolean()) { + args = new Object[reader.readInt()]; + + for (int i = 0; i < args.length; i++) + args[i] = reader.readObjectDetached(); + } + else + args = null; + + try { + Object result = ((PlatformDotNetService)arg).invokeMethod(mthdName, srvKeepPortable, args); + + PlatformUtils.writeInvocationResult(writer, result, null); + } + catch (Exception e) { + PlatformUtils.writeInvocationResult(writer, null, e); + } + + return; + } + } + + super.processInOutOp(type, reader, writer, arg); + } + + /** {@inheritDoc} */ + @Override protected void processOutOp(int type, PortableRawWriterEx writer) throws IgniteCheckedException { + switch (type) { + case OP_DESCRIPTORS: { + Collection<ServiceDescriptor> descs = services.serviceDescriptors(); + + PlatformUtils.writeCollection(writer, descs, new PlatformWriterClosure<ServiceDescriptor>() { + @Override public void write(PortableRawWriterEx writer, ServiceDescriptor d) { + writer.writeString(d.name()); + writer.writeString(d.cacheName()); + writer.writeInt(d.maxPerNodeCount()); + writer.writeInt(d.totalCount()); + writer.writeUuid(d.originNodeId()); + writer.writeObject(d.affinityKey()); + + Map<UUID, Integer> top 
= d.topologySnapshot(); + + PlatformUtils.writeMap(writer, top, new PlatformWriterBiClosure<UUID, Integer>() { + @Override public void write(PortableRawWriterEx writer, UUID key, Integer val) { + writer.writeUuid(key); + writer.writeInt(val); + } + }); + } + }); + + return; + } + } + + super.processOutOp(type, writer); + } + + /** {@inheritDoc} */ + @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { + return services.future(); + } +}
