IGNITE-4027 Extract PlatformTargetProxy interface. This closes #1188.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59e6fec0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59e6fec0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59e6fec0 Branch: refs/heads/ignite-2.0 Commit: 59e6fec0b92c353ee5e128b9343a59f4b99bd468 Parents: 597f3a5 Author: Pavel Tupitsyn <[email protected]> Authored: Thu Dec 8 14:53:16 2016 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Thu Dec 8 14:53:16 2016 +0300 ---------------------------------------------------------------------- .../platform/PlatformAbstractTarget.java | 268 +++---------------- .../platform/PlatformAsyncTarget.java | 44 +++ .../platform/PlatformNoopProcessor.java | 41 +-- .../processors/platform/PlatformProcessor.java | 42 +-- .../platform/PlatformProcessorImpl.java | 87 +++--- .../processors/platform/PlatformTarget.java | 103 ++++--- .../platform/PlatformTargetProxy.java | 126 +++++++++ .../platform/PlatformTargetProxyImpl.java | 222 +++++++++++++++ .../binary/PlatformBinaryProcessor.java | 6 +- .../platform/cache/PlatformCache.java | 15 +- .../platform/cache/PlatformCacheIterator.java | 2 +- .../cache/affinity/PlatformAffinity.java | 4 +- .../affinity/PlatformAffinityFunction.java | 7 +- .../PlatformAffinityFunctionTarget.java | 4 +- .../query/PlatformAbstractQueryCursor.java | 4 +- .../query/PlatformContinuousQueryProxy.java | 3 +- .../callback/PlatformCallbackGateway.java | 6 +- .../callback/PlatformCallbackUtils.java | 6 +- .../platform/cluster/PlatformClusterGroup.java | 18 +- .../platform/compute/PlatformCompute.java | 15 +- .../datastreamer/PlatformDataStreamer.java | 4 +- .../PlatformStreamReceiverImpl.java | 8 +- .../datastructures/PlatformAtomicLong.java | 4 +- .../datastructures/PlatformAtomicReference.java | 8 +- .../datastructures/PlatformAtomicSequence.java | 2 +- .../platform/events/PlatformEvents.java | 15 +- .../platform/messaging/PlatformMessaging.java | 9 +- 
.../platform/services/PlatformServices.java | 27 +- .../transactions/PlatformTransactions.java | 8 +- .../platform/utils/PlatformFutureUtils.java | 14 +- .../utils/PlatformListenableTarget.java | 62 +++++ .../cpp/jni/include/ignite/jni/exports.h | 3 - .../platforms/cpp/jni/include/ignite/jni/java.h | 7 - modules/platforms/cpp/jni/project/vs/module.def | 2 - modules/platforms/cpp/jni/src/exports.cpp | 8 - modules/platforms/cpp/jni/src/java.cpp | 76 ++---- .../Apache.Ignite.Core.csproj | 1 + .../Apache.Ignite.Core/Impl/Common/Future.cs | 13 +- .../Impl/Common/Listenable.cs | 49 ++++ .../Impl/Compute/ComputeImpl.cs | 4 +- .../Apache.Ignite.Core/Impl/PlatformTarget.cs | 2 +- .../Impl/Unmanaged/IgniteJniNativeMethods.cs | 8 - .../Impl/Unmanaged/UnmanagedUtils.cs | 5 - 43 files changed, 817 insertions(+), 545 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java index 2df86ac..506470b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -24,16 +24,16 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; -import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import 
org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; +import org.apache.ignite.internal.processors.platform.utils.PlatformListenableTarget; import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; /** * Abstract interop target. */ -public abstract class PlatformAbstractTarget implements PlatformTarget { +public abstract class PlatformAbstractTarget implements PlatformTarget, PlatformAsyncTarget { /** Constant: TRUE.*/ protected static final int TRUE = 1; @@ -60,144 +60,6 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { log = platformCtx.kernalContext().log(PlatformAbstractTarget.class); } - /** {@inheritDoc} */ - @Override public long inLongOutLong(int type, long val) throws Exception { - try { - return processInLongOutLong(type, val); - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public long inStreamOutLong(int type, long memPtr) throws Exception { - try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { - BinaryRawReaderEx reader = platformCtx.reader(mem); - - return processInStreamOutLong(type, reader, mem); - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public Object inStreamOutObject(int type, long memPtr) throws Exception { - try (PlatformMemory mem = memPtr != 0 ? platformCtx.memory().get(memPtr) : null) { - BinaryRawReaderEx reader = mem != null ? 
platformCtx.reader(mem) : null; - - return processInStreamOutObject(type, reader); - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public void outStream(int type, long memPtr) throws Exception { - try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { - PlatformOutputStream out = mem.output(); - - BinaryRawWriterEx writer = platformCtx.writer(out); - - processOutStream(type, writer); - - out.synchronize(); - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public Object outObject(int type) throws Exception { - try { - return processOutObject(type); - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception { - try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) { - BinaryRawReaderEx reader = platformCtx.reader(inMem); - - try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) { - PlatformOutputStream out = outMem.output(); - - BinaryRawWriterEx writer = platformCtx.writer(out); - - processInStreamOutStream(type, reader, writer); - - out.synchronize(); - } - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public Object inObjectStreamOutObjectStream(int type, Object arg, long inMemPtr, long outMemPtr) - throws Exception { - PlatformMemory inMem = null; - PlatformMemory outMem = null; - - try { - BinaryRawReaderEx reader = null; - - if (inMemPtr != 0) { - inMem = platformCtx.memory().get(inMemPtr); - - reader = platformCtx.reader(inMem); - } - - PlatformOutputStream out = null; - BinaryRawWriterEx writer = null; - - if (outMemPtr != 0) { - outMem = platformCtx.memory().get(outMemPtr); - - out = outMem.output(); - - writer = platformCtx.writer(out); - } - - Object res = processInObjectStreamOutObjectStream(type, arg, reader, writer); - - if (out != null) - 
out.synchronize(); - - return res; - } - catch (Exception e) { - throw convertException(e); - } - finally { - try { - if (inMem != null) - inMem.close(); - } - finally { - if (outMem != null) - outMem.close(); - } - } - } - - /** - * Convert caught exception. - * - * @param e Exception to convert. - * @return Converted exception. - */ - public Exception convertException(Exception e) { - return e; - } - /** * @return Context. */ @@ -206,128 +68,60 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { } /** {@inheritDoc} */ - @Override public void listenFuture(final long futId, int typ) throws Exception { - PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, this); + @Override public Exception convertException(Exception e) { + return e; } /** {@inheritDoc} */ - @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception { - PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), this); - } - - /** - * When overridden in a derived class, gets future for the current operation. - * - * @return current future. - * @throws IgniteCheckedException If failed. - */ - protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { throw new IgniteCheckedException("Future listening is not supported in " + getClass()); } - /** - * When overridden in a derived class, gets a custom future writer. - * - * @param opId Operation id. - * @return A custom writer for given op id. - */ - @Nullable protected PlatformFutureUtils.Writer futureWriter(int opId){ + /** {@inheritDoc} */ + @Override @Nullable public PlatformFutureUtils.Writer futureWriter(int opId){ return null; } - /** - * Process IN operation. - * - * @param type Type. - * @param val Value. - * @return Result. - * @throws IgniteCheckedException In case of exception. 
- */ - protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { return throwUnsupported(type); } - /** - * Process IN operation. - * - * @param type Type. - * @param reader Binary reader. - * @return Result. - * @throws IgniteCheckedException In case of exception. - */ - protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { return throwUnsupported(type); } - /** - * Process IN operation. - * - * @param type Type. - * @param reader Binary reader. - * @return Result. - * @throws IgniteCheckedException In case of exception. - */ - protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException { return processInStreamOutLong(type, reader); } - /** - * Process IN-OUT operation. - * - * @param type Type. - * @param reader Binary reader. - * @param writer Binary writer. - * @throws IgniteCheckedException In case of exception. - */ - protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + /** {@inheritDoc} */ + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { throwUnsupported(type); } - /** - * Process IN operation with managed object as result. - * - * @param type Type. - * @param reader Binary reader. - * @return Result. - * @throws IgniteCheckedException In case of exception. 
- */ - protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) + throws IgniteCheckedException { return throwUnsupported(type); } - /** - * Process IN-OUT operation. - * - * @param type Type. - * @param arg Argument. - * @param reader Binary reader. - * @param writer Binary writer. - * @throws IgniteCheckedException In case of exception. - */ - protected Object processInObjectStreamOutObjectStream(int type, @Nullable Object arg, BinaryRawReaderEx reader, - BinaryRawWriterEx writer) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public PlatformTarget processInObjectStreamOutObjectStream(int type, @Nullable PlatformTarget arg, + BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { return throwUnsupported(type); } - /** - * Process OUT operation. - * - * @param type Type. - * @param writer Binary writer. - * @throws IgniteCheckedException In case of exception. - */ - protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { throwUnsupported(type); } - /** - * Process OUT operation. - * - * @param type Type. - * @throws IgniteCheckedException In case of exception. - */ - protected Object processOutObject(int type) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { return throwUnsupported(type); } @@ -338,7 +132,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { * @return Dummy value which is never returned. * @throws IgniteCheckedException Exception to be thrown. 
*/ - protected <T> T throwUnsupported(int type) throws IgniteCheckedException { + private <T> T throwUnsupported(int type) throws IgniteCheckedException { throw new IgniteCheckedException("Unsupported operation type: " + type); } @@ -411,4 +205,14 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { return TRUE; } + + /** + * Wraps a listenable to be returned to platform. + * + * @param listenable Listenable. + * @return Target. + */ + protected PlatformTarget wrapListenable(PlatformListenable listenable) { + return new PlatformListenableTarget(listenable, platformCtx); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java new file mode 100644 index 0000000..a4d35c9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Async target. + */ +public interface PlatformAsyncTarget { + /** + * Gets future for the current operation. + * + * @return current future. + * @throws IgniteCheckedException If failed. + */ + IgniteInternalFuture currentFuture() throws IgniteCheckedException; + + /** + * Gets a custom future writer. + * + * @param opId Operation id. + * @return A custom writer for given op id. + */ + @Nullable PlatformFutureUtils.Writer futureWriter(int opId); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java index fd357ec..2911418 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java @@ -61,27 +61,27 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget createCache(@Nullable String name) throws 
IgniteCheckedException { + @Override public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException { + @Override public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException { + @Override public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException { return null; } @@ -91,47 +91,48 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException { + @Override public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary) + throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget transactions() { + @Override public PlatformTargetProxy transactions() { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget projection() throws IgniteCheckedException { + @Override public PlatformTargetProxy projection() throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget compute(PlatformTarget grp) { + @Override public PlatformTargetProxy 
compute(PlatformTargetProxy grp) { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget message(PlatformTarget grp) { + @Override public PlatformTargetProxy message(PlatformTargetProxy grp) { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget events(PlatformTarget grp) { + @Override public PlatformTargetProxy events(PlatformTargetProxy grp) { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget services(PlatformTarget grp) { + @Override public PlatformTargetProxy services(PlatformTargetProxy grp) { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget extensions() { + @Override public PlatformTargetProxy extensions() { return null; } @@ -142,7 +143,7 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget atomicLong(String name, long initVal, boolean create) throws IgniteException { + @Override public PlatformTargetProxy atomicLong(String name, long initVal, boolean create) throws IgniteException { return null; } @@ -157,22 +158,22 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget atomicSequence(String name, long initVal, boolean create) throws IgniteException { + @Override public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create) throws IgniteException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget atomicReference(String name, long memPtr, boolean create) throws IgniteException { + @Override public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create) throws IgniteException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr) { + @Override public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr) { return null; } /** {@inheritDoc} */ - @Override 
public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr) { + @Override public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr) { return null; } @@ -187,7 +188,7 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget binaryProcessor() { + @Override public PlatformTargetProxy binaryProcessor() { return null; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java index f01175e..e0d94d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java @@ -26,7 +26,7 @@ import org.jetbrains.annotations.Nullable; /** * Platform processor. */ -@SuppressWarnings("UnusedDeclaration") +@SuppressWarnings({"UnusedDeclaration", "UnnecessaryInterfaceModifier"}) public interface PlatformProcessor extends GridProcessor { /** * Gets owning Ignite instance. @@ -68,7 +68,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Cache. * @throws IgniteCheckedException If failed. */ - public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException; + public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException; /** * Create cache. @@ -77,7 +77,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Cache. * @throws IgniteCheckedException If failed. 
*/ - public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException; + public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException; /** * Get or create cache. @@ -86,7 +86,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Cache. * @throws IgniteCheckedException If failed. */ - public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException; + public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException; /** * Create cache. @@ -95,7 +95,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Cache. * @throws IgniteCheckedException If failed. */ - public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException; + public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException; /** * Get or create cache. @@ -104,7 +104,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Cache. * @throws IgniteCheckedException If failed. */ - public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException; + public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException; /** * Destroy dynamically created cache. @@ -121,7 +121,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Affinity. * @throws IgniteCheckedException If failed. */ - public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException; + public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException; /** * Get data streamer. @@ -131,14 +131,14 @@ public interface PlatformProcessor extends GridProcessor { * @return Data streamer. * @throws IgniteCheckedException If failed. 
*/ - public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException; + public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException; /** * Get transactions. * * @return Transactions. */ - public PlatformTarget transactions(); + public PlatformTargetProxy transactions(); /** * Get projection. @@ -146,7 +146,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Projection. * @throws IgniteCheckedException If failed. */ - public PlatformTarget projection() throws IgniteCheckedException; + public PlatformTargetProxy projection() throws IgniteCheckedException; /** * Create interop compute. @@ -154,7 +154,7 @@ public interface PlatformProcessor extends GridProcessor { * @param grp Cluster group. * @return Compute instance. */ - public PlatformTarget compute(PlatformTarget grp); + public PlatformTargetProxy compute(PlatformTargetProxy grp); /** * Create interop messaging. @@ -162,7 +162,7 @@ public interface PlatformProcessor extends GridProcessor { * @param grp Cluster group. * @return Messaging instance. */ - public PlatformTarget message(PlatformTarget grp); + public PlatformTargetProxy message(PlatformTargetProxy grp); /** * Create interop events. @@ -170,7 +170,7 @@ public interface PlatformProcessor extends GridProcessor { * @param grp Cluster group. * @return Events instance. */ - public PlatformTarget events(PlatformTarget grp); + public PlatformTargetProxy events(PlatformTargetProxy grp); /** * Create interop services. @@ -178,14 +178,14 @@ public interface PlatformProcessor extends GridProcessor { * @param grp Cluster group. * @return Services instance. */ - public PlatformTarget services(PlatformTarget grp); + public PlatformTargetProxy services(PlatformTargetProxy grp); /** * Get platform extensions. Override this method to provide any additional targets and operations you need. * * @return Platform extensions. 
*/ - public PlatformTarget extensions(); + public PlatformTargetProxy extensions(); /** * Register cache store. @@ -203,7 +203,7 @@ public interface PlatformProcessor extends GridProcessor { * @param create Create flag. * @return Platform atomic long. */ - public PlatformTarget atomicLong(String name, long initVal, boolean create); + public PlatformTargetProxy atomicLong(String name, long initVal, boolean create); /** * Get or create AtomicSequence. @@ -212,7 +212,7 @@ public interface PlatformProcessor extends GridProcessor { * @param create Create flag. * @return Platform atomic long. */ - public PlatformTarget atomicSequence(String name, long initVal, boolean create); + public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create); /** * Get or create AtomicReference. @@ -221,7 +221,7 @@ public interface PlatformProcessor extends GridProcessor { * @param create Create flag. * @return Platform atomic long. */ - public PlatformTarget atomicReference(String name, long memPtr, boolean create); + public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create); /** * Gets the configuration of the current Ignite instance. @@ -244,7 +244,7 @@ public interface PlatformProcessor extends GridProcessor { * @param memPtr Pointer to a stream with near cache config. 0 for default config. * @return Cache. */ - public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr); + public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr); /** * Gets existing near cache with the given name or creates a new one. @@ -253,7 +253,7 @@ public interface PlatformProcessor extends GridProcessor { * @param memPtr Pointer to a stream with near cache config. 0 for default config. * @return Cache. 
*/ - public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr); + public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr); /** * Gets a value indicating whether Ignite logger has specified level enabled. @@ -277,5 +277,5 @@ public interface PlatformProcessor extends GridProcessor { * * @return Binary processor. */ - public PlatformTarget binaryProcessor(); + public PlatformTargetProxy binaryProcessor(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java index f775987..8c81ebb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java @@ -220,7 +220,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException { IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().cache(name); if (cache == null) @@ -230,7 +230,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException { IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createCache(name); assert cache != null; @@ 
-239,7 +239,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException { IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateCache(name); assert cache != null; @@ -248,7 +248,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException { + @Override public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException { BinaryRawReaderEx reader = platformCtx.reader(platformCtx.memory().get(memPtr)); CacheConfiguration cfg = PlatformConfigurationUtils.readCacheConfiguration(reader); @@ -260,7 +260,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException { + @Override public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException { BinaryRawReaderEx reader = platformCtx.reader(platformCtx.memory().get(memPtr)); CacheConfiguration cfg = PlatformConfigurationUtils.readCacheConfiguration(reader); @@ -278,60 +278,60 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException { - return new PlatformAffinity(platformCtx, ctx, name); + @Override public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException { + return proxy(new PlatformAffinity(platformCtx, ctx, name)); } /** {@inheritDoc} */ - @Override public PlatformTarget dataStreamer(@Nullable String cacheName, boolean 
keepBinary) + @Override public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException { IgniteDataStreamer ldr = ctx.dataStream().dataStreamer(cacheName); ldr.keepBinary(true); - return new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepBinary); + return proxy(new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepBinary)); } /** {@inheritDoc} */ - @Override public PlatformTarget transactions() { - return new PlatformTransactions(platformCtx); + @Override public PlatformTargetProxy transactions() { + return proxy(new PlatformTransactions(platformCtx)); } /** {@inheritDoc} */ - @Override public PlatformTarget projection() throws IgniteCheckedException { - return new PlatformClusterGroup(platformCtx, ctx.grid().cluster()); + @Override public PlatformTargetProxy projection() throws IgniteCheckedException { + return proxy(new PlatformClusterGroup(platformCtx, ctx.grid().cluster())); } /** {@inheritDoc} */ - @Override public PlatformTarget compute(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + @Override public PlatformTargetProxy compute(PlatformTargetProxy grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap(); - return new PlatformCompute(platformCtx, grp0.projection(), PlatformUtils.ATTR_PLATFORM); + return proxy(new PlatformCompute(platformCtx, grp0.projection(), PlatformUtils.ATTR_PLATFORM)); } /** {@inheritDoc} */ - @Override public PlatformTarget message(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + @Override public PlatformTargetProxy message(PlatformTargetProxy grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap(); - return new PlatformMessaging(platformCtx, grp0.projection().ignite().message(grp0.projection())); + return proxy(new PlatformMessaging(platformCtx, grp0.projection().ignite().message(grp0.projection()))); } /** {@inheritDoc} */ - @Override public 
PlatformTarget events(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + @Override public PlatformTargetProxy events(PlatformTargetProxy grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap(); - return new PlatformEvents(platformCtx, grp0.projection().ignite().events(grp0.projection())); + return proxy(new PlatformEvents(platformCtx, grp0.projection().ignite().events(grp0.projection()))); } /** {@inheritDoc} */ - @Override public PlatformTarget services(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + @Override public PlatformTargetProxy services(PlatformTargetProxy grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap(); - return new PlatformServices(platformCtx, grp0.projection().ignite().services(grp0.projection()), false); + return proxy(new PlatformServices(platformCtx, grp0.projection().ignite().services(grp0.projection()), false)); } /** {@inheritDoc} */ - @Override public PlatformTarget extensions() { + @Override public PlatformTargetProxy extensions() { return null; } @@ -356,28 +356,32 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget atomicLong(String name, long initVal, boolean create) throws IgniteException { + @Override public PlatformTargetProxy atomicLong(String name, long initVal, boolean create) throws IgniteException { GridCacheAtomicLongImpl atomicLong = (GridCacheAtomicLongImpl)ignite().atomicLong(name, initVal, create); if (atomicLong == null) return null; - return new PlatformAtomicLong(platformCtx, atomicLong); + return proxy(new PlatformAtomicLong(platformCtx, atomicLong)); } /** {@inheritDoc} */ - @Override public PlatformTarget atomicSequence(String name, long initVal, boolean create) throws IgniteException { + @Override public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create) + throws IgniteException { IgniteAtomicSequence 
atomicSeq = ignite().atomicSequence(name, initVal, create); if (atomicSeq == null) return null; - return new PlatformAtomicSequence(platformCtx, atomicSeq); + return proxy(new PlatformAtomicSequence(platformCtx, atomicSeq)); } /** {@inheritDoc} */ - @Override public PlatformTarget atomicReference(String name, long memPtr, boolean create) throws IgniteException { - return PlatformAtomicReference.createInstance(platformCtx, name, memPtr, create); + @Override public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create) + throws IgniteException { + PlatformAtomicReference ref = PlatformAtomicReference.createInstance(platformCtx, name, memPtr, create); + + return ref != null ? proxy(ref) : null; } /** {@inheritDoc} */ @@ -427,7 +431,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr) { + @Override public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr) { NearCacheConfiguration cfg = getNearCacheConfiguration(memPtr); IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createNearCache(cacheName, cfg); @@ -436,7 +440,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr) { + @Override public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr) { NearCacheConfiguration cfg = getNearCacheConfiguration(memPtr); IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateNearCache(cacheName, cfg); @@ -447,8 +451,8 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf /** * Creates new platform cache. 
*/ - private PlatformTarget createPlatformCache(IgniteCacheProxy cache) { - return new PlatformCache(platformCtx, cache, false, cacheExts); + private PlatformTargetProxy createPlatformCache(IgniteCacheProxy cache) { + return proxy(new PlatformCache(platformCtx, cache, false, cacheExts)); } /** {@inheritDoc} */ @@ -504,8 +508,8 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ - @Override public PlatformTarget binaryProcessor() { - return new PlatformBinaryProcessor(platformCtx); + @Override public PlatformTargetProxy binaryProcessor() { + return proxy(new PlatformBinaryProcessor(platformCtx)); } /** @@ -580,6 +584,13 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** + * Wraps target in a proxy. + */ + private PlatformTargetProxy proxy(PlatformTarget target) { + return new PlatformTargetProxyImpl(target, platformCtx); + } + + /** * Store and manager pair. */ private static class StoreInfo { http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java index 805fd5e..5d234dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import 
org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.jetbrains.annotations.Nullable; /** @@ -27,94 +29,89 @@ import org.jetbrains.annotations.Nullable; @SuppressWarnings("UnusedDeclaration") public interface PlatformTarget { /** - * Operation accepting long value and returning long value. + * Process IN operation. * - * @param type Operation type. + * @param type Type. * @param val Value. * @return Result. - * @throws Exception If case of failure. + * @throws IgniteCheckedException In case of exception. */ - public long inLongOutLong(int type, long val) throws Exception; + long processInLongOutLong(int type, long val) throws IgniteCheckedException; /** - * Operation accepting memory stream and returning long value. + * Process IN operation. * - * @param type Operation type. - * @param memPtr Memory pointer. + * @param type Type. + * @param reader Binary reader. * @return Result. - * @throws Exception If case of failure. + * @throws IgniteCheckedException In case of exception. */ - public long inStreamOutLong(int type, long memPtr) throws Exception; + long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException; /** - * Operation accepting memory stream and returning object. + * Process IN operation. * - * @param type Operation type. - * @param memPtr Memory pointer. + * @param type Type. + * @param reader Binary reader. * @return Result. - * @throws Exception If case of failure. + * @throws IgniteCheckedException In case of exception. */ - public Object inStreamOutObject(int type, long memPtr) throws Exception; + long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException; /** - * Operation accepting one memory stream and returning result to another memory stream. + * Process IN-OUT operation. * - * @param type Operation type. - * @param inMemPtr Input memory pointer. - * @param outMemPtr Output memory pointer. - * @throws Exception In case of failure. 
+ * @param type Type. + * @param reader Binary reader. + * @param writer Binary writer. + * @throws IgniteCheckedException In case of exception. */ - public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception; + void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + throws IgniteCheckedException; /** - * Operation accepting an object and a memory stream and returning result to another memory stream and an object. + * Process IN-OUT operation. * - * @param type Operation type. - * @param arg Argument (optional). - * @param inMemPtr Input memory pointer. - * @param outMemPtr Output memory pointer. - * @return Result. - * @throws Exception In case of failure. + * @param type Type. + * @param reader Binary reader. + * @throws IgniteCheckedException In case of exception. */ - public Object inObjectStreamOutObjectStream(int type, @Nullable Object arg, long inMemPtr, long outMemPtr) - throws Exception; + PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException; /** - * Operation returning result to memory stream. + * Process IN-OUT operation. * - * @param type Operation type. - * @param memPtr Memory pointer. - * @throws Exception In case of failure. + * @param type Type. + * @param arg Argument. + * @param reader Binary reader. + * @param writer Binary writer. + * @throws IgniteCheckedException In case of exception. */ - public void outStream(int type, long memPtr) throws Exception; + PlatformTarget processInObjectStreamOutObjectStream(int type, @Nullable PlatformTarget arg, BinaryRawReaderEx reader, + BinaryRawWriterEx writer) throws IgniteCheckedException; /** - * Operation returning object result. + * Process OUT operation. * - * @param type Operation type. - * @return Result. - * @throws Exception If failed. + * @param type Type. + * @param writer Binary writer. + * @throws IgniteCheckedException In case of exception. 
*/ - public Object outObject(int type) throws Exception; + void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException; /** - * Start listening for the future. + * Process OUT operation. * - * @param futId Future ID. - * @param typ Result type. - * @throws IgniteCheckedException In case of failure. + * @param type Type. + * @throws IgniteCheckedException In case of exception. */ - @SuppressWarnings("UnusedDeclaration") - public void listenFuture(final long futId, int typ) throws Exception; + PlatformTarget processOutObject(int type) throws IgniteCheckedException; /** - * Start listening for the future for specific operation type. + * Convert caught exception. * - * @param futId Future ID. - * @param typ Result type. - * @param opId Operation ID required to pick correct result writer. - * @throws IgniteCheckedException In case of failure. + * @param e Exception to convert. + * @return Converted exception. */ - @SuppressWarnings("UnusedDeclaration") - public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception; + Exception convertException(Exception e); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java new file mode 100644 index 0000000..a4f2a56 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.IgniteCheckedException; +import org.jetbrains.annotations.Nullable; + +/** + * Platform target that is invoked via JNI and propagates calls to underlying {@link PlatformTarget}. + */ +@SuppressWarnings("UnusedDeclaration") +public interface PlatformTargetProxy { + /** + * Operation accepting long value and returning long value. + * + * @param type Operation type. + * @param val Value. + * @return Result. + * @throws Exception In case of failure. + */ + long inLongOutLong(int type, long val) throws Exception; + + /** + * Operation accepting memory stream and returning long value. + * + * @param type Operation type. + * @param memPtr Memory pointer. + * @return Result. + * @throws Exception In case of failure. + */ + long inStreamOutLong(int type, long memPtr) throws Exception; + + /** + * Operation accepting memory stream and returning object. + * + * @param type Operation type. + * @param memPtr Memory pointer. + * @return Result. + * @throws Exception In case of failure. + */ + Object inStreamOutObject(int type, long memPtr) throws Exception; + + /** + * Operation accepting one memory stream and returning result to another memory stream. + * + * @param type Operation type.
+ * @param inMemPtr Input memory pointer. + * @param outMemPtr Output memory pointer. + * @throws Exception In case of failure. + */ + void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception; + + /** + * Operation accepting an object and a memory stream and returning result to another memory stream and an object. + * + * @param type Operation type. + * @param arg Argument (optional). + * @param inMemPtr Input memory pointer. + * @param outMemPtr Output memory pointer. + * @return Result. + * @throws Exception In case of failure. + */ + Object inObjectStreamOutObjectStream(int type, @Nullable Object arg, long inMemPtr, long outMemPtr) + throws Exception; + + /** + * Operation returning result to memory stream. + * + * @param type Operation type. + * @param memPtr Memory pointer. + * @throws Exception In case of failure. + */ + void outStream(int type, long memPtr) throws Exception; + + /** + * Operation returning object result. + * + * @param type Operation type. + * @return Result. + * @throws Exception If failed. + */ + Object outObject(int type) throws Exception; + + /** + * Start listening for the future. + * + * @param futId Future ID. + * @param typ Result type. + * @throws IgniteCheckedException In case of failure. + */ + @SuppressWarnings("UnusedDeclaration") + void listenFuture(final long futId, int typ) throws Exception; + + /** + * Start listening for the future for specific operation type. + * + * @param futId Future ID. + * @param typ Result type. + * @param opId Operation ID required to pick correct result writer. + * @throws IgniteCheckedException In case of failure. + */ + @SuppressWarnings("UnusedDeclaration") + void listenFutureForOperation(final long futId, int typ, int opId) throws Exception; + + /** + * Returns the underlying target. + * + * @return Underlying target. 
+ */ + PlatformTarget unwrap(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java new file mode 100644 index 0000000..25a4de8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; + +/** + * Platform target that is invoked via JNI and propagates calls to underlying {@link PlatformTarget}. + */ +public class PlatformTargetProxyImpl implements PlatformTargetProxy { + /** Context. */ + protected final PlatformContext platformCtx; + + /** Underlying target. */ + private final PlatformTarget target; + + public PlatformTargetProxyImpl(PlatformTarget target, PlatformContext platformCtx) { + assert platformCtx != null; + assert target != null; + + this.platformCtx = platformCtx; + this.target = target; + } + + /** {@inheritDoc} */ + @Override public long inLongOutLong(int type, long val) throws Exception { + try { + return target.processInLongOutLong(type, val); + } + catch (Exception e) { + throw target.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public long inStreamOutLong(int type, long memPtr) throws Exception { + try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { + BinaryRawReaderEx reader = platformCtx.reader(mem); + + return target.processInStreamOutLong(type, reader, mem); + } + catch (Exception e) { + throw target.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object inStreamOutObject(int type, long memPtr) throws Exception { + try (PlatformMemory mem = memPtr != 0 ? platformCtx.memory().get(memPtr) : null) { + BinaryRawReaderEx reader = mem != null ? 
platformCtx.reader(mem) : null; + + return wrapProxy(target.processInStreamOutObject(type, reader)); + } + catch (Exception e) { + throw target.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void outStream(int type, long memPtr) throws Exception { + try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { + PlatformOutputStream out = mem.output(); + + BinaryRawWriterEx writer = platformCtx.writer(out); + + target.processOutStream(type, writer); + + out.synchronize(); + } + catch (Exception e) { + throw target.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object outObject(int type) throws Exception { + try { + return wrapProxy(target.processOutObject(type)); + } + catch (Exception e) { + throw target.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception { + try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) { + BinaryRawReaderEx reader = platformCtx.reader(inMem); + + try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) { + PlatformOutputStream out = outMem.output(); + + BinaryRawWriterEx writer = platformCtx.writer(out); + + target.processInStreamOutStream(type, reader, writer); + + out.synchronize(); + } + } + catch (Exception e) { + throw target.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object inObjectStreamOutObjectStream(int type, Object arg, long inMemPtr, long outMemPtr) + throws Exception { + PlatformMemory inMem = null; + PlatformMemory outMem = null; + + try { + BinaryRawReaderEx reader = null; + + if (inMemPtr != 0) { + inMem = platformCtx.memory().get(inMemPtr); + + reader = platformCtx.reader(inMem); + } + + PlatformOutputStream out = null; + BinaryRawWriterEx writer = null; + + if (outMemPtr != 0) { + outMem = platformCtx.memory().get(outMemPtr); + + out = outMem.output(); + + writer = platformCtx.writer(out); + } + + PlatformTarget res = 
target.processInObjectStreamOutObjectStream(type, unwrapProxy(arg), reader, writer); + + if (out != null) + out.synchronize(); + + return wrapProxy(res); + } + catch (Exception e) { + throw target.convertException(e); + } + finally { + try { + if (inMem != null) + inMem.close(); + } + finally { + if (outMem != null) + outMem.close(); + } + } + } + + /** {@inheritDoc} */ + @Override public void listenFuture(final long futId, int typ) throws Exception { + PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, target); + } + + /** {@inheritDoc} */ + @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception { + PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), target); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget unwrap() { + return target; + } + + /** + * @return Future writer. + */ + private PlatformFutureUtils.Writer futureWriter(int opId) { + return ((PlatformAsyncTarget)target).futureWriter(opId); + } + + /** + * @return Current future. + */ + private IgniteInternalFuture currentFuture() throws IgniteCheckedException { + return ((PlatformAsyncTarget)target).currentFuture(); + } + + /** + * Wraps an object in a proxy when possible. + * + * @param obj Object to wrap. + * @return Wrapped object. + */ + private Object wrapProxy(PlatformTarget obj) { + return obj == null ? null : new PlatformTargetProxyImpl(obj, platformCtx); + } + + /** + * Unwraps an object from a proxy when possible. + * + * @param obj Object to unwrap. + * @return Unwrapped object. + */ + private PlatformTarget unwrapProxy(Object obj) { + return obj == null ? 
null : ((PlatformTargetProxyImpl)obj).target; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java index 1bb577e..3c00abc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java @@ -49,7 +49,7 @@ public class PlatformBinaryProcessor extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { if (type == OP_PUT_META) { platformCtx.processMetadata(reader); @@ -60,7 +60,7 @@ public class PlatformBinaryProcessor extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { if (type == OP_GET_ALL_META) platformCtx.writeAllMetadata(writer); else @@ -68,7 +68,7 @@ public class PlatformBinaryProcessor extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_GET_META: { 
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index aec3703..aee317e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformNativeException; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryProxy; import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor; @@ -400,7 +401,7 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException { try { switch (type) { @@ -824,7 +825,7 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) + @Override public PlatformTarget processInStreamOutObject(int type, 
BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_QRY_SQL: @@ -903,7 +904,7 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_GET_NAME: writer.writeObject(cache.getName()); @@ -940,7 +941,7 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Object processOutObject(int type) throws IgniteCheckedException { + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { switch (type) { case OP_WITH_ASYNC: { if (cache.isAsync()) @@ -983,7 +984,7 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_SIZE: { CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes((int)val); @@ -1121,12 +1122,12 @@ public class PlatformCache extends PlatformAbstractTarget { } /** <inheritDoc /> */ - @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { return ((IgniteFutureImpl) cacheAsync.future()).internalFuture(); } /** <inheritDoc /> */ - @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) { + @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) { if (opId == OP_GET_ALL) return WRITER_GET_ALL; http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java index 292caea..4c11cc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java @@ -47,7 +47,7 @@ public class PlatformCacheIterator extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_NEXT: if (iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java index 12df188..e24345c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java @@ -117,7 +117,7 @@ public class PlatformAffinity extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws 
IgniteCheckedException { switch (type) { case OP_PARTITION: return aff.partition(reader.readObjectDetached()); @@ -168,7 +168,7 @@ public class PlatformAffinity extends PlatformAbstractTarget { /** {@inheritDoc} */ @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"}) - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_PRIMARY_PARTITIONS: { http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java index 8076a19..2d3cada 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java @@ -26,6 +26,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; @@ -279,7 +280,11 @@ public class PlatformAffinityFunction implements 
AffinityFunction, Externalizabl ? new PlatformAffinityFunctionTarget(ctx, baseFunc) : null; - ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTarget); + PlatformTargetProxyImpl baseTargetProxy = baseTarget != null + ? new PlatformTargetProxyImpl(baseTarget, ctx) + : null; + + ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTargetProxy); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java index 8a07b33..342e726 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java @@ -71,7 +71,7 @@ public class PlatformAffinityFunctionTarget extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { if (type == OP_PARTITION) return baseFunc.partition(reader.readObjectDetached()); else if (type == OP_REMOVE_NODE) { @@ -84,7 +84,7 @@ public class PlatformAffinityFunctionTarget extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { if (type == 
OP_ASSIGN_PARTITIONS) { AffinityFunctionContext affCtx = currentAffCtx.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java index 6a259ca..f201425 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java @@ -71,7 +71,7 @@ public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTar } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, final BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, final BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_GET_BATCH: { assert iter != null : "iterator() has not been called"; @@ -136,7 +136,7 @@ public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTar } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_ITERATOR: iter = cursor.iterator(); http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java index 04f17ff..27d784a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.platform.cache.query; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; /** * Proxy that implements PlatformTarget. @@ -41,7 +42,7 @@ public class PlatformContinuousQueryProxy extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override public Object outObject(int type) throws Exception { + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { return qry.getInitialQueryCursor(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java index f21861e..c77f501 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -19,7 +19,7 
@@ package org.apache.ignite.internal.processors.platform.callback; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget; +import org.apache.ignite.internal.processors.platform.PlatformTargetProxy; import org.apache.ignite.internal.util.GridStripedSpinBusyLock; /** @@ -429,7 +429,7 @@ public class PlatformCallbackGateway { * @param memPtr Stream pointer. * @param keepBinary Binary flag. */ - public void dataStreamerStreamReceiverInvoke(long ptr, Object cache, long memPtr, boolean keepBinary) { + public void dataStreamerStreamReceiverInvoke(long ptr, PlatformTargetProxy cache, long memPtr, boolean keepBinary) { enter(); try { @@ -995,7 +995,7 @@ public class PlatformCallbackGateway { * @param baseFunc Optional func for base calls. * @return Affinity function pointer. */ - public long affinityFunctionInit(long memPtr, PlatformAffinityFunctionTarget baseFunc) { + public long affinityFunctionInit(long memPtr, PlatformTargetProxy baseFunc) { enter(); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java index 50c4c28..9d60ec0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.platform.callback; -import 
org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget; +import org.apache.ignite.internal.processors.platform.PlatformTargetProxy; /** * Platform callback utility methods. Implemented in target platform. All methods in this class must be @@ -226,7 +226,7 @@ public class PlatformCallbackUtils { * @param memPtr Stream pointer. * @param keepBinary Binary flag. */ - static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, Object cache, long memPtr, + static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, PlatformTargetProxy cache, long memPtr, boolean keepBinary); /** @@ -504,7 +504,7 @@ public class PlatformCallbackUtils { * @param baseFunc Optional func for base calls. * @return Affinity function pointer. */ - static native long affinityFunctionInit(long envPtr, long memPtr, PlatformAffinityFunctionTarget baseFunc); + static native long affinityFunctionInit(long envPtr, long memPtr, PlatformTargetProxy baseFunc); /** * Gets the partition from affinity function. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java index dc73468..f49f477 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.cache.PlatformCache; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -120,7 +121,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget { /** {@inheritDoc} */ @SuppressWarnings("deprecation") - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_METRICS: platformCtx.writeClusterMetrics(writer, prj.metrics()); @@ -134,7 +135,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget { /** {@inheritDoc} */ @SuppressWarnings({"ConstantConditions", "deprecation"}) - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, 
BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_METRICS_FILTERED: { @@ -217,7 +218,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_PING_NODE: return pingNode(reader.readUuid()) ? TRUE : FALSE; @@ -228,7 +229,8 @@ public class PlatformClusterGroup extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) + throws IgniteCheckedException { switch (type) { case OP_FOR_NODE_IDS: { Collection<UUID> ids = PlatformUtils.readCollection(reader); @@ -272,8 +274,8 @@ public class PlatformClusterGroup extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Object processInObjectStreamOutObjectStream( - int type, @Nullable Object arg, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public PlatformTarget processInObjectStreamOutObjectStream( + int type, @Nullable PlatformTarget arg, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_FOR_OTHERS: { @@ -289,7 +291,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Object processOutObject(int type) throws IgniteCheckedException { + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { switch (type) { case OP_FOR_REMOTES: return new PlatformClusterGroup(platformCtx, 
(ClusterGroupEx)prj.forRemotes()); @@ -314,7 +316,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_RESET_METRICS: { assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
