IGNITE-1513: Merged Java to core module.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8045c820 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8045c820 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8045c820 Branch: refs/heads/ignite-1282 Commit: 8045c820ceb2730a2a5ca96b2f7173ea715b348e Parents: 7ad8e80 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Fri Sep 18 13:02:45 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Fri Sep 18 13:02:47 2015 +0300 ---------------------------------------------------------------------- ...processors.platform.PlatformBootstrapFactory | 2 + .../platform/PlatformAbstractBootstrap.java | 48 + .../PlatformAbstractConfigurationClosure.java | 61 + .../platform/PlatformAbstractPredicate.java | 67 ++ .../platform/PlatformAbstractTarget.java | 320 +++++ .../processors/platform/PlatformBootstrap.java | 35 + .../platform/PlatformBootstrapFactory.java | 37 + .../platform/PlatformConfigurationEx.java | 48 + .../platform/PlatformContextImpl.java | 621 ++++++++++ .../processors/platform/PlatformIgnition.java | 189 +++ .../platform/PlatformProcessorImpl.java | 360 ++++++ .../platform/cache/PlatformCache.java | 1090 ++++++++++++++++++ .../cache/PlatformCacheEntryFilterImpl.java | 106 ++ .../cache/PlatformCacheEntryProcessorImpl.java | 220 ++++ .../platform/cache/PlatformCacheIterator.java | 72 ++ .../PlatformCachePartialUpdateException.java | 59 + .../cache/affinity/PlatformAffinity.java | 296 +++++ .../query/PlatformAbstractQueryCursor.java | 192 +++ .../query/PlatformContinuousQueryImpl.java | 235 ++++ .../PlatformContinuousQueryRemoteFilter.java | 188 +++ .../cache/query/PlatformFieldsQueryCursor.java | 49 + .../cache/query/PlatformQueryCursor.java | 45 + .../cache/store/PlatformCacheStoreCallback.java | 61 + .../platform/cluster/PlatformClusterGroup.java | 335 ++++++ .../cluster/PlatformClusterNodeFilterImpl.java | 78 ++ 
.../platform/compute/PlatformAbstractJob.java | 156 +++ .../platform/compute/PlatformAbstractTask.java | 206 ++++ .../PlatformBalancingMultiClosureTask.java | 83 ++ ...tformBalancingSingleClosureAffinityTask.java | 88 ++ .../PlatformBalancingSingleClosureTask.java | 81 ++ .../PlatformBroadcastingMultiClosureTask.java | 87 ++ .../PlatformBroadcastingSingleClosureTask.java | 84 ++ .../platform/compute/PlatformClosureJob.java | 104 ++ .../platform/compute/PlatformCompute.java | 332 ++++++ .../platform/compute/PlatformFullJob.java | 220 ++++ .../platform/compute/PlatformFullTask.java | 192 +++ .../platform/cpp/PlatformCppBootstrap.java | 31 + .../cpp/PlatformCppBootstrapFactory.java | 39 + .../cpp/PlatformCppConfigurationClosure.java | 99 ++ .../cpp/PlatformCppConfigurationEx.java | 82 ++ .../datastreamer/PlatformDataStreamer.java | 226 ++++ .../PlatformStreamReceiverImpl.java | 119 ++ .../dotnet/PlatformDotNetBootstrap.java | 31 + .../dotnet/PlatformDotNetBootstrapFactory.java | 39 + .../dotnet/PlatformDotNetCacheStore.java | 497 ++++++++ .../PlatformDotNetConfigurationClosure.java | 255 ++++ .../dotnet/PlatformDotNetConfigurationEx.java | 91 ++ .../platform/dotnet/PlatformDotNetService.java | 27 + .../dotnet/PlatformDotNetServiceImpl.java | 47 + .../events/PlatformEventFilterListenerImpl.java | 163 +++ .../platform/events/PlatformEvents.java | 396 +++++++ .../lifecycle/PlatformLifecycleBean.java | 75 ++ .../platform/memory/PlatformAbstractMemory.java | 121 ++ .../PlatformBigEndianInputStreamImpl.java | 126 ++ .../PlatformBigEndianOutputStreamImpl.java | 161 +++ .../platform/memory/PlatformExternalMemory.java | 55 + .../memory/PlatformInputStreamImpl.java | 331 ++++++ .../memory/PlatformMemoryManagerImpl.java | 85 ++ .../platform/memory/PlatformMemoryPool.java | 140 +++ .../platform/memory/PlatformMemoryUtils.java | 467 ++++++++ .../memory/PlatformOutputStreamImpl.java | 267 +++++ .../platform/memory/PlatformPooledMemory.java | 64 + 
.../platform/memory/PlatformUnpooledMemory.java | 51 + .../messaging/PlatformMessageFilterImpl.java | 110 ++ .../messaging/PlatformMessageLocalFilter.java | 102 ++ .../platform/messaging/PlatformMessaging.java | 166 +++ .../services/PlatformAbstractService.java | 230 ++++ .../platform/services/PlatformServices.java | 275 +++++ .../transactions/PlatformTransactions.java | 259 +++++ .../platform/utils/PlatformFutureUtils.java | 397 +++++++ .../platform/utils/PlatformReaderBiClosure.java | 34 + .../platform/utils/PlatformReaderClosure.java | 34 + .../platform/utils/PlatformUtils.java | 812 +++++++++++++ .../platform/utils/PlatformWriterBiClosure.java | 34 + .../platform/utils/PlatformWriterClosure.java | 33 + .../platform/cpp/PlatformCppConfiguration.java | 47 + .../ignite/platform/cpp/package-info.java | 22 + .../dotnet/PlatformDotNetCacheStoreFactory.java | 139 +++ .../dotnet/PlatformDotNetConfiguration.java | 97 ++ .../dotnet/PlatformDotNetLifecycleBean.java | 109 ++ .../PlatformDotNetPortableConfiguration.java | 196 ++++ ...PlatformDotNetPortableTypeConfiguration.java | 214 ++++ .../ignite/platform/dotnet/package-info.java | 22 + .../apache/ignite/platform/package-info.java | 22 + .../platform/PlatformComputeBroadcastTask.java | 73 ++ .../platform/PlatformComputeDecimalTask.java | 106 ++ .../platform/PlatformComputeEchoTask.java | 188 +++ .../ignite/platform/PlatformComputeEnum.java | 28 + .../platform/PlatformComputeJavaPortable.java | 39 + .../platform/PlatformComputePortable.java | 42 + .../PlatformComputePortableArgTask.java | 119 ++ .../platform/PlatformEventsWriteEventTask.java | 146 +++ .../ignite/platform/PlatformMaxMemoryTask.java | 57 + .../ignite/platform/PlatformMinMemoryTask.java | 57 + .../lifecycle/PlatformJavaLifecycleBean.java | 47 + .../lifecycle/PlatformJavaLifecycleTask.java | 65 ++ modules/platform/pom.xml | 80 -- ...processors.platform.PlatformBootstrapFactory | 2 - .../platform/PlatformAbstractBootstrap.java | 48 - 
.../PlatformAbstractConfigurationClosure.java | 61 - .../platform/PlatformAbstractPredicate.java | 67 -- .../platform/PlatformAbstractTarget.java | 320 ----- .../processors/platform/PlatformBootstrap.java | 35 - .../platform/PlatformBootstrapFactory.java | 37 - .../platform/PlatformConfigurationEx.java | 48 - .../platform/PlatformContextImpl.java | 621 ---------- .../processors/platform/PlatformIgnition.java | 189 --- .../platform/PlatformProcessorImpl.java | 360 ------ .../platform/cache/PlatformCache.java | 1090 ------------------ .../cache/PlatformCacheEntryFilterImpl.java | 106 -- .../cache/PlatformCacheEntryProcessorImpl.java | 220 ---- .../platform/cache/PlatformCacheIterator.java | 72 -- .../PlatformCachePartialUpdateException.java | 59 - .../cache/affinity/PlatformAffinity.java | 296 ----- .../query/PlatformAbstractQueryCursor.java | 192 --- .../query/PlatformContinuousQueryImpl.java | 235 ---- .../PlatformContinuousQueryRemoteFilter.java | 188 --- .../cache/query/PlatformFieldsQueryCursor.java | 49 - .../cache/query/PlatformQueryCursor.java | 45 - .../cache/store/PlatformCacheStoreCallback.java | 61 - .../platform/cluster/PlatformClusterGroup.java | 335 ------ .../cluster/PlatformClusterNodeFilterImpl.java | 78 -- .../platform/compute/PlatformAbstractJob.java | 156 --- .../platform/compute/PlatformAbstractTask.java | 206 ---- .../PlatformBalancingMultiClosureTask.java | 83 -- ...tformBalancingSingleClosureAffinityTask.java | 88 -- .../PlatformBalancingSingleClosureTask.java | 81 -- .../PlatformBroadcastingMultiClosureTask.java | 87 -- .../PlatformBroadcastingSingleClosureTask.java | 84 -- .../platform/compute/PlatformClosureJob.java | 104 -- .../platform/compute/PlatformCompute.java | 332 ------ .../platform/compute/PlatformFullJob.java | 220 ---- .../platform/compute/PlatformFullTask.java | 192 --- .../platform/cpp/PlatformCppBootstrap.java | 31 - .../cpp/PlatformCppBootstrapFactory.java | 39 - .../cpp/PlatformCppConfigurationClosure.java | 99 -- 
.../cpp/PlatformCppConfigurationEx.java | 82 -- .../datastreamer/PlatformDataStreamer.java | 226 ---- .../PlatformStreamReceiverImpl.java | 119 -- .../dotnet/PlatformDotNetBootstrap.java | 31 - .../dotnet/PlatformDotNetBootstrapFactory.java | 39 - .../dotnet/PlatformDotNetCacheStore.java | 497 -------- .../PlatformDotNetConfigurationClosure.java | 255 ---- .../dotnet/PlatformDotNetConfigurationEx.java | 91 -- .../platform/dotnet/PlatformDotNetService.java | 27 - .../dotnet/PlatformDotNetServiceImpl.java | 47 - .../events/PlatformEventFilterListenerImpl.java | 163 --- .../platform/events/PlatformEvents.java | 396 ------- .../lifecycle/PlatformLifecycleBean.java | 75 -- .../platform/memory/PlatformAbstractMemory.java | 121 -- .../PlatformBigEndianInputStreamImpl.java | 126 -- .../PlatformBigEndianOutputStreamImpl.java | 161 --- .../platform/memory/PlatformExternalMemory.java | 55 - .../memory/PlatformInputStreamImpl.java | 331 ------ .../memory/PlatformMemoryManagerImpl.java | 85 -- .../platform/memory/PlatformMemoryPool.java | 140 --- .../platform/memory/PlatformMemoryUtils.java | 467 -------- .../memory/PlatformOutputStreamImpl.java | 267 ----- .../platform/memory/PlatformPooledMemory.java | 64 - .../platform/memory/PlatformUnpooledMemory.java | 51 - .../messaging/PlatformMessageFilterImpl.java | 110 -- .../messaging/PlatformMessageLocalFilter.java | 102 -- .../platform/messaging/PlatformMessaging.java | 166 --- .../services/PlatformAbstractService.java | 230 ---- .../platform/services/PlatformServices.java | 275 ----- .../transactions/PlatformTransactions.java | 259 ----- .../platform/utils/PlatformFutureUtils.java | 397 ------- .../platform/utils/PlatformReaderBiClosure.java | 34 - .../platform/utils/PlatformReaderClosure.java | 34 - .../platform/utils/PlatformUtils.java | 812 ------------- .../platform/utils/PlatformWriterBiClosure.java | 34 - .../platform/utils/PlatformWriterClosure.java | 33 - .../platform/cpp/PlatformCppConfiguration.java | 47 - 
.../ignite/platform/cpp/package-info.java | 22 - .../dotnet/PlatformDotNetCacheStoreFactory.java | 139 --- .../dotnet/PlatformDotNetConfiguration.java | 97 -- .../dotnet/PlatformDotNetLifecycleBean.java | 109 -- .../PlatformDotNetPortableConfiguration.java | 196 ---- ...PlatformDotNetPortableTypeConfiguration.java | 214 ---- .../ignite/platform/dotnet/package-info.java | 22 - .../apache/ignite/platform/package-info.java | 22 - .../platform/PlatformComputeBroadcastTask.java | 73 -- .../platform/PlatformComputeDecimalTask.java | 106 -- .../platform/PlatformComputeEchoTask.java | 188 --- .../ignite/platform/PlatformComputeEnum.java | 28 - .../platform/PlatformComputeJavaPortable.java | 39 - .../platform/PlatformComputePortable.java | 42 - .../PlatformComputePortableArgTask.java | 119 -- .../platform/PlatformEventsWriteEventTask.java | 146 --- .../ignite/platform/PlatformMaxMemoryTask.java | 57 - .../ignite/platform/PlatformMinMemoryTask.java | 57 - .../lifecycle/PlatformJavaLifecycleBean.java | 47 - .../lifecycle/PlatformJavaLifecycleTask.java | 65 -- pom.xml | 11 - 194 files changed, 14853 insertions(+), 14944 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/META-INF/services/org.apache.ignite.internal.processors.platform.PlatformBootstrapFactory ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/META-INF/services/org.apache.ignite.internal.processors.platform.PlatformBootstrapFactory b/modules/core/src/main/java/META-INF/services/org.apache.ignite.internal.processors.platform.PlatformBootstrapFactory new file mode 100644 index 0000000..7f015e7 --- /dev/null +++ b/modules/core/src/main/java/META-INF/services/org.apache.ignite.internal.processors.platform.PlatformBootstrapFactory @@ -0,0 +1,2 @@ +org.apache.ignite.internal.processors.platform.cpp.PlatformCppBootstrapFactory 
+org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetBootstrapFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractBootstrap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractBootstrap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractBootstrap.java new file mode 100644 index 0000000..7e71e11 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractBootstrap.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.platform.memory.PlatformExternalMemory; +import org.apache.ignite.lang.IgniteClosure; + +/** + * Base interop bootstrap implementation. 
+ */ +public abstract class PlatformAbstractBootstrap implements PlatformBootstrap { + /** {@inheritDoc} */ + @Override public PlatformProcessor start(IgniteConfiguration cfg, long envPtr, long dataPtr) { + Ignition.setClientMode(new PlatformExternalMemory(null, dataPtr).input().readBoolean()); + + IgniteConfiguration cfg0 = closure(envPtr).apply(cfg); + + IgniteEx node = (IgniteEx) Ignition.start(cfg0); + + return node.context().platform(); + } + + /** + * Get configuration transformer closure. + * + * @param envPtr Environment pointer. + * @return Closure. + */ + protected abstract IgniteClosure<IgniteConfiguration, IgniteConfiguration> closure(long envPtr); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractConfigurationClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractConfigurationClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractConfigurationClosure.java new file mode 100644 index 0000000..2f7af71 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractConfigurationClosure.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; +import org.apache.ignite.lang.IgniteClosure; + +/** + * Abstract interop configuration closure. + */ +public abstract class PlatformAbstractConfigurationClosure + implements IgniteClosure<IgniteConfiguration, IgniteConfiguration> { + /** */ + private static final long serialVersionUID = 0L; + + /** Native gateway. */ + protected final PlatformCallbackGateway gate; + + /** + * Constructor. + * + * @param envPtr Environment pointer. + */ + protected PlatformAbstractConfigurationClosure(long envPtr) { + this.gate = new PlatformCallbackGateway(envPtr); + } + + /** {@inheritDoc} */ + @Override public IgniteConfiguration apply(IgniteConfiguration igniteCfg) { + assert igniteCfg != null; + + IgniteConfiguration igniteCfg0 = new IgniteConfiguration(igniteCfg); + + apply0(igniteCfg0); + + return igniteCfg0; + } + + /** + * Internal apply routine. + * + * @param igniteCfg Ignite configuration. 
+ */ + protected abstract void apply0(IgniteConfiguration igniteCfg); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractPredicate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractPredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractPredicate.java new file mode 100644 index 0000000..bcfe19e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractPredicate.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Base interop predicate. Delegates apply to native platform. + */ +public abstract class PlatformAbstractPredicate implements Externalizable { + /** .Net portable predicate */ + protected Object pred; + + /** Pointer to deployed predicate. 
*/ + protected transient long ptr; + + /** Interop processor. */ + protected transient PlatformContext ctx; + + /** + * {@link java.io.Externalizable} support. + */ + public PlatformAbstractPredicate() { + // No-op. + } + + /** + * Constructor. + * + * @param pred .Net portable predicate. + * @param ptr Pointer to predicate in the native platform. + * @param ctx Kernal context. + */ + protected PlatformAbstractPredicate(Object pred, long ptr, PlatformContext ctx) { + this.pred = pred; + this.ptr = ptr; + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(pred); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + pred = in.readObject(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java new file mode 100644 index 0000000..0f46517 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.lang.IgniteFuture; +import org.jetbrains.annotations.Nullable; + +/** + * Abstract interop target. + */ +public abstract class PlatformAbstractTarget implements PlatformTarget { + /** Constant: TRUE.*/ + protected static final int TRUE = 1; + + /** Constant: FALSE. */ + protected static final int FALSE = 0; + + /** */ + private static final int OP_META = -1; + + /** Context. */ + protected final PlatformContext platformCtx; + + /** Logger. */ + protected final IgniteLogger log; + + /** + * Constructor. + * + * @param platformCtx Context. 
+ */ + protected PlatformAbstractTarget(PlatformContext platformCtx) { + this.platformCtx = platformCtx; + + log = platformCtx.kernalContext().log(PlatformAbstractTarget.class); + } + + /** {@inheritDoc} */ + @Override public long inStreamOutLong(int type, long memPtr) throws Exception { + try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { + PortableRawReaderEx reader = platformCtx.reader(mem); + + if (type == OP_META) { + platformCtx.processMetadata(reader); + + return TRUE; + } + else + return processInStreamOutLong(type, reader); + } + catch (Exception e) { + throw convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object inStreamOutObject(int type, long memPtr) throws Exception { + try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { + PortableRawReaderEx reader = platformCtx.reader(mem); + + return processInStreamOutObject(type, reader); + } + catch (Exception e) { + throw convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public long outLong(int type) throws Exception { + try { + return processOutLong(type); + } + catch (Exception e) { + throw convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void outStream(int type, long memPtr) throws Exception { + try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = platformCtx.writer(out); + + processOutStream(type, writer); + + out.synchronize(); + } + catch (Exception e) { + throw convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object outObject(int type) throws Exception { + try { + return processOutObject(type); + } + catch (Exception e) { + throw convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception { + try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) { + PortableRawReaderEx reader = platformCtx.reader(inMem); + + try 
(PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) { + PlatformOutputStream out = outMem.output(); + + PortableRawWriterEx writer = platformCtx.writer(out); + + processInStreamOutStream(type, reader, writer); + + out.synchronize(); + } + } + catch (Exception e) { + throw convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void inObjectStreamOutStream(int type, Object arg, long inMemPtr, long outMemPtr) throws Exception { + try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) { + PortableRawReaderEx reader = platformCtx.reader(inMem); + + try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) { + PlatformOutputStream out = outMem.output(); + + PortableRawWriterEx writer = platformCtx.writer(out); + + processInObjectStreamOutStream(type, arg, reader, writer); + + out.synchronize(); + } + } + catch (Exception e) { + throw convertException(e); + } + } + + /** + * Convert caught exception. + * + * @param e Exception to convert. + * @return Converted exception. + */ + public Exception convertException(Exception e) { + return e; + } + + /** + * @return Context. + */ + public PlatformContext platformContext() { + return platformCtx; + } + + /** {@inheritDoc} */ + @Override public void listenFuture(final long futId, int typ) throws Exception { + PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null, this); + } + + /** {@inheritDoc} */ + @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception { + PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId), this); + } + + /** + * Get current future with proper exception conversions. + * + * @return Future. + * @throws IgniteCheckedException If failed. 
+ */ + @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"}) + protected IgniteInternalFuture currentFutureWrapped() throws IgniteCheckedException { + IgniteFutureImpl fut = (IgniteFutureImpl)currentFuture(); + + return fut.internalFuture(); + } + + /** + * When overridden in a derived class, gets future for the current operation. + * + * @return Current future. + * @throws IgniteCheckedException If failed; the default implementation always throws because future listening is not supported. + */ + protected IgniteFuture currentFuture() throws IgniteCheckedException { + throw new IgniteCheckedException("Future listening is not supported in " + this.getClass()); + } + + /** + * When overridden in a derived class, gets a custom future writer. + * + * @param opId Operation id. + * @return A custom writer for given op id. + */ + protected @Nullable PlatformFutureUtils.Writer futureWriter(int opId){ + return null; + } + + /** + * Process IN operation. + * + * @param type Type. + * @param reader Portable reader. + * @return Result. + * @throws IgniteCheckedException In case of exception. + */ + protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + return throwUnsupported(type); + } + + /** + * Process IN-OUT operation. + * + * @param type Type. + * @param reader Portable reader. + * @param writer Portable writer. + * @throws IgniteCheckedException In case of exception. + */ + protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer) + throws IgniteCheckedException { + throwUnsupported(type); + } + + /** + * Process IN operation with managed object as result. + * + * @param type Type. + * @param reader Portable reader. + * @return Result. + * @throws IgniteCheckedException In case of exception. + */ + protected Object processInStreamOutObject(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + return throwUnsupported(type); + } + + /** + * Process IN-OUT operation. + * + * @param type Type. + * @param arg Argument.
+ * @param reader Portable reader. + * @param writer Portable writer. + * @throws IgniteCheckedException In case of exception. + */ + protected void processInObjectStreamOutStream(int type, @Nullable Object arg, PortableRawReaderEx reader, + PortableRawWriterEx writer) throws IgniteCheckedException { + throwUnsupported(type); + } + + /** + * Process OUT operation. + * + * @param type Type. + * @throws IgniteCheckedException In case of exception. + */ + protected long processOutLong(int type) throws IgniteCheckedException { + return throwUnsupported(type); + } + + /** + * Process OUT operation. + * + * @param type Type. + * @param writer Portable writer. + * @throws IgniteCheckedException In case of exception. + */ + protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException { + throwUnsupported(type); + } + + /** + * Process OUT operation. + * + * @param type Type. + * @throws IgniteCheckedException In case of exception. + */ + protected Object processOutObject(int type) throws IgniteCheckedException { + return throwUnsupported(type); + } + + /** + * Throw an exception rendering unsupported operation type. + * + * @param type Operation type. + * @return Dummy value which is never returned. + * @throws IgniteCheckedException Exception to be thrown. 
+ */ + protected <T> T throwUnsupported(int type) throws IgniteCheckedException { + throw new IgniteCheckedException("Unsupported operation type: " + type); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java new file mode 100644 index 0000000..9d64649 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * Platform bootstrap. Responsible for starting Ignite node with non-Java platform. + */ +public interface PlatformBootstrap { + /** + * Start Ignite node. + * + * @param cfg Configuration. + * @param envPtr Environment pointer. 
+ * @param dataPtr Optional pointer to additional data required for startup. + * @return Platform processor. + */ + public PlatformProcessor start(IgniteConfiguration cfg, long envPtr, long dataPtr); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrapFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrapFactory.java new file mode 100644 index 0000000..3a732b2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrapFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +/** + * Platform bootstrap factory. + */ +public interface PlatformBootstrapFactory { + /** + * Get bootstrap factory ID. + * + * @return ID. + */ + public int id(); + + /** + * Create bootstrap instance. + * + * @return Bootstrap instance. 
+ */ + public PlatformBootstrap create(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformConfigurationEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformConfigurationEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformConfigurationEx.java new file mode 100644 index 0000000..66eff8b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformConfigurationEx.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl; + +import java.util.Collection; + +/** + * Extended platform configuration. + */ +public interface PlatformConfigurationEx { + /** + * @return Native gateway. 
+ */ + public PlatformCallbackGateway gate(); + + /** + * @return Memory manager. + */ + public PlatformMemoryManagerImpl memory(); + + /** + * @return Platform name. + */ + public String platform(); + + /** + * @return Warnings to be displayed on grid start. + */ + public Collection<String> warnings(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java new file mode 100644 index 0000000..3895506 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java @@ -0,0 +1,621 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.events.CacheQueryExecutedEvent; +import org.apache.ignite.events.CacheQueryReadEvent; +import org.apache.ignite.events.CacheRebalancingEvent; +import org.apache.ignite.events.CheckpointEvent; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventAdapter; +import org.apache.ignite.events.EventType; +import org.apache.ignite.events.JobEvent; +import org.apache.ignite.events.SwapSpaceEvent; +import org.apache.ignite.events.TaskEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.portable.GridPortableMarshaller; +import org.apache.ignite.internal.portable.PortableMetaDataImpl; +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.cache.portable.CacheObjectPortableProcessorImpl; +import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; +import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilterImpl; +import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryProcessor; +import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryProcessorImpl; +import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; +import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; +import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryImpl; +import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryRemoteFilter; +import 
org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; +import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilter; +import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilterImpl; +import org.apache.ignite.internal.processors.platform.compute.PlatformAbstractTask; +import org.apache.ignite.internal.processors.platform.compute.PlatformClosureJob; +import org.apache.ignite.internal.processors.platform.compute.PlatformFullJob; +import org.apache.ignite.internal.processors.platform.compute.PlatformJob; +import org.apache.ignite.internal.processors.platform.datastreamer.PlatformStreamReceiver; +import org.apache.ignite.internal.processors.platform.datastreamer.PlatformStreamReceiverImpl; +import org.apache.ignite.internal.processors.platform.events.PlatformEventFilterListenerImpl; +import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; +import org.apache.ignite.internal.processors.platform.messaging.PlatformMessageFilterImpl; +import org.apache.ignite.internal.processors.platform.utils.PlatformReaderBiClosure; +import org.apache.ignite.internal.processors.platform.utils.PlatformReaderClosure; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T4; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.portable.PortableMetadata; +import 
org.jetbrains.annotations.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Implementation of platform context. + */ +public class PlatformContextImpl implements PlatformContext { + /** Supported event types. */ + private static final Set<Integer> evtTyps; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Marshaller. */ + private final GridPortableMarshaller marsh; + + /** Memory manager. */ + private final PlatformMemoryManagerImpl mem; + + /** Callback gateway. */ + private final PlatformCallbackGateway gate; + + /** Cache object processor. */ + private final CacheObjectPortableProcessorImpl cacheObjProc; + + /** Node IDs that have been sent to native platform. */ + private final Set<UUID> sentNodes = Collections.newSetFromMap(new ConcurrentHashMap<UUID, Boolean>()); + + /** + * Static initializer: collects the supported event type IDs into an unmodifiable set. + */ + static { + Set<Integer> evtTyps0 = new HashSet<>(); + + addEventTypes(evtTyps0, EventType.EVTS_CACHE); + addEventTypes(evtTyps0, EventType.EVTS_CACHE_QUERY); + addEventTypes(evtTyps0, EventType.EVTS_CACHE_REBALANCE); + addEventTypes(evtTyps0, EventType.EVTS_CHECKPOINT); + addEventTypes(evtTyps0, EventType.EVTS_DISCOVERY_ALL); + addEventTypes(evtTyps0, EventType.EVTS_JOB_EXECUTION); + addEventTypes(evtTyps0, EventType.EVTS_SWAPSPACE); + addEventTypes(evtTyps0, EventType.EVTS_TASK_EXECUTION); + + evtTyps = Collections.unmodifiableSet(evtTyps0); + } + + /** + * Adds all elements to a set. + * @param set Set. + * @param items Items. + */ + private static void addEventTypes(Set<Integer> set, int[] items) { + for (int i : items) + set.add(i); + } + + /** + * Constructor. + * + * @param ctx Kernal context. + * @param gate Callback gateway. + * @param mem Memory manager. 
+ */ + public PlatformContextImpl(GridKernalContext ctx, PlatformCallbackGateway gate, PlatformMemoryManagerImpl mem) { + this.ctx = ctx; + this.gate = gate; + this.mem = mem; + + cacheObjProc = (CacheObjectPortableProcessorImpl)ctx.cacheObjects(); + + marsh = cacheObjProc.marshaller(); + } + + /** {@inheritDoc} */ + @Override public GridKernalContext kernalContext() { + return ctx; + } + + /** {@inheritDoc} */ + @Override public PlatformMemoryManager memory() { + return mem; + } + + /** {@inheritDoc} */ + @Override public PlatformCallbackGateway gateway() { + return gate; + } + + /** {@inheritDoc} */ + @Override public PortableRawReaderEx reader(PlatformMemory mem) { + return reader(mem.input()); + } + + /** {@inheritDoc} */ + @Override public PortableRawReaderEx reader(PlatformInputStream in) { + return marsh.reader(in); + } + + /** {@inheritDoc} */ + @Override public PortableRawWriterEx writer(PlatformMemory mem) { + return writer(mem.output()); + } + + /** {@inheritDoc} */ + @Override public PortableRawWriterEx writer(PlatformOutputStream out) { + return marsh.writer(out); + } + + /** {@inheritDoc} */ + @Override public void addNode(ClusterNode node) { + if (node == null || sentNodes.contains(node.id())) + return; + + // Send node info to the native platform + try (PlatformMemory mem0 = mem.allocate()) { + PlatformOutputStream out = mem0.output(); + + PortableRawWriterEx w = writer(out); + + w.writeUuid(node.id()); + + Map<String, Object> attrs = new HashMap<>(node.attributes()); + + Iterator<Map.Entry<String, Object>> attrIter = attrs.entrySet().iterator(); + + while (attrIter.hasNext()) { + Map.Entry<String, Object> entry = attrIter.next(); + + Object val = entry.getValue(); + /* Only null values and values whose class name starts with "java.lang" are kept; every other attribute is removed from the copy before it is written. */ + if (val != null && !val.getClass().getName().startsWith("java.lang")) + attrIter.remove(); + } + + w.writeMap(attrs); + w.writeCollection(node.addresses()); + w.writeCollection(node.hostNames()); + w.writeLong(node.order()); + w.writeBoolean(node.isLocal()); 
+ w.writeBoolean(node.isDaemon()); + writeClusterMetrics(w, node.metrics()); + + out.synchronize(); + + gateway().nodeInfo(mem0.pointer()); + } + + sentNodes.add(node.id()); + } + + /** {@inheritDoc} */ + @Override public void writeNode(PortableRawWriterEx writer, ClusterNode node) { + if (node == null) { + writer.writeUuid(null); + + return; + } + + addNode(node); + + writer.writeUuid(node.id()); + } + + /** {@inheritDoc} */ + @Override public void writeNodes(PortableRawWriterEx writer, Collection<ClusterNode> nodes) { + if (nodes == null) { + writer.writeInt(-1); + + return; + } + + writer.writeInt(nodes.size()); + + for (ClusterNode n : nodes) { + addNode(n); + + writer.writeUuid(n.id()); + } + } + + /** {@inheritDoc} */ + @Override public void writeClusterMetrics(PortableRawWriterEx writer, @Nullable ClusterMetrics metrics) { + if (metrics == null) + writer.writeBoolean(false); + else { + writer.writeBoolean(true); + /* The last update time is written twice: first as a raw long, then as a Date. NOTE(review): the field order below presumably has to match the native-side reader - confirm before reordering. */ + writer.writeLong(metrics.getLastUpdateTime()); + writer.writeDate(new Date(metrics.getLastUpdateTime())); + writer.writeInt(metrics.getMaximumActiveJobs()); + writer.writeInt(metrics.getCurrentActiveJobs()); + writer.writeFloat(metrics.getAverageActiveJobs()); + writer.writeInt(metrics.getMaximumWaitingJobs()); + + writer.writeInt(metrics.getCurrentWaitingJobs()); + writer.writeFloat(metrics.getAverageWaitingJobs()); + writer.writeInt(metrics.getMaximumRejectedJobs()); + writer.writeInt(metrics.getCurrentRejectedJobs()); + writer.writeFloat(metrics.getAverageRejectedJobs()); + + writer.writeInt(metrics.getTotalRejectedJobs()); + writer.writeInt(metrics.getMaximumCancelledJobs()); + writer.writeInt(metrics.getCurrentCancelledJobs()); + writer.writeFloat(metrics.getAverageCancelledJobs()); + writer.writeInt(metrics.getTotalCancelledJobs()); + + writer.writeInt(metrics.getTotalExecutedJobs()); + writer.writeLong(metrics.getMaximumJobWaitTime()); + writer.writeLong(metrics.getCurrentJobWaitTime()); + writer.writeDouble(metrics.getAverageJobWaitTime()); 
+ writer.writeLong(metrics.getMaximumJobExecuteTime()); + + writer.writeLong(metrics.getCurrentJobExecuteTime()); + writer.writeDouble(metrics.getAverageJobExecuteTime()); + writer.writeInt(metrics.getTotalExecutedTasks()); + writer.writeLong(metrics.getTotalIdleTime()); + writer.writeLong(metrics.getCurrentIdleTime()); + + writer.writeInt(metrics.getTotalCpus()); + writer.writeDouble(metrics.getCurrentCpuLoad()); + writer.writeDouble(metrics.getAverageCpuLoad()); + writer.writeDouble(metrics.getCurrentGcCpuLoad()); + writer.writeLong(metrics.getHeapMemoryInitialized()); + + writer.writeLong(metrics.getHeapMemoryUsed()); + writer.writeLong(metrics.getHeapMemoryCommitted()); + writer.writeLong(metrics.getHeapMemoryMaximum()); + writer.writeLong(metrics.getHeapMemoryTotal()); + writer.writeLong(metrics.getNonHeapMemoryInitialized()); + + writer.writeLong(metrics.getNonHeapMemoryUsed()); + writer.writeLong(metrics.getNonHeapMemoryCommitted()); + writer.writeLong(metrics.getNonHeapMemoryMaximum()); + writer.writeLong(metrics.getNonHeapMemoryTotal()); + writer.writeLong(metrics.getUpTime()); + + writer.writeDate(new Date(metrics.getStartTime())); + writer.writeDate(new Date(metrics.getNodeStartTime())); + writer.writeInt(metrics.getCurrentThreadCount()); + writer.writeInt(metrics.getMaximumThreadCount()); + writer.writeLong(metrics.getTotalStartedThreadCount()); + + writer.writeInt(metrics.getCurrentDaemonThreadCount()); + writer.writeLong(metrics.getLastDataVersion()); + writer.writeInt(metrics.getSentMessagesCount()); + writer.writeLong(metrics.getSentBytesCount()); + writer.writeInt(metrics.getReceivedMessagesCount()); + + writer.writeLong(metrics.getReceivedBytesCount()); + writer.writeInt(metrics.getOutboundMessagesQueueSize()); + + writer.writeInt(metrics.getTotalNodes()); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public void processMetadata(PortableRawReaderEx reader) { + Collection<T4<Integer, String, String, 
Map<String, Integer>>> metas = PlatformUtils.readCollection(reader, + new PlatformReaderClosure<T4<Integer, String, String, Map<String, Integer>>>() { + @Override public T4<Integer, String, String, Map<String, Integer>> read(PortableRawReaderEx reader) { + int typeId = reader.readInt(); + String typeName = reader.readString(); + String affKey = reader.readString(); + + Map<String, Integer> fields = PlatformUtils.readMap(reader, + new PlatformReaderBiClosure<String, Integer>() { + @Override public IgniteBiTuple<String, Integer> read(PortableRawReaderEx reader) { + return F.t(reader.readString(), reader.readInt()); + } + }); + + return new T4<>(typeId, typeName, affKey, fields); + } + } + ); + + for (T4<Integer, String, String, Map<String, Integer>> meta : metas) + cacheObjProc.updateMetaData(meta.get1(), meta.get2(), meta.get3(), meta.get4()); + } + + /** {@inheritDoc} */ + @Override public void writeMetadata(PortableRawWriterEx writer, int typeId) { + writeMetadata0(writer, typeId, cacheObjProc.metadata(typeId)); + } + + /** {@inheritDoc} */ + @Override public void writeAllMetadata(PortableRawWriterEx writer) { + Collection<PortableMetadata> metas = cacheObjProc.metadata(); + + writer.writeInt(metas.size()); + + for (org.apache.ignite.portable.PortableMetadata m : metas) + writeMetadata0(writer, cacheObjProc.typeId(m.typeName()), m); + } + + /** + * Write portable metadata. + * + * @param writer Writer. + * @param typeId Type id. + * @param meta Metadata. 
+ */ + private void writeMetadata0(PortableRawWriterEx writer, int typeId, PortableMetadata meta) { + if (meta == null) + writer.writeBoolean(false); + else { + writer.writeBoolean(true); + + Map<String, String> metaFields = ((PortableMetaDataImpl)meta).fields0(); + + Map<String, Integer> fields = U.newHashMap(metaFields.size()); + + for (Map.Entry<String, String> metaField : metaFields.entrySet()) + fields.put(metaField.getKey(), CacheObjectPortableProcessorImpl.fieldTypeId(metaField.getValue())); + + writer.writeInt(typeId); + writer.writeString(meta.typeName()); + writer.writeString(meta.affinityKeyFieldName()); + writer.writeMap(fields); + } + } + + /** {@inheritDoc} */ + @Override public PlatformContinuousQuery createContinuousQuery(long ptr, boolean hasFilter, + @Nullable Object filter) { + return new PlatformContinuousQueryImpl(this, ptr, hasFilter, filter); + } + + /** {@inheritDoc} */ + @Override public PlatformContinuousQueryFilter createContinuousQueryFilter(Object filter) { + return new PlatformContinuousQueryRemoteFilter(filter); + } + + /** {@inheritDoc} */ + @Override public PlatformMessageFilter createRemoteMessageFilter(Object filter, long ptr) { + return new PlatformMessageFilterImpl(filter, ptr, this); + } + + /** {@inheritDoc} */ + @Override public boolean isEventTypeSupported(int evtTyp) { + return evtTyps.contains(evtTyp); + } + + /** {@inheritDoc} */ + @Override public void writeEvent(PortableRawWriterEx writer, Event evt) { + assert writer != null; + /* A null event is written as -1. Otherwise an integer tag selected by the event class is written first (2 = CacheEvent, 3 = CacheQueryExecutedEvent, 4 = CacheQueryReadEvent, 5 = CacheRebalancingEvent, 6 = CheckpointEvent, 7 = DiscoveryEvent, 8 = JobEvent, 9 = SwapSpaceEvent, 10 = TaskEvent), then the common event data, then the class-specific fields. */ + if (evt == null) + { + writer.writeInt(-1); + + return; + } + + EventAdapter evt0 = (EventAdapter)evt; + + if (evt0 instanceof CacheEvent) { + writer.writeInt(2); + writeCommonEventData(writer, evt0); + + CacheEvent event0 = (CacheEvent)evt0; + + writer.writeString(event0.cacheName()); + writer.writeInt(event0.partition()); + writer.writeBoolean(event0.isNear()); + writeNode(writer, event0.eventNode()); + writer.writeObject(event0.key()); + PlatformUtils.writeIgniteUuid(writer, event0.xid()); 
+ writer.writeObject(event0.lockId()); + writer.writeObject(event0.newValue()); + writer.writeObject(event0.oldValue()); + writer.writeBoolean(event0.hasOldValue()); + writer.writeBoolean(event0.hasNewValue()); + writer.writeUuid(event0.subjectId()); + writer.writeString(event0.closureClassName()); + writer.writeString(event0.taskName()); + } + else if (evt0 instanceof CacheQueryExecutedEvent) { + writer.writeInt(3); + writeCommonEventData(writer, evt0); + + CacheQueryExecutedEvent event0 = (CacheQueryExecutedEvent)evt0; + + writer.writeString(event0.queryType()); + writer.writeString(event0.cacheName()); + writer.writeString(event0.className()); + writer.writeString(event0.clause()); + writer.writeUuid(event0.subjectId()); + writer.writeString(event0.taskName()); + } + else if (evt0 instanceof CacheQueryReadEvent) { + writer.writeInt(4); + writeCommonEventData(writer, evt0); + + CacheQueryReadEvent event0 = (CacheQueryReadEvent)evt0; + + writer.writeString(event0.queryType()); + writer.writeString(event0.cacheName()); + writer.writeString(event0.className()); + writer.writeString(event0.clause()); + writer.writeUuid(event0.subjectId()); + writer.writeString(event0.taskName()); + writer.writeObject(event0.key()); + writer.writeObject(event0.value()); + writer.writeObject(event0.oldValue()); + writer.writeObject(event0.row()); + } + else if (evt0 instanceof CacheRebalancingEvent) { + writer.writeInt(5); + writeCommonEventData(writer, evt0); + + CacheRebalancingEvent event0 = (CacheRebalancingEvent)evt0; + + writer.writeString(event0.cacheName()); + writer.writeInt(event0.partition()); + writeNode(writer, event0.discoveryNode()); + writer.writeInt(event0.discoveryEventType()); + writer.writeString(event0.discoveryEventName()); + writer.writeLong(event0.discoveryTimestamp()); + } + else if (evt0 instanceof CheckpointEvent) { + writer.writeInt(6); + writeCommonEventData(writer, evt0); + + CheckpointEvent event0 = (CheckpointEvent)evt0; + + 
writer.writeString(event0.key()); + } + else if (evt0 instanceof DiscoveryEvent) { + writer.writeInt(7); + writeCommonEventData(writer, evt0); + + DiscoveryEvent event0 = (DiscoveryEvent)evt0; + + writeNode(writer, event0.eventNode()); + writer.writeLong(event0.topologyVersion()); + + writeNodes(writer, event0.topologyNodes()); + } + else if (evt0 instanceof JobEvent) { + writer.writeInt(8); + writeCommonEventData(writer, evt0); + + JobEvent event0 = (JobEvent)evt0; + + writer.writeString(event0.taskName()); + writer.writeString(event0.taskClassName()); + PlatformUtils.writeIgniteUuid(writer, event0.taskSessionId()); + PlatformUtils.writeIgniteUuid(writer, event0.jobId()); + writeNode(writer, event0.taskNode()); + writer.writeUuid(event0.taskSubjectId()); + } + else if (evt0 instanceof SwapSpaceEvent) { + writer.writeInt(9); + writeCommonEventData(writer, evt0); + + SwapSpaceEvent event0 = (SwapSpaceEvent)evt0; + + writer.writeString(event0.space()); + } + else if (evt0 instanceof TaskEvent) { + writer.writeInt(10); + writeCommonEventData(writer, evt0); + + TaskEvent event0 = (TaskEvent)evt0; + + writer.writeString(event0.taskName()); + writer.writeString(event0.taskClassName()); + PlatformUtils.writeIgniteUuid(writer, event0.taskSessionId()); + writer.writeBoolean(event0.internal()); + writer.writeUuid(event0.subjectId()); + } + else + throw new IgniteException("Unsupported event: " + evt); + } + + /** + * Write common event data. + * + * @param writer Writer. + * @param evt Event. 
+ */ + private void writeCommonEventData(PortableRawWriterEx writer, EventAdapter evt) { + PlatformUtils.writeIgniteUuid(writer, evt.id()); + writer.writeLong(evt.localOrder()); + writeNode(writer, evt.node()); + writer.writeString(evt.message()); + writer.writeInt(evt.type()); + writer.writeString(evt.name()); + writer.writeDate(new Date(evt.timestamp())); + } + + /** {@inheritDoc} */ + @Override public PlatformEventFilterListener createLocalEventFilter(long hnd) { + return new PlatformEventFilterListenerImpl(hnd, this); + } + + /** {@inheritDoc} */ + @Override public PlatformEventFilterListener createRemoteEventFilter(Object pred, int... types) { + return new PlatformEventFilterListenerImpl(pred, types); + } + + /** {@inheritDoc} */ + @Override public PlatformNativeException createNativeException(Object cause) { + return new PlatformNativeException(cause); + } + + /** {@inheritDoc} */ + @Override public PlatformJob createJob(Object task, long ptr, @Nullable Object job) { + return new PlatformFullJob(this, (PlatformAbstractTask)task, ptr, job); + } + + /** {@inheritDoc} */ + @Override public PlatformJob createClosureJob(Object task, long ptr, Object job) { + return new PlatformClosureJob((PlatformAbstractTask)task, ptr, job); + } + + /** {@inheritDoc} */ + @Override public PlatformCacheEntryProcessor createCacheEntryProcessor(Object proc, long ptr) { + return new PlatformCacheEntryProcessorImpl(proc, ptr); + } + + /** {@inheritDoc} */ + @Override public PlatformCacheEntryFilter createCacheEntryFilter(Object filter, long ptr) { + return new PlatformCacheEntryFilterImpl(filter, ptr, this); + } + + /** {@inheritDoc} */ + @Override public PlatformStreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable) { + return new PlatformStreamReceiverImpl(rcv, ptr, keepPortable, this); + } + + /** {@inheritDoc} */ + @Override public PlatformClusterNodeFilter createClusterNodeFilter(Object filter) { + return new PlatformClusterNodeFilterImpl(filter, this); + 
} +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java new file mode 100644 index 0000000..e642b2d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform; + +import java.net.URL; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.ServiceLoader; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +/** + * Entry point for platform nodes. + */ +@SuppressWarnings("UnusedDeclaration") +public class PlatformIgnition { + /** Map with active instances, keyed by grid name. */ + private static final HashMap<String, PlatformProcessor> instances = new HashMap<>(); + + /** + * Start Ignite node in platform mode. + * + * @param springCfgPath Spring configuration path ({@code null} to use {@code IgnitionEx.DFLT_CFG}). + * @param gridName Grid name; when {@code null} the name from the loaded configuration is used. + * @param factoryId Factory ID. + * @param envPtr Environment pointer; must be positive, otherwise {@code IgniteException} is thrown. + * @param dataPtr Optional pointer to additional data required for startup. + * @return Platform processor of the started node. 
+ */ + public static synchronized PlatformProcessor start(@Nullable String springCfgPath, @Nullable String gridName, + int factoryId, long envPtr, long dataPtr) { + if (envPtr <= 0) + throw new IgniteException("Environment pointer must be positive."); + + ClassLoader oldClsLdr = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(PlatformProcessor.class.getClassLoader()); + + try { + IgniteConfiguration cfg = configuration(springCfgPath); + + if (gridName != null) + cfg.setGridName(gridName); + else + gridName = cfg.getGridName(); + + PlatformBootstrap bootstrap = bootstrap(factoryId); + + PlatformProcessor proc = bootstrap.start(cfg, envPtr, dataPtr); + + PlatformProcessor old = instances.put(gridName, proc); + + assert old == null; + + return proc; + } + finally { + Thread.currentThread().setContextClassLoader(oldClsLdr); + } + } + + /** + * Get instance by grid name. + * + * @param gridName Grid name. + * @return Instance or {@code null} if it doesn't exist (never started, or already stopped). + */ + @Nullable public static synchronized PlatformProcessor instance(@Nullable String gridName) { + return instances.get(gridName); + } + + /** + * Get environment pointer of the given instance. + * + * @param gridName Grid name. + * @return Environment pointer or {@code 0} in case grid with such name doesn't exist. + */ + public static synchronized long environmentPointer(@Nullable String gridName) { + PlatformProcessor proc = instance(gridName); + + return proc != null ? proc.environmentPointer() : 0; + } + + /** + * Stop single instance. + * + * @param gridName Grid name. + * @param cancel Cancel flag. + * @return {@code True} if instance was found and stopped. 
+ */ + public static synchronized boolean stop(@Nullable String gridName, boolean cancel) { + if (Ignition.stop(gridName, cancel)) { + PlatformProcessor old = instances.remove(gridName); + + assert old != null; + + return true; + } + else + return false; + } + + /** + * Stop all instances. + * + * @param cancel Cancel flag. + */ + public static synchronized void stopAll(boolean cancel) { + for (PlatformProcessor proc : instances.values()) + Ignition.stop(proc.ignite().name(), cancel); + + instances.clear(); + } + + /** + * Create configuration. + * + * @param springCfgPath Path to Spring XML. + * @return Configuration. + */ + private static IgniteConfiguration configuration(@Nullable String springCfgPath) { + try { + URL url = springCfgPath == null ? U.resolveIgniteUrl(IgnitionEx.DFLT_CFG) : + U.resolveSpringUrl(springCfgPath); + + IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> t = IgnitionEx.loadConfiguration(url); + + return t.get1(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to instantiate configuration from Spring XML: " + springCfgPath, e); + } + } + + /** + * Create bootstrap for the given factory ID. Factories are looked up through {@link ServiceLoader}; {@code IgniteException} is thrown when none has a matching ID. + * + * @param factoryId Factory ID. + * @return Bootstrap. + */ + private static PlatformBootstrap bootstrap(final int factoryId) { + PlatformBootstrapFactory factory = AccessController.doPrivileged( + new PrivilegedAction<PlatformBootstrapFactory>() { + @Override public PlatformBootstrapFactory run() { + for (PlatformBootstrapFactory factory : ServiceLoader.load(PlatformBootstrapFactory.class)) { + if (factory.id() == factoryId) + return factory; + } + + return null; + } + }); + + if (factory == null) + throw new IgniteException("Interop factory is not found (did you put into the classpath?): " + factoryId); + + return factory.create(); + } + + /** + * Private constructor. + */ + private PlatformIgnition() { + // No-op. 
+ } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java new file mode 100644 index 0000000..40b1334 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.PlatformConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteComputeImpl; +import org.apache.ignite.internal.cluster.ClusterGroupAdapter; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl; +import org.apache.ignite.internal.processors.platform.cache.PlatformCache; +import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinity; +import org.apache.ignite.internal.processors.platform.cache.store.PlatformCacheStore; +import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterGroup; +import org.apache.ignite.internal.processors.platform.compute.PlatformCompute; +import org.apache.ignite.internal.processors.platform.datastreamer.PlatformDataStreamer; +import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore; +import org.apache.ignite.internal.processors.platform.events.PlatformEvents; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.messaging.PlatformMessaging; +import org.apache.ignite.internal.processors.platform.services.PlatformServices; +import org.apache.ignite.internal.processors.platform.transactions.PlatformTransactions; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import 
org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * GridGain platform processor. + */ +public class PlatformProcessorImpl extends GridProcessorAdapter implements PlatformProcessor { + /** Start latch. */ + private final CountDownLatch startLatch = new CountDownLatch(1); + + /** Stores pending initialization. */ + private final Collection<StoreInfo> pendingStores = + Collections.newSetFromMap(new ConcurrentHashMap<StoreInfo, Boolean>()); + + /** Started stores. */ + private final Collection<PlatformCacheStore> stores = + Collections.newSetFromMap(new ConcurrentHashMap<PlatformCacheStore, Boolean>()); + + /** Lock for store lifecycle operations. */ + private final ReadWriteLock storeLock = new ReentrantReadWriteLock(); + + /** Logger. */ + private final IgniteLogger log; + + /** Context. */ + private final PlatformContext platformCtx; + + /** Interop configuration. */ + private final PlatformConfigurationEx interopCfg; + + /** Whether processor is started. */ + private boolean started; + + /** Whether processor if stopped (or stopping). */ + private boolean stopped; + + /** + * Constructor. + * + * @param ctx Kernal context. 
+ */ + public PlatformProcessorImpl(GridKernalContext ctx) { + super(ctx); + + log = ctx.log(PlatformProcessorImpl.class); + + PlatformConfiguration interopCfg0 = ctx.config().getPlatformConfiguration(); + + assert interopCfg0 != null : "Must be checked earlier during component creation."; + + if (!(interopCfg0 instanceof PlatformConfigurationEx)) + throw new IgniteException("Unsupported platform configuration: " + interopCfg0.getClass().getName()); + + interopCfg = (PlatformConfigurationEx)interopCfg0; + + if (!F.isEmpty(interopCfg.warnings())) { + for (String w : interopCfg.warnings()) + U.warn(log, w); + } + + platformCtx = new PlatformContextImpl(ctx, interopCfg.gate(), interopCfg.memory()); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + try (PlatformMemory mem = platformCtx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = platformCtx.writer(out); + + writer.writeString(ctx.gridName()); + + out.synchronize(); + + platformCtx.gateway().onStart(this, mem.pointer()); + } + + // At this moment all necessary native libraries must be loaded, so we can process with store creation. + storeLock.writeLock().lock(); + + try { + for (StoreInfo store : pendingStores) + registerStore0(store.store, store.convertPortable); + + pendingStores.clear(); + + started = true; + } + finally { + storeLock.writeLock().unlock(); + } + + // Add Interop node attributes. + ctx.addNodeAttribute(PlatformUtils.ATTR_PLATFORM, interopCfg.platform()); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + startLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + if (platformCtx != null) { + // Destroy cache stores. 
+ storeLock.writeLock().lock(); + + try { + for (PlatformCacheStore store : stores) { + if (store != null) { + if (store instanceof PlatformDotNetCacheStore) { + PlatformDotNetCacheStore store0 = (PlatformDotNetCacheStore)store; + + try { + store0.destroy(platformCtx.kernalContext()); + } + catch (Exception e) { + U.error(log, "Failed to destroy .Net cache store [store=" + store0 + + ", err=" + e.getMessage() + ']'); + } + } + else + assert false : "Invalid interop cache store type: " + store; + } + } + } + finally { + stopped = true; + + storeLock.writeLock().unlock(); + } + + platformCtx.gateway().onStop(); + } + } + + /** {@inheritDoc} */ + @Override public Ignite ignite() { + return ctx.grid(); + } + + /** {@inheritDoc} */ + @Override public long environmentPointer() { + return platformCtx.gateway().environmentPointer(); + } + + /** {@inheritDoc} */ + public void releaseStart() { + startLatch.countDown(); + } + + /** {@inheritDoc} */ + public void awaitStart() throws IgniteCheckedException { + U.await(startLatch); + } + + /** {@inheritDoc} */ + @Override public PlatformContext context() { + return platformCtx; + } + + /** {@inheritDoc} */ + @Override public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException { + IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().cache(name); + + if (cache == null) + throw new IllegalArgumentException("Cache doesn't exist: " + name); + + return new PlatformCache(platformCtx, cache.keepPortable(), false); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException { + IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createCache(name); + + assert cache != null; + + return new PlatformCache(platformCtx, cache.keepPortable(), false); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException { + IgniteCacheProxy cache = 
(IgniteCacheProxy)ctx.grid().getOrCreateCache(name); + + assert cache != null; + + return new PlatformCache(platformCtx, cache.keepPortable(), false); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException { + return new PlatformAffinity(platformCtx, ctx, name); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepPortable) + throws IgniteCheckedException { + IgniteDataStreamer ldr = ctx.dataStream().dataStreamer(cacheName); + + return new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepPortable); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget transactions() { + return new PlatformTransactions(platformCtx); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget projection() throws IgniteCheckedException { + return new PlatformClusterGroup(platformCtx, ctx.grid().cluster()); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget compute(PlatformTarget grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + + assert grp0.projection() instanceof ClusterGroupAdapter; // Safety for very complex ClusterGroup hierarchy. 
+ + return new PlatformCompute(platformCtx, (IgniteComputeImpl)((ClusterGroupAdapter)grp0.projection()).compute()); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget message(PlatformTarget grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + + return new PlatformMessaging(platformCtx, grp0.projection().ignite().message(grp0.projection())); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget events(PlatformTarget grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + + return new PlatformEvents(platformCtx, grp0.projection().ignite().events(grp0.projection())); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget services(PlatformTarget grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + + return new PlatformServices(platformCtx, grp0.projection().ignite().services(grp0.projection()), false); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget extensions() { + return null; + } + + /** {@inheritDoc} */ + @Override public void registerStore(PlatformCacheStore store, boolean convertPortable) + throws IgniteCheckedException { + storeLock.readLock().lock(); + + try { + if (stopped) + throw new IgniteCheckedException("Failed to initialize interop store becuase node is stopping: " + + store); + + if (started) + registerStore0(store, convertPortable); + else + pendingStores.add(new StoreInfo(store, convertPortable)); + } + finally { + storeLock.readLock().unlock(); + } + } + + /** + * Internal store initialization routine. + * + * @param store Store. + * @param convertPortable Convert portable flag. + * @throws IgniteCheckedException If failed. 
+ */ + private void registerStore0(PlatformCacheStore store, boolean convertPortable) throws IgniteCheckedException { + if (store instanceof PlatformDotNetCacheStore) { + PlatformDotNetCacheStore store0 = (PlatformDotNetCacheStore)store; + + store0.initialize(ctx, convertPortable); + } + else + throw new IgniteCheckedException("Unsupported interop store: " + store); + } + + /** + * Store and manager pair. + */ + private static class StoreInfo { + /** Store. */ + private final PlatformCacheStore store; + + /** Convert portable flag. */ + private final boolean convertPortable; + + /** + * Constructor. + * + * @param store Store. + * @param convertPortable Convert portable flag. + */ + private StoreInfo(PlatformCacheStore store, boolean convertPortable) { + this.store = store; + this.convertPortable = convertPortable; + } + } +} \ No newline at end of file