http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java index 8ffec00..8750cab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java @@ -24,6 +24,7 @@ import java.io.ObjectOutput; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; @@ -46,6 +47,7 @@ import org.apache.ignite.transactions.TransactionState; /** * Cache transaction proxy. 
*/ +@SuppressWarnings("unchecked") public class TransactionProxyImpl<K, V> implements TransactionProxy, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -270,6 +272,18 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> commitAsync() throws IgniteException { + enter(); + + try { + return (IgniteFuture<Void>)createFuture(cctx.commitTxAsync(tx)); + } + finally { + leave(); + } + } + + /** {@inheritDoc} */ @Override public void close() { enter(); @@ -304,6 +318,21 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza } } + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException { + enter(); + + try { + return (IgniteFuture<Void>)(new IgniteFutureImpl(cctx.rollbackTxAsync(tx))); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + leave(); + } + } + /** * @param res Result to convert to finished future. */ @@ -315,6 +344,14 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza * @param fut Internal future. */ private void saveFuture(IgniteInternalFuture<IgniteInternalTx> fut) { + asyncRes = createFuture(fut); + } + + /** + * @param fut Internal future. + * @return User future. + */ + private IgniteFuture<?> createFuture(IgniteInternalFuture<IgniteInternalTx> fut) { IgniteInternalFuture<Transaction> fut0 = fut.chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, Transaction>() { @Override public Transaction applyx(IgniteInternalFuture<IgniteInternalTx> fut) throws IgniteCheckedException { fut.get(); @@ -323,7 +360,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza } }); - asyncRes = new IgniteFutureImpl(fut0); + return new IgniteFutureImpl(fut0); } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java index 106ef60..b5289a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.Collection; import java.util.Map; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.igfs.IgfsBlockLocation; @@ -36,6 +37,7 @@ import org.apache.ignite.igfs.mapreduce.IgfsTask; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.internal.AsyncSupportAdapter; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -58,7 +60,7 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme /** {@inheritDoc} */ @Override public void format() { try { - saveOrGet(igfs.formatAsync()); + saveOrGet(igfs.formatAsync0()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -66,10 +68,15 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> formatAsync() throws IgniteException { + return igfs.formatAsync(); + } + + /** {@inheritDoc} */ @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, 
Collection<IgfsPath> paths, @Nullable T arg) { try { - return saveOrGet(igfs.executeAsync(task, rslvr, paths, arg)); + return saveOrGet(igfs.executeAsync0(task, rslvr, paths, arg)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -77,10 +84,16 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme } /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException { + return igfs.executeAsync(task, rslvr, paths, arg); + } + + /** {@inheritDoc} */ @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { try { - return saveOrGet(igfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + return saveOrGet(igfs.executeAsync0(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -88,10 +101,17 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme } /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + return igfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg); + } + + /** {@inheritDoc} */ @Override public <T, R> R execute(Class<? 
extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) { try { - return saveOrGet(igfs.executeAsync(taskCls, rslvr, paths, arg)); + return saveOrGet(igfs.executeAsync0(taskCls, rslvr, paths, arg)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -99,11 +119,17 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme } /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException { + return igfs.executeAsync(taskCls, rslvr, paths, arg); + } + + /** {@inheritDoc} */ @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { try { - return saveOrGet(igfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + return saveOrGet(igfs.executeAsync0(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -111,6 +137,13 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme } /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(Class<? 
extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + return igfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg); + } + + /** {@inheritDoc} */ @Override public void stop(boolean cancel) { igfs.stop(cancel); } http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 7165f31..18506cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -83,6 +83,7 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallabl import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; @@ -1430,12 +1431,17 @@ public final class IgfsImpl implements IgfsEx { } } + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> formatAsync() throws IgniteException { + return (IgniteFuture<Void>)createFuture(formatAsync0()); + } + /** * Formats the file system removing all existing entries from it. * * @return Future. 
*/ - IgniteInternalFuture<?> formatAsync() { + IgniteInternalFuture<?> formatAsync0() { GridFutureAdapter<?> fut = new GridFutureAdapter<>(); Thread t = new Thread(new FormatRunnable(fut), "igfs-format-" + cfg.getName() + "-" + @@ -1452,7 +1458,7 @@ public final class IgfsImpl implements IgfsEx { @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) { try { - return executeAsync(task, rslvr, paths, arg).get(); + return executeAsync0(task, rslvr, paths, arg).get(); } catch (Exception e) { throw IgfsUtils.toIgfsException(e); @@ -1460,10 +1466,16 @@ public final class IgfsImpl implements IgfsEx { } /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException { + return createFuture(executeAsync0(task, rslvr, paths, arg)); + } + + /** {@inheritDoc} */ @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { try { - return executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg).get(); + return executeAsync0(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg).get(); } catch (Exception e) { throw IgfsUtils.toIgfsException(e); @@ -1471,10 +1483,17 @@ public final class IgfsImpl implements IgfsEx { } /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + return createFuture(executeAsync0(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + } + + /** {@inheritDoc} */ @Override public <T, R> R execute(Class<? 
extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) { try { - return executeAsync(taskCls, rslvr, paths, arg).get(); + return executeAsync0(taskCls, rslvr, paths, arg).get(); } catch (Exception e) { throw IgfsUtils.toIgfsException(e); @@ -1482,17 +1501,30 @@ public final class IgfsImpl implements IgfsEx { } /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException { + return createFuture(executeAsync0(taskCls, rslvr, paths, arg)); + } + + /** {@inheritDoc} */ @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeSize, @Nullable T arg) { try { - return executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeSize, arg).get(); + return executeAsync0(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeSize, arg).get(); } catch (Exception e) { throw IgfsUtils.toIgfsException(e); } } + /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + return createFuture(executeAsync0(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + } + /** * Executes IGFS task asynchronously. * @@ -1502,9 +1534,9 @@ public final class IgfsImpl implements IgfsEx { * @param arg Optional task argument. * @return Execution future. 
*/ - <T, R> IgniteInternalFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + <T, R> IgniteInternalFuture<R> executeAsync0(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) { - return executeAsync(task, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg); + return executeAsync0(task, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg); } /** @@ -1521,7 +1553,7 @@ public final class IgfsImpl implements IgfsEx { * @param arg Optional task argument. * @return Execution future. */ - <T, R> IgniteInternalFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + <T, R> IgniteInternalFuture<R> executeAsync0(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { return igfsCtx.kernalContext().task().execute(task, new IgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr, skipNonExistentFiles, maxRangeLen, arg)); @@ -1536,9 +1568,9 @@ public final class IgfsImpl implements IgfsEx { * @param arg Optional task argument. * @return Execution future. */ - <T, R> IgniteInternalFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls, + <T, R> IgniteInternalFuture<R> executeAsync0(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) { - return executeAsync(taskCls, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg); + return executeAsync0(taskCls, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg); } /** @@ -1555,7 +1587,7 @@ public final class IgfsImpl implements IgfsEx { * @return Execution future. */ @SuppressWarnings("unchecked") - <T, R> IgniteInternalFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls, + <T, R> IgniteInternalFuture<R> executeAsync0(Class<? 
extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { return igfsCtx.kernalContext().task().execute((Class<IgfsTask<T, R>>)taskCls, @@ -1780,6 +1812,14 @@ public final class IgfsImpl implements IgfsEx { } /** + * @param fut Internal future. + * @return Public API future. + */ + private <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) { + return new IgniteFutureImpl<>(fut); + } + + /** * IGFS thread factory. */ @SuppressWarnings("NullableProblems") http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java index 396e784..5e785e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -33,7 +33,7 @@ import org.jetbrains.annotations.Nullable; /** * Abstract interop target. 
*/ -public abstract class PlatformAbstractTarget implements PlatformTarget, PlatformAsyncTarget { +public abstract class PlatformAbstractTarget implements PlatformTarget { /** Constant: TRUE.*/ protected static final int TRUE = 1; @@ -73,16 +73,6 @@ public abstract class PlatformAbstractTarget implements PlatformTarget, Platform } /** {@inheritDoc} */ - @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { - throw new IgniteCheckedException("Future listening is not supported in " + getClass()); - } - - /** {@inheritDoc} */ - @Override @Nullable public PlatformFutureUtils.Writer futureWriter(int opId){ - return null; - } - - /** {@inheritDoc} */ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { return throwUnsupported(type); } @@ -203,18 +193,6 @@ public abstract class PlatformAbstractTarget implements PlatformTarget, Platform } /** - * Reads future information and listens. - * - * @param reader Reader. - * @throws IgniteCheckedException In case of error. - */ - protected long readAndListenFuture(BinaryRawReader reader) throws IgniteCheckedException { - readAndListenFuture(reader, currentFuture(), null); - - return TRUE; - } - - /** * Wraps a listenable to be returned to platform. * * @param listenable Listenable. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java deleted file mode 100644 index a4d35c9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; -import org.jetbrains.annotations.Nullable; - -/** - * Async target. - */ -public interface PlatformAsyncTarget { - /** - * Gets future for the current operation. - * - * @return current future. - * @throws IgniteCheckedException If failed. 
- */ - IgniteInternalFuture currentFuture() throws IgniteCheckedException; - - /** - * Gets a custom future writer. - * - * @param opId Operation id. - * @return A custom writer for given op id. - */ - @Nullable PlatformFutureUtils.Writer futureWriter(int opId); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java index c2a0797..1ee57cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.platform; -import org.apache.ignite.IgniteCheckedException; import org.jetbrains.annotations.Nullable; /** @@ -106,27 +105,6 @@ public interface PlatformTargetProxy { void inStreamAsync(int type, long memPtr) throws Exception; /** - * Start listening for the future. - * - * @param futId Future ID. - * @param typ Result type. - * @throws IgniteCheckedException In case of failure. - */ - @SuppressWarnings("UnusedDeclaration") - void listenFuture(final long futId, int typ) throws Exception; - - /** - * Start listening for the future for specific operation type. - * - * @param futId Future ID. - * @param typ Result type. - * @param opId Operation ID required to pick correct result writer. - * @throws IgniteCheckedException In case of failure. - */ - @SuppressWarnings("UnusedDeclaration") - void listenFutureForOperation(final long futId, int typ, int opId) throws Exception; - - /** * Returns the underlying target. * * @return Underlying target. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java index 7e0036d..44044b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java @@ -17,9 +17,7 @@ package org.apache.ignite.internal.processors.platform; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; @@ -37,6 +35,10 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy { /** Underlying target. */ private final PlatformTarget target; + /** + * @param target Platform target. + * @param platformCtx Platform context. 
+ */ public PlatformTargetProxyImpl(PlatformTarget target, PlatformContext platformCtx) { assert platformCtx != null; assert target != null; @@ -115,15 +117,13 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy { final PlatformAsyncResult res = target.processInStreamAsync(type, reader); - if (res == null) { + if (res == null) throw new IgniteException("PlatformTarget.processInStreamAsync should not return null."); - } IgniteFuture fut = res.future(); - if (fut == null) { + if (fut == null) throw new IgniteException("PlatformAsyncResult.future() should not return null."); - } PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() { /** {@inheritDoc} */ @@ -211,35 +211,11 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy { } /** {@inheritDoc} */ - @Override public void listenFuture(final long futId, int typ) throws Exception { - PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, target); - } - - /** {@inheritDoc} */ - @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception { - PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), target); - } - - /** {@inheritDoc} */ @Override public PlatformTarget unwrap() { return target; } /** - * @return Future writer. - */ - private PlatformFutureUtils.Writer futureWriter(int opId) { - return ((PlatformAsyncTarget)target).futureWriter(opId); - } - - /** - * @return Current future. - */ - private IgniteInternalFuture currentFuture() throws IgniteCheckedException { - return ((PlatformAsyncTarget)target).currentFuture(); - } - - /** * Wraps an object in a proxy when possible. * * @param obj Object to wrap. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index 2abcc0d..72f5d62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -32,7 +32,6 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.TextQuery; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.cache.CacheOperationContext; @@ -56,7 +55,6 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure; import org.apache.ignite.internal.util.GridConcurrentFactory; -import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.lang.IgniteBiInClosure; @@ -334,9 +332,6 @@ public class PlatformCache extends PlatformAbstractTarget { /** Initial JCache (not in binary mode). */ private final IgniteCache rawCache; - /** Underlying JCache in async mode. 
*/ - private final IgniteCache cacheAsync; - /** Whether this cache is created with "keepBinary" flag on the other side. */ private final boolean keepBinary; @@ -386,8 +381,9 @@ public class PlatformCache extends PlatformAbstractTarget { assert exts != null; rawCache = cache; + IgniteCache binCache = cache.withKeepBinary(); - cacheAsync = binCache.withAsync(); + this.cache = (IgniteCacheProxy)binCache; this.keepBinary = keepBinary; this.exts = exts; @@ -448,12 +444,12 @@ public class PlatformCache extends PlatformAbstractTarget { reader.readObjectDetached()) ? TRUE : FALSE; case OP_LOC_LOAD_CACHE: - loadCache0(reader, true, cache); + loadCache0(reader, true); return TRUE; case OP_LOAD_CACHE: - loadCache0(reader, false, cache); + loadCache0(reader, false); return TRUE; @@ -553,66 +549,66 @@ public class PlatformCache extends PlatformAbstractTarget { }); } - case OP_PUT_ASYNC: { - cacheAsync.put(reader.readObjectDetached(), reader.readObjectDetached()); + readAndListenFuture(reader, + cache.putAsync(reader.readObjectDetached(), reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; } case OP_CLEAR_CACHE_ASYNC: { - cacheAsync.clear(); + readAndListenFuture(reader, cache.clearAsync()); - return readAndListenFuture(reader); + return TRUE; } case OP_CLEAR_ALL_ASYNC: { - cacheAsync.clearAll(PlatformUtils.readSet(reader)); + readAndListenFuture(reader, cache.clearAllAsync(PlatformUtils.readSet(reader))); - return readAndListenFuture(reader); + return TRUE; } case OP_REMOVE_ALL2_ASYNC: { - cacheAsync.removeAll(); + readAndListenFuture(reader, cache.removeAllAsync()); - return readAndListenFuture(reader); + return TRUE; } case OP_SIZE_ASYNC: { CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt()); - cacheAsync.size(modes); + readAndListenFuture(reader, cache.sizeAsync(modes)); - return readAndListenFuture(reader); + return TRUE; } case OP_CLEAR_ASYNC: { - cacheAsync.clear(reader.readObjectDetached()); + 
readAndListenFuture(reader, cache.clearAsync(reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; } case OP_LOAD_CACHE_ASYNC: { - loadCache0(reader, false, cacheAsync); + readAndListenFuture(reader, loadCacheAsync0(reader, false)); - return readAndListenFuture(reader); + return TRUE; } case OP_LOC_LOAD_CACHE_ASYNC: { - loadCache0(reader, true, cacheAsync); + readAndListenFuture(reader, loadCacheAsync0(reader, true)); - return readAndListenFuture(reader); + return TRUE; } case OP_PUT_ALL_ASYNC: - cacheAsync.putAll(PlatformUtils.readMap(reader)); + readAndListenFuture(reader, cache.putAllAsync(PlatformUtils.readMap(reader))); - return readAndListenFuture(reader); + return TRUE; case OP_REMOVE_ALL_ASYNC: - cacheAsync.removeAll(PlatformUtils.readSet(reader)); + readAndListenFuture(reader, cache.removeAllAsync(PlatformUtils.readSet(reader))); - return readAndListenFuture(reader); + return TRUE; case OP_REBALANCE: readAndListenFuture(reader, cache.rebalance()); @@ -620,79 +616,81 @@ public class PlatformCache extends PlatformAbstractTarget { return TRUE; case OP_GET_ASYNC: - cacheAsync.get(reader.readObjectDetached()); + readAndListenFuture(reader, cache.getAsync(reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; case OP_CONTAINS_KEY_ASYNC: - cacheAsync.containsKey(reader.readObjectDetached()); + readAndListenFuture(reader, cache.containsKeyAsync(reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; case OP_CONTAINS_KEYS_ASYNC: - cacheAsync.containsKeys(PlatformUtils.readSet(reader)); + readAndListenFuture(reader, cache.containsKeysAsync(PlatformUtils.readSet(reader))); - return readAndListenFuture(reader); + return TRUE; case OP_REMOVE_OBJ_ASYNC: - cacheAsync.remove(reader.readObjectDetached()); + readAndListenFuture(reader, cache.removeAsync(reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; case OP_REMOVE_BOOL_ASYNC: - 
cacheAsync.remove(reader.readObjectDetached(), reader.readObjectDetached()); + readAndListenFuture(reader, + cache.removeAsync(reader.readObjectDetached(), reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; case OP_GET_ALL_ASYNC: { Set keys = PlatformUtils.readSet(reader); - cacheAsync.getAll(keys); - - readAndListenFuture(reader, cacheAsync.future(), WRITER_GET_ALL); + readAndListenFuture(reader, cache.getAllAsync(keys), WRITER_GET_ALL); return TRUE; } case OP_GET_AND_PUT_ASYNC: - cacheAsync.getAndPut(reader.readObjectDetached(), reader.readObjectDetached()); + readAndListenFuture(reader, + cache.getAndPutAsync(reader.readObjectDetached(), reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; case OP_GET_AND_PUT_IF_ABSENT_ASYNC: - cacheAsync.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()); + readAndListenFuture(reader, + cache.getAndPutIfAbsentAsync(reader.readObjectDetached(), reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; case OP_GET_AND_REMOVE_ASYNC: - cacheAsync.getAndRemove(reader.readObjectDetached()); + readAndListenFuture(reader, cache.getAndRemoveAsync(reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; case OP_GET_AND_REPLACE_ASYNC: - cacheAsync.getAndReplace(reader.readObjectDetached(), reader.readObjectDetached()); + readAndListenFuture(reader, + cache.getAndReplaceAsync(reader.readObjectDetached(), reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; case OP_REPLACE_2_ASYNC: - cacheAsync.replace(reader.readObjectDetached(), reader.readObjectDetached()); + readAndListenFuture(reader, + cache.replaceAsync(reader.readObjectDetached(), reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; case OP_REPLACE_3_ASYNC: - cacheAsync.replace(reader.readObjectDetached(), reader.readObjectDetached(), - reader.readObjectDetached()); + 
readAndListenFuture(reader, + cache.replaceAsync(reader.readObjectDetached(), reader.readObjectDetached(), + reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; case OP_INVOKE_ASYNC: { Object key = reader.readObjectDetached(); CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - cacheAsync.invoke(key, proc); - - readAndListenFuture(reader, cacheAsync.future(), WRITER_INVOKE); + readAndListenFuture(reader, cache.invokeAsync(key, proc), WRITER_INVOKE); return TRUE; } @@ -702,17 +700,16 @@ public class PlatformCache extends PlatformAbstractTarget { CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - cacheAsync.invokeAll(keys, proc); - - readAndListenFuture(reader, cacheAsync.future(), WRITER_INVOKE_ALL); + readAndListenFuture(reader, cache.invokeAllAsync(keys, proc), WRITER_INVOKE_ALL); return TRUE; } case OP_PUT_IF_ABSENT_ASYNC: - cacheAsync.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()); + readAndListenFuture(reader, + cache.putIfAbsentAsync(reader.readObjectDetached(), reader.readObjectDetached())); - return readAndListenFuture(reader); + return TRUE; case OP_INVOKE: { Object key = reader.readObjectDetached(); @@ -807,8 +804,45 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Loads cache via localLoadCache or loadCache. + * + * @param reader Binary reader. + * @param loc Local flag. + * @return Cache async operation future. */ - private void loadCache0(BinaryRawReaderEx reader, boolean loc, IgniteCache cache) { + private void loadCache0(BinaryRawReaderEx reader, boolean loc) { + PlatformCacheEntryFilter filter = createPlatformCacheEntryFilter(reader); + + Object[] args = readLoadCacheArgs(reader); + + if (loc) + cache.localLoadCache(filter, args); + else + cache.loadCache(filter, args); + } + + /** + * Asynchronously loads cache via localLoadCacheAsync or loadCacheAsync. 
+ * + * @param reader Binary reader. + * @param loc Local flag. + * @return Cache async operation future. + */ + private IgniteFuture<Void> loadCacheAsync0(BinaryRawReaderEx reader, boolean loc) { + PlatformCacheEntryFilter filter = createPlatformCacheEntryFilter(reader); + + Object[] args = readLoadCacheArgs(reader); + + if (loc) + return cache.localLoadCacheAsync(filter, args); + else + return cache.loadCacheAsync(filter, args); + } + + /** + * @param reader Binary reader. + * @return created object. + */ + @Nullable private PlatformCacheEntryFilter createPlatformCacheEntryFilter(BinaryRawReaderEx reader) { PlatformCacheEntryFilter filter = null; Object pred = reader.readObjectDetached(); @@ -816,6 +850,14 @@ public class PlatformCache extends PlatformAbstractTarget { if (pred != null) filter = platformCtx.createCacheEntryFilter(pred, 0); + return filter; + } + + /** + * @param reader Binary reader. + * @return Arguments array. + */ + @Nullable private Object[] readLoadCacheArgs(BinaryRawReaderEx reader) { Object[] args = null; int argCnt = reader.readInt(); @@ -827,10 +869,7 @@ public class PlatformCache extends PlatformAbstractTarget { args[i] = reader.readObjectDetached(); } - if (loc) - cache.localLoadCache(filter, args); - else - cache.loadCache(filter, args); + return args; } /** {@inheritDoc} */ @@ -1130,25 +1169,6 @@ public class PlatformCache extends PlatformAbstractTarget { } } - /** <inheritDoc /> */ - @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { - return ((IgniteFutureImpl) cacheAsync.future()).internalFuture(); - } - - /** <inheritDoc /> */ - @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) { - if (opId == OP_GET_ALL) - return WRITER_GET_ALL; - - if (opId == OP_INVOKE) - return WRITER_INVOKE; - - if (opId == OP_INVOKE_ALL) - return WRITER_INVOKE_ALL; - - return null; - } - /** * Get lock by id. 
* @@ -1179,6 +1199,10 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Runs specified query. + * + * @param qry Query. + * @return Query cursor. + * @throws IgniteCheckedException On error. */ private PlatformQueryCursor runQuery(Query qry) throws IgniteCheckedException { @@ -1195,6 +1219,10 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Runs specified fields query. + * + * @param qry Query. + * @return Query cursor. + * @throws IgniteCheckedException On error. */ private PlatformFieldsQueryCursor runFieldsQuery(Query qry) throws IgniteCheckedException { @@ -1211,6 +1239,10 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Reads the query of specified type. + * + * @param reader Binary reader. + * @return Query. + * @throws IgniteCheckedException On error. */ private Query readInitialQuery(BinaryRawReaderEx reader) throws IgniteCheckedException { int typ = reader.readInt(); @@ -1234,6 +1266,9 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Reads sql query. + * + * @param reader Binary reader. + * @return Query. */ private Query readSqlQuery(BinaryRawReaderEx reader) { boolean loc = reader.readBoolean(); @@ -1250,6 +1285,9 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Reads fields query. + * + * @param reader Binary reader. + * @return Query. */ private Query readFieldsQuery(BinaryRawReaderEx reader) { boolean loc = reader.readBoolean(); @@ -1267,6 +1305,9 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Reads text query. + * + * @param reader Binary reader. + * @return Query. */ private Query readTextQuery(BinaryRawReader reader) { boolean loc = reader.readBoolean(); @@ -1279,6 +1320,9 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Reads scan query. + * + * @param reader Binary reader. + * @return Query. 
*/ private Query readScanQuery(BinaryRawReaderEx reader) { boolean loc = reader.readBoolean(); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java index 9d9a4d2..2b2a78a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java @@ -240,9 +240,10 @@ public class PlatformCompute extends PlatformAbstractTarget { * Execute task. * * @param task Task. + * @return Target. */ private PlatformTarget executeNative0(final PlatformAbstractTask task) { - IgniteInternalFuture fut = computeForPlatform.executeAsync(task, null); + IgniteInternalFuture fut = computeForPlatform.executeAsync0(task, null); fut.listen(new IgniteInClosure<IgniteInternalFuture>() { private static final long serialVersionUID = 0L; @@ -266,7 +267,9 @@ public class PlatformCompute extends PlatformAbstractTarget { * Execute task taking arguments from the given reader. * * @param reader Reader. + * @param async Execute asynchronously flag. * @return Task result. + * @throws IgniteCheckedException On error. 
*/ protected Object executeJavaTask(BinaryRawReaderEx reader, boolean async) throws IgniteCheckedException { String taskName = reader.readString(); @@ -277,18 +280,13 @@ public class PlatformCompute extends PlatformAbstractTarget { IgniteCompute compute0 = computeForTask(nodeIds); - if (async) - compute0 = compute0.withAsync(); - if (!keepBinary && arg instanceof BinaryObjectImpl) arg = ((BinaryObject)arg).deserialize(); - Object res = compute0.execute(taskName, arg); - if (async) - return readAndListenFuture(reader, new ComputeConvertingFuture(compute0.future())); + return readAndListenFuture(reader, new ComputeConvertingFuture(compute0.executeAsync(taskName, arg))); else - return toBinary(res); + return toBinary(compute0.execute(taskName, arg)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java index d4755de..cb27b19 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.platform.entityframework; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cluster.ClusterGroup; import 
org.apache.ignite.cluster.ClusterNode; @@ -187,11 +186,10 @@ public class PlatformDotNetEntityFrameworkCacheExtension implements PlatformCach final ClusterGroup dataNodes = grid.cluster().forDataNodes(dataCacheName); - IgniteCompute asyncCompute = grid.compute(dataNodes).withAsync(); + IgniteFuture f = grid.compute(dataNodes).broadcastAsync( + new RemoveOldEntriesRunnable(dataCacheName, currentVersions)); - asyncCompute.broadcast(new RemoveOldEntriesRunnable(dataCacheName, currentVersions)); - - asyncCompute.future().listen(new CleanupCompletionListener(metaCache, dataCacheName)); + f.listen(new CleanupCompletionListener(metaCache, dataCacheName)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java index 9ddcc37..845c06a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java @@ -17,11 +17,11 @@ package org.apache.ignite.internal.processors.platform.events; +import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteEvents; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventAdapter; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; @@ -29,8 +29,8 @@ import org.apache.ignite.internal.processors.platform.PlatformContext; import 
org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; -import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -91,9 +91,6 @@ public class PlatformEvents extends PlatformAbstractTarget { private final IgniteEvents events; /** */ - private final IgniteEvents eventsAsync; - - /** */ private final EventResultWriter eventResWriter; /** */ @@ -111,7 +108,6 @@ public class PlatformEvents extends PlatformAbstractTarget { assert events != null; this.events = events; - eventsAsync = events.withAsync(); eventResWriter = new EventResultWriter(platformCtx); eventColResWriter = new EventCollectionResultWriter(platformCtx); @@ -148,16 +144,12 @@ public class PlatformEvents extends PlatformAbstractTarget { return TRUE; case OP_REMOTE_QUERY_ASYNC: - startRemoteQuery(reader, eventsAsync); - - readAndListenFuture(reader, currentFuture(), eventColResWriter); + readAndListenFuture(reader, startRemoteQueryAsync(reader, events), eventColResWriter); return TRUE; case OP_WAIT_FOR_LOCAL_ASYNC: { - startWaitForLocal(reader, eventsAsync); - - readAndListenFuture(reader, currentFuture(), eventResWriter); + readAndListenFuture(reader, startWaitForLocalAsync(reader, events), eventResWriter); return TRUE; } @@ -253,6 +245,23 @@ public class PlatformEvents extends PlatformAbstractTarget { } /** + * Starts the waitForLocal asynchronously. + * + * @param reader Reader + * @param events Events. + * @return Result. + */ + private IgniteFuture<EventAdapter> startWaitForLocalAsync(BinaryRawReaderEx reader, IgniteEvents events) { + Long filterHnd = reader.readObject(); + + IgnitePredicate filter = filterHnd != null ? 
localFilter(filterHnd) : null; + + int[] eventTypes = readEventTypes(reader); + + return events.waitForLocalAsync(filter, eventTypes); + } + + /** * Starts the remote query. * * @param reader Reader. @@ -271,6 +280,25 @@ public class PlatformEvents extends PlatformAbstractTarget { return events.remoteQuery(filter, timeout); } + /** + * Starts the remote query asynchronously. + * + * @param reader Reader. + * @param events Events. + * @return Result. + */ + private IgniteFuture<List<Event>> startRemoteQueryAsync(BinaryRawReaderEx reader, IgniteEvents events) { + Object pred = reader.readObjectDetached(); + + long timeout = reader.readLong(); + + int[] types = readEventTypes(reader); + + PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types); + + return events.remoteQueryAsync(filter, timeout); + } + /** {@inheritDoc} */ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { @@ -310,24 +338,6 @@ public class PlatformEvents extends PlatformAbstractTarget { return super.processInLongOutLong(type, val); } - /** {@inheritDoc} */ - @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { - return ((IgniteFutureImpl)eventsAsync.future()).internalFuture(); - } - - /** {@inheritDoc} */ - @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) { - switch (opId) { - case OP_WAIT_FOR_LOCAL: - return eventResWriter; - - case OP_REMOTE_QUERY: - return eventColResWriter; - } - - return null; - } - /** * Reads event types array. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java index 6fe109e..8018986 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.platform.messaging; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteMessaging; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; @@ -27,7 +26,7 @@ import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.lang.IgniteFuture; import java.util.UUID; @@ -68,9 +67,6 @@ public class PlatformMessaging extends PlatformAbstractTarget { /** */ private final IgniteMessaging messaging; - /** */ - private final IgniteMessaging messagingAsync; - /** * Ctor. 
* @@ -83,7 +79,6 @@ public class PlatformMessaging extends PlatformAbstractTarget { assert messaging != null; this.messaging = messaging; - messagingAsync = messaging.withAsync(); } /** {@inheritDoc} */ @@ -132,15 +127,15 @@ public class PlatformMessaging extends PlatformAbstractTarget { } case OP_REMOTE_LISTEN_ASYNC: { - startRemoteListen(reader, messagingAsync); + readAndListenFuture(reader, startRemoteListenAsync(reader, messaging)); - return readAndListenFuture(reader); + return TRUE; } case OP_STOP_REMOTE_LISTEN_ASYNC: { - messagingAsync.stopRemoteListen(reader.readUuid()); + readAndListenFuture(reader, messaging.stopRemoteListenAsync(reader.readUuid())); - return readAndListenFuture(reader); + return TRUE; } default: @@ -167,6 +162,7 @@ public class PlatformMessaging extends PlatformAbstractTarget { /** * Starts the remote listener. * @param reader Reader. + * @param messaging Messaging. * @return Listen id. */ private UUID startRemoteListen(BinaryRawReaderEx reader, IgniteMessaging messaging) { @@ -181,9 +177,22 @@ public class PlatformMessaging extends PlatformAbstractTarget { return messaging.remoteListen(topic, filter); } - /** {@inheritDoc} */ - @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { - return ((IgniteFutureImpl)messagingAsync.future()).internalFuture(); + /** + * Starts the remote listener. + * @param reader Reader. + * @param messaging Messaging. + * @return Future of the operation. 
+ */ + private IgniteFuture<UUID> startRemoteListenAsync(BinaryRawReaderEx reader, IgniteMessaging messaging) { + Object nativeFilter = reader.readObjectDetached(); + + long ptr = reader.readLong(); // interop pointer + + Object topic = reader.readObjectDetached(); + + PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr); + + return messaging.remoteListenAsync(topic, filter); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java index 37727f5..827bc5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.platform.services; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteServices; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; @@ -32,8 +31,8 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformWriterBiClosure; import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure; import org.apache.ignite.internal.processors.service.GridServiceProxy; -import 
org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; @@ -46,6 +45,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import org.jetbrains.annotations.NotNull; /** * Interop services. @@ -107,9 +107,6 @@ public class PlatformServices extends PlatformAbstractTarget { /** */ private final IgniteServices services; - /** */ - private final IgniteServices servicesAsync; - /** Server keep binary flag. */ private final boolean srvKeepBinary; @@ -126,7 +123,6 @@ public class PlatformServices extends PlatformAbstractTarget { assert services != null; this.services = services; - servicesAsync = services.withAsync(); this.srvKeepBinary = srvKeepBinary; } @@ -155,21 +151,21 @@ public class PlatformServices extends PlatformAbstractTarget { } case OP_DOTNET_DEPLOY_ASYNC: { - dotnetDeploy(reader, servicesAsync); + readAndListenFuture(reader, dotnetDeployAsync(reader, services)); - return readAndListenFuture(reader); + return TRUE; } case OP_DOTNET_DEPLOY_MULTIPLE: { - dotnetDeployMultiple(reader, services); + dotnetDeployMultiple(reader); return TRUE; } case OP_DOTNET_DEPLOY_MULTIPLE_ASYNC: { - dotnetDeployMultiple(reader, servicesAsync); + readAndListenFuture(reader, dotnetDeployMultipleAsync(reader)); - return readAndListenFuture(reader); + return TRUE; } case OP_CANCEL: { @@ -179,15 +175,15 @@ public class PlatformServices extends PlatformAbstractTarget { } case OP_CANCEL_ASYNC: { - servicesAsync.cancel(reader.readString()); + readAndListenFuture(reader, services.cancelAsync(reader.readString())); - return readAndListenFuture(reader); + return TRUE; } case OP_CANCEL_ALL_ASYNC: { - servicesAsync.cancelAll(); + readAndListenFuture(reader, services.cancelAllAsync()); - return 
readAndListenFuture(reader); + return TRUE; } default: @@ -350,15 +346,12 @@ public class PlatformServices extends PlatformAbstractTarget { return super.processInStreamOutObject(type, reader); } - /** {@inheritDoc} */ - @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { - return ((IgniteFutureImpl)servicesAsync.future()).internalFuture(); - } - /** * Deploys multiple dotnet services. + * + * @param reader Binary reader. */ - private void dotnetDeployMultiple(BinaryRawReaderEx reader, IgniteServices services) { + private void dotnetDeployMultiple(BinaryRawReaderEx reader) { String name = reader.readString(); Object svc = reader.readObjectDetached(); int totalCnt = reader.readInt(); @@ -369,9 +362,53 @@ public class PlatformServices extends PlatformAbstractTarget { } /** + * Asynchronously deploys multiple dotnet services. + * + * @param reader Binary reader. + * @return Future of the operation. + */ + private IgniteFuture<Void> dotnetDeployMultipleAsync(BinaryRawReaderEx reader) { + String name = reader.readString(); + Object svc = reader.readObjectDetached(); + int totalCnt = reader.readInt(); + int maxPerNodeCnt = reader.readInt(); + + return services.deployMultipleAsync(name, new PlatformDotNetServiceImpl(svc, platformCtx, srvKeepBinary), + totalCnt, maxPerNodeCnt); + } + + /** * Deploys dotnet service. + * + * @param reader Binary reader. + * @param services Services. */ private void dotnetDeploy(BinaryRawReaderEx reader, IgniteServices services) { + ServiceConfiguration cfg = dotnetConfiguration(reader); + + services.deploy(cfg); + } + + /** + * Deploys dotnet service asynchronously. + * + * @param reader Binary reader. + * @param services Services. + * @return Future of the operation. 
+ */ + private IgniteFuture<Void> dotnetDeployAsync(BinaryRawReaderEx reader, IgniteServices services) { + ServiceConfiguration cfg = dotnetConfiguration(reader); + + return services.deployAsync(cfg); + } + + /** + * Read the dotnet service configuration. + * + * @param reader Binary reader. + * @return Service configuration. + */ + @NotNull private ServiceConfiguration dotnetConfiguration(BinaryRawReaderEx reader) { ServiceConfiguration cfg = new ServiceConfiguration(); cfg.setName(reader.readString()); @@ -386,7 +423,7 @@ public class PlatformServices extends PlatformAbstractTarget { if (filter != null) cfg.setNodeFilter(platformCtx.createClusterNodeFilter(filter)); - services.deploy(cfg); + return cfg; } /** @@ -403,8 +440,8 @@ public class PlatformServices extends PlatformAbstractTarget { /** */ private static final Map<Class<?>, Class<?>> PRIMITIVES_TO_WRAPPERS = new HashMap<>(); - /** - * Class initializer. + /* + Class initializer. */ static { PRIMITIVES_TO_WRAPPERS.put(boolean.class, Boolean.class); @@ -422,6 +459,7 @@ public class PlatformServices extends PlatformAbstractTarget { * * @param proxy Proxy object. * @param clazz Proxy class. + * @param ctx Platform context. */ private ServiceProxyHolder(Object proxy, Class clazz, PlatformContext ctx) { super(ctx); @@ -435,18 +473,18 @@ public class PlatformServices extends PlatformAbstractTarget { /** * Invokes the proxy. + * * @param mthdName Method name. * @param srvKeepBinary Binary flag. * @param args Args. * @return Invocation result. - * @throws IgniteCheckedException - * @throws NoSuchMethodException + * @throws IgniteCheckedException On error. + * @throws NoSuchMethodException On error.
*/ public Object invoke(String mthdName, boolean srvKeepBinary, Object[] args) throws IgniteCheckedException, NoSuchMethodException { - if (proxy instanceof PlatformService) { + if (proxy instanceof PlatformService) return ((PlatformService)proxy).invokeMethod(mthdName, srvKeepBinary, args); - } else { assert proxy instanceof GridServiceProxy; @@ -467,6 +505,7 @@ public class PlatformServices extends PlatformAbstractTarget { * @param mthdName Name. * @param args Args. * @return Method. + * @throws NoSuchMethodException On error. */ private static Method getMethod(Class clazz, String mthdName, Object[] args) throws NoSuchMethodException { assert clazz != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java index 21f71fa..8f34343 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java @@ -196,17 +196,16 @@ public class PlatformTransactions extends PlatformAbstractTarget { @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { long txId = reader.readLong(); - final Transaction asyncTx = (Transaction)tx(txId).withAsync(); + IgniteFuture fut0; switch (type) { case OP_COMMIT_ASYNC: - asyncTx.commit(); + fut0 = tx(txId).commitAsync(); break; - case OP_ROLLBACK_ASYNC: - asyncTx.rollback(); + fut0 = tx(txId).rollbackAsync(); break; @@ -215,7 +214,7 @@ public class PlatformTransactions extends 
PlatformAbstractTarget { } // Future result is the tx itself, we do not want to return it to the platform. - IgniteFuture fut = asyncTx.future().chain(new C1<IgniteFuture, Object>() { + IgniteFuture fut = fut0.chain(new C1<IgniteFuture, Object>() { private static final long serialVersionUID = 0L; @Override public Object apply(IgniteFuture fut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java index ce74f17..7556e7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java @@ -112,26 +112,20 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple< futs = new IgniteFuture[3]; if (futs[0] == null || futs[1] == null || futs[2] == null) { - IgniteCache cache = ignite.cache(cacheName).withAsync(); + IgniteCache cache = ignite.cache(cacheName); if (futs[0] == null) { - cache.size(CachePeekMode.PRIMARY); - - if (callAsync(cache.<Integer>future(), 0)) + if (callAsync(cache.sizeAsync(CachePeekMode.PRIMARY), 0)) return null; } if (futs[1] == null) { - cache.clear(); - - if (callAsync(cache.<Integer>future(), 1)) + if (callAsync(cache.clearAsync(), 1)) return null; } if (futs[2] == null) { - cache.size(CachePeekMode.PRIMARY); - - if (callAsync(cache.<Integer>future(), 2)) + if (callAsync(cache.sizeAsync(CachePeekMode.PRIMARY), 2)) return null; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java index a64ec6d..8f42eb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java @@ -370,11 +370,9 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> { } } - IgniteCompute comp = ignite.compute(ignite.cluster().forNodeIds(nids)).withAsync(); - - comp.execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false)); + IgniteCompute comp = ignite.compute(ignite.cluster().forNodeIds(nids)); - fut = comp.future(); + fut = comp.executeAsync(taskName, new VisorTaskArgument<>(nids, jobArgs, false)); fut.listen(new CI1<IgniteFuture<Object>>() { @Override public void apply(IgniteFuture<Object> f) { http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java index 50a8700..3e31b51 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java @@ -18,25 +18,75 @@ package org.apache.ignite.lang; /** - * Allows to enable asynchronous mode on Ignite APIs. + * Allows to enable asynchronous mode on Ignite APIs, e.g. 
+ * <pre> + * IgniteFuture f = cache.getAsync(key); + * </pre> + * instead of old-style async API: + * <pre> + * IgniteCache asyncCache = cache.withAsync(); + * asyncCache.get(key); + * IgniteFuture fut = asyncCache.future(); + * </pre> + * @deprecated since 2.0. Please use specialized asynchronous methods. */ +@Deprecated public interface IgniteAsyncSupport { /** * Gets instance of this component with asynchronous mode enabled. * * @return Instance of this component with asynchronous mode enabled. + * + * @deprecated since 2.0. Please use new specialized async method + * e.g. + * <pre> + * IgniteFuture f = cache.getAsync(key); + * </pre> + * instead of old-style async API: + * <pre> + * IgniteCache asyncCache = cache.withAsync(); + * asyncCache.get(key); + * IgniteFuture fut = asyncCache.future(); + * </pre> */ + @Deprecated public IgniteAsyncSupport withAsync(); /** * @return {@code True} if asynchronous mode is enabled. + * + * @deprecated since 2.0. Please use new specialized async method + * e.g. + * <pre> + * IgniteFuture f = cache.getAsync(key); + * </pre> + * instead of old-style async API: + * <pre> + * IgniteCache asyncCache = cache.withAsync(); + * asyncCache.get(key); + * IgniteFuture fut = asyncCache.future(); + * </pre> */ + @Deprecated public boolean isAsync(); /** * Gets and resets future for previous asynchronous operation. * * @return Future for previous asynchronous operation. + * + * @deprecated since 2.0. Please use new specialized async method + * e.g.
+ * <pre> + * IgniteFuture f = cache.getAsync(key); + * </pre> + * instead of old-style async API: + * <pre> + * IgniteCache asyncCache = cache.withAsync(); + * asyncCache.get(key); + * IgniteFuture fut = asyncCache.future(); + * </pre> */ + @Deprecated public <R> IgniteFuture<R> future(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java index 1bb7162..2dfea51 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java @@ -31,11 +31,13 @@ import java.lang.annotation.Target; * * TODO coding example. * + * @deprecated since 2.0. Please use specialized asynchronous methods. * @see IgniteAsyncSupport */ @Documented @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) +@Deprecated public @interface IgniteAsyncSupported { - + // No-op.
} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java index e2e7100..57a2b00 100644 --- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java @@ -22,6 +22,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteUuid; /** @@ -237,6 +238,19 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport { public void commit() throws IgniteException; /** + * Asynchronously commits this transaction by initiating {@code two-phase-commit} process. + * + * @return a Future representing pending completion of the commit. + * @throws IgniteException If commit failed. + * @throws TransactionTimeoutException If transaction is timed out. + * @throws TransactionRollbackException If transaction is automatically rolled back. + * @throws TransactionOptimisticException If transaction concurrency is {@link TransactionConcurrency#OPTIMISTIC} + * and commit is optimistically failed. + * @throws TransactionHeuristicException If transaction has entered an unknown state. + */ + public IgniteFuture<Void> commitAsync() throws IgniteException; + + /** * Ends the transaction. Transaction will be rolled back if it has not been committed. * * @throws IgniteException If transaction could not be gracefully ended. 
@@ -250,4 +264,12 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport { */ @IgniteAsyncSupported public void rollback() throws IgniteException; + + /** + * Asynchronously rolls back this transaction. + * + * @return a Future representing pending completion of the rollback. + * @throws IgniteException If rollback failed. + */ + public IgniteFuture<Void> rollbackAsync() throws IgniteException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java index 4d94400..722e37f 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java @@ -102,13 +102,8 @@ public class IgniteCacheExpiryStoreLoadSelfTest extends GridCacheAbstractSelfTes keys.add(primaryKey(jcache(1))); keys.add(primaryKey(jcache(2))); - if (async) { - IgniteCache<String, Integer> asyncCache = cache.withAsync(); - - asyncCache.loadCache(null, keys.toArray(new Integer[3])); - - asyncCache.future().get(); - } + if (async) + cache.loadCacheAsync(null, keys.toArray(new Integer[3])).get(); else cache.loadCache(null, keys.toArray(new Integer[3])); @@ -143,13 +138,8 @@ public class IgniteCacheExpiryStoreLoadSelfTest extends GridCacheAbstractSelfTes List<Integer> keys = primaryKeys(cache, 3); - if (async) { - IgniteCache<String, Integer> asyncCache = cache.withAsync(); - - asyncCache.localLoadCache(null, keys.toArray(new Integer[3])); - - asyncCache.future().get(); - } + if (async) + cache.localLoadCacheAsync(null, keys.toArray(new 
Integer[3])).get(); else cache.localLoadCache(null, keys.toArray(new Integer[3])); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java index 6fdaeb0..fbf938d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java @@ -33,7 +33,6 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCluster; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; @@ -356,11 +355,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im * @throws Exception If failed. */ private void run1(AtomicInteger cnt) throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - - comp.broadcast(runJob); - - ComputeTaskFuture fut = comp.future(); + IgniteFuture<Void> fut = compute(prj).broadcastAsync(runJob); waitForExecution(fut); @@ -378,11 +373,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im private void run2(AtomicInteger cnt) throws Exception { Collection<IgniteRunnable> jobs = F.asList(runJob); - IgniteCompute comp = compute(prj).withAsync(); - - comp.run(jobs); - - ComputeTaskFuture fut = comp.future(); + IgniteFuture<Void> fut = compute(prj).runAsync(jobs); waitForExecution(fut); @@ -398,11 +389,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im * @throws Exception If failed. 
*/ private void call1(AtomicInteger cnt) throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - - comp.broadcast(calJob); - - ComputeTaskFuture fut = comp.future(); + IgniteFuture<Collection<String>> fut = compute(prj).broadcastAsync(calJob); waitForExecution(fut); @@ -418,13 +405,9 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im * @throws Exception If failed. */ private void call2(AtomicInteger cnt) throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - Collection<IgniteCallable<String>> jobs = F.asList(calJob); - comp.call(jobs); - - ComputeTaskFuture fut = comp.future(); + IgniteFuture<Collection<String>> fut = compute(prj).callAsync(jobs); waitForExecution(fut); @@ -440,11 +423,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im * @throws Exception If failed. */ private void call3(AtomicInteger cnt) throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - - comp.apply(clrJob, (String) null); - - ComputeTaskFuture fut = comp.future(); + IgniteFuture<String> fut = compute(prj).applyAsync(clrJob, (String) null); waitForExecution(fut); @@ -462,11 +441,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im private void call4(AtomicInteger cnt) throws Exception { Collection<String> args = F.asList("a", "b", "c"); - IgniteCompute comp = compute(prj).withAsync(); - - comp.apply(clrJob, args); - - ComputeTaskFuture fut = comp.future(); + IgniteFuture<Collection<String>> fut = compute(prj).applyAsync(clrJob, args); waitForExecution(fut); @@ -482,11 +457,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im * @throws Exception If failed. 
*/ private void call5(AtomicInteger cnt) throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - - comp.broadcast(new TestClosure(), "arg"); - - ComputeTaskFuture<Collection<String>> fut = comp.future(); + IgniteFuture<Collection<String>> fut = compute(prj).broadcastAsync(new TestClosure(), "arg"); waitForExecution(fut); @@ -509,11 +480,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im private void forkjoin1(AtomicInteger cnt) throws Exception { Collection<String> args = F.asList("a", "b", "c"); - IgniteCompute comp = compute(prj).withAsync(); - - comp.apply(clrJob, args, rdc); - - ComputeTaskFuture fut = comp.future(); + IgniteFuture fut = compute(prj).applyAsync(clrJob, args, rdc); waitForExecution(fut); @@ -531,11 +498,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im private void forkjoin2(AtomicInteger cnt) throws Exception { Collection<IgniteCallable<String>> jobs = F.asList(calJob); - IgniteCompute comp = compute(prj).withAsync(); - - comp.call(jobs, rdc); - - ComputeTaskFuture fut = comp.future(); + IgniteFuture<Object> fut = compute(prj).callAsync(jobs, rdc); waitForExecution(fut); @@ -676,26 +639,22 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im * @throws Exception If test failed. 
*/ private void checkActiveFutures() throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - - assertEquals(0, comp.activeTaskFutures().size()); + assertEquals(0, compute(prj).activeTaskFutures().size()); cnt.set(0); - Collection<ComputeTaskFuture<Object>> futsList = new ArrayList<>(); + Collection<IgniteFuture<Object>> futsList = new ArrayList<>(); for (int i = 0; i < 10; i++) { - comp.call(new TestWaitCallable<>()); - - ComputeTaskFuture<Object> fut = comp.future(); + IgniteFuture<Object> fut = compute(prj).callAsync(new TestWaitCallable<>()); assertFalse(fut.isDone()); - Map<IgniteUuid, ComputeTaskFuture<Object>> futs = comp.activeTaskFutures(); + Map<IgniteUuid, ComputeTaskFuture<Object>> futs = compute(prj).activeTaskFutures(); assertEquals(i + 1, futs.size()); - assertTrue(futs.containsKey(fut.getTaskSession().getId())); + assertTrue(futs.containsKey(((ComputeTaskFuture)fut).getTaskSession().getId())); futsList.add(fut); } @@ -706,10 +665,10 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im mux.notifyAll(); } - for (ComputeTaskFuture<Object> fut : futsList) + for (IgniteFuture<Object> fut : futsList) fut.get(); - assertEquals(0, comp.activeTaskFutures().size()); + assertEquals(0, compute(prj).activeTaskFutures().size()); } /**
