Repository: ignite Updated Branches: refs/heads/master 5f58bbb57 -> 9b06cf3d8
IGNITE-1340: Fixes to exception propagation. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b06cf3d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b06cf3d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b06cf3d Branch: refs/heads/master Commit: 9b06cf3d896d17af5a132a8849df41f8822dce96 Parents: 5f58bbb Author: Pavel Tupitsyn <[email protected]> Authored: Tue Sep 1 15:57:36 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Sep 1 15:57:36 2015 +0300 ---------------------------------------------------------------------- .../platform/PlatformAbstractTarget.java | 34 +++----- .../platform/cache/PlatformCache.java | 42 ++++++---- .../PlatformCachePartialUpdateException.java | 9 ++- .../datastreamer/PlatformDataStreamer.java | 2 +- .../transactions/PlatformTransactions.java | 2 +- .../platform/utils/PlatformFutureUtils.java | 82 ++++++++++++++++++-- 6 files changed, 116 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java index 71d1657..0f46517 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -18,14 +18,14 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.portable.PortableRawReaderEx; import org.apache.ignite.internal.portable.PortableRawWriterEx; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; -import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; @@ -171,7 +171,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { * @param e Exception to convert. * @return Converted exception. */ - protected Exception convertException(Exception e) { + public Exception convertException(Exception e) { return e; } @@ -184,12 +184,12 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { /** {@inheritDoc} */ @Override public void listenFuture(final long futId, int typ) throws Exception { - PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null); + PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null, this); } /** {@inheritDoc} */ @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception { - PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId)); + PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId), this); } /** @@ -199,26 +199,10 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { * @throws IgniteCheckedException If failed. */ @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"}) - protected IgniteFuture currentFutureWrapped() throws IgniteCheckedException { - return currentFuture().chain(new IgniteClosure<IgniteFuture, Object>() { - @Override public Object apply(IgniteFuture o) { - try { - return o.get(); - } - catch (RuntimeException e) { - Exception converted = convertException(e); - - if (converted instanceof RuntimeException) - throw (RuntimeException)converted; - else { - log.error("Interop future result cannot be obtained due to exception.", converted); - - throw new IgniteException("Interop future result cannot be obtained due to exception " + - "(see log for more details)."); - } - } - } - }); + protected IgniteInternalFuture currentFutureWrapped() throws IgniteCheckedException { + IgniteFutureImpl fut = (IgniteFutureImpl)currentFuture(); + + return fut.internalFuture(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index a7c741e..e579be7 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -17,18 +17,6 @@ package org.apache.ignite.internal.processors.platform.cache; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import javax.cache.Cache; -import javax.cache.expiry.Duration; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheEntryProcessor; @@ -43,6 +31,7 @@ import org.apache.ignite.cache.query.TextQuery; import org.apache.ignite.internal.portable.PortableRawReaderEx; import org.apache.ignite.internal.portable.PortableRawWriterEx; import org.apache.ignite.internal.processors.cache.CacheOperationContext; +import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; @@ -58,6 +47,19 @@ import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; +import javax.cache.Cache; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; + /** * Native cache wrapper implementation. */ @@ -618,9 +620,16 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Exception convertException(Exception e) { + @Override public Exception convertException(Exception e) { if (e instanceof CachePartialUpdateException) - return new PlatformCachePartialUpdateException((CachePartialUpdateException)e, platformCtx, keepPortable); + return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e.getCause(), + platformCtx, keepPortable); + + if (e instanceof CachePartialUpdateCheckedException) + return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e, platformCtx, keepPortable); + + if (e.getCause() instanceof EntryProcessorException) + return (EntryProcessorException) e.getCause(); return super.convertException(e); } @@ -788,11 +797,10 @@ public class PlatformCache extends PlatformAbstractTarget { */ public void rebalance(long futId) { PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() { - @Override - public Object apply(IgniteFuture fut) { + @Override public Object apply(IgniteFuture fut) { return null; } - }), futId, PlatformFutureUtils.TYP_OBJ); + }), futId, PlatformFutureUtils.TYP_OBJ, this); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java index 354cef7..58dfa4c 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java @@ -17,14 +17,15 @@ package org.apache.ignite.internal.processors.platform.cache; -import java.util.Collection; -import org.apache.ignite.cache.CachePartialUpdateException; import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformException; import org.apache.ignite.internal.processors.platform.PlatformExtendedException; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import java.util.Collection; + /** * Interop cache partial update exception. */ @@ -45,7 +46,7 @@ public class PlatformCachePartialUpdateException extends PlatformException imple * @param ctx Context. * @param keepPortable Keep portable flag. */ - public PlatformCachePartialUpdateException(CachePartialUpdateException cause, PlatformContext ctx, + public PlatformCachePartialUpdateException(CachePartialUpdateCheckedException cause, PlatformContext ctx, boolean keepPortable) { super(cause); @@ -60,7 +61,7 @@ public class PlatformCachePartialUpdateException extends PlatformException imple /** {@inheritDoc} */ @Override public void writeData(PortableRawWriterEx writer) { - Collection keys = ((CachePartialUpdateException)getCause()).failedKeys(); + Collection keys = ((CachePartialUpdateCheckedException)getCause()).failedKeys(); writer.writeBoolean(keepPortable); http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java index e0e9305..ef64ef9 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java @@ -110,7 +110,7 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { vals.add(new GridMapEntry(reader.readObjectDetached(), reader.readObjectDetached())); PlatformFutureUtils.listen(platformCtx, ldr.addData(vals), futPtr, - PlatformFutureUtils.TYP_OBJ); + PlatformFutureUtils.TYP_OBJ, this); } if (plc == PLC_CLOSE) { http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java index 86942c5..1d2c315 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java @@ -179,7 +179,7 @@ public class PlatformTransactions extends PlatformAbstractTarget { } }); - PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ); + PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, this); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java index 59e5463..0019986 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.processors.platform.utils; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; @@ -66,8 +68,35 @@ public class PlatformFutureUtils { * @param futPtr Native future pointer. * @param typ Expected return type. */ - public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ) { - listen(ctx, new FutureListenable(fut), futPtr, typ, null); + public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, final int typ, + PlatformAbstractTarget target) { + listen(ctx, new InternalFutureListenable(fut), futPtr, typ, null, target); + } + /** + * Listen future. + * + * @param ctx Context. + * @param fut Java future. + * @param futPtr Native future pointer. + * @param typ Expected return type. + */ + public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ, + PlatformAbstractTarget target) { + listen(ctx, new FutureListenable(fut), futPtr, typ, null, target); + } + + /** + * Listen future. + * + * @param ctx Context. + * @param fut Java future. + * @param futPtr Native future pointer. + * @param typ Expected return type. + * @param writer Writer. + */ + public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, final int typ, + Writer writer, PlatformAbstractTarget target) { + listen(ctx, new InternalFutureListenable(fut), futPtr, typ, writer, target); } /** @@ -80,8 +109,8 @@ public class PlatformFutureUtils { * @param writer Writer. */ public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ, - Writer writer) { - listen(ctx, new FutureListenable(fut), futPtr, typ, writer); + Writer writer, PlatformAbstractTarget target) { + listen(ctx, new FutureListenable(fut), futPtr, typ, writer, target); } /** @@ -92,8 +121,9 @@ public class PlatformFutureUtils { * @param futPtr Native future pointer. * @param writer Writer. */ - public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, Writer writer) { - listen(ctx, new FutureListenable(fut), futPtr, TYP_OBJ, writer); + public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, Writer writer, + PlatformAbstractTarget target) { + listen(ctx, new InternalFutureListenable(fut), futPtr, TYP_OBJ, writer, target); } /** @@ -107,13 +137,16 @@ public class PlatformFutureUtils { */ @SuppressWarnings("unchecked") private static void listen(final PlatformContext ctx, Listenable listenable, final long futPtr, final int typ, - @Nullable final Writer writer) { + @Nullable final Writer writer, final PlatformAbstractTarget target) { final PlatformCallbackGateway gate = ctx.gateway(); listenable.listen(new IgniteBiInClosure<Object, Throwable>() { private static final long serialVersionUID = 0L; @Override public void apply(Object res, Throwable err) { + if (err instanceof Exception) + err = target.convertException((Exception)err); + if (writer != null && writeToWriter(res, err, ctx, writer, futPtr)) return; @@ -326,4 +359,39 @@ public class PlatformFutureUtils { }); } } + + /** + * Listenable around Ignite future. + */ + private static class InternalFutureListenable implements Listenable { + /** Future. */ + private final IgniteInternalFuture fut; + + /** + * Constructor. + * + * @param fut Future. + */ + public InternalFutureListenable(IgniteInternalFuture fut) { + this.fut = fut; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void listen(final IgniteBiInClosure<Object, Throwable> lsnr) { + fut.listen(new IgniteInClosure<IgniteInternalFuture>() { + private static final long serialVersionUID = 0L; + + @Override public void apply(IgniteInternalFuture fut0) { + try { + lsnr.apply(fut0.get(), null); + } + catch (Throwable err) { + lsnr.apply(null, err); + } + } + }); + } + } + } \ No newline at end of file
