Repository: ignite Updated Branches: refs/heads/ignite-1537 95b51335f -> 7fd645373
ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7fd64537 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7fd64537 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7fd64537 Branch: refs/heads/ignite-1537 Commit: 7fd645373b9caa51dbfd0fdd82d29c6bfa12a62f Parents: 95b5133 Author: sboikov <[email protected]> Authored: Tue Dec 22 10:24:42 2015 +0300 Committer: sboikov <[email protected]> Committed: Tue Dec 22 10:24:42 2015 +0300 ---------------------------------------------------------------------- .../internal/processors/cache/GridCacheAdapter.java | 12 ++++++++++-- .../processors/datastreamer/DataStreamProcessor.java | 12 +++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7fd64537/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 9bb3f55..1d097b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2112,8 +2112,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - IgniteInternalFuture<GridCacheReturn> fut = - tx.invokeAsync(ctx, waitTopFut, key, (EntryProcessor<K, V, Object>)entryProcessor, args); + assert !waitTopFut || tx.implicit(); + + if (!waitTopFut) + tx.topologyVersion(ctx.shared().exchange().readyAffinityVersion()); + + IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, + waitTopFut, + key, + (EntryProcessor<K, V, Object>)entryProcessor, + args); Map<K, EntryProcessorResult<T>> resMap = fut.get().value(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7fd64537/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index a2aab77..32f2ff5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -62,6 +62,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { /** Marshaller. */ private final Marshaller marsh; + /** */ + private byte[] marshErrBytes; + /** * @param ctx Kernal context. */ @@ -86,6 +89,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { if (ctx.config().isDaemon()) return; + marshErrBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error, " + + "see log for details.")); + flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) { @Override protected void body() throws InterruptedException { while (!isCancelled()) { @@ -324,10 +330,10 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { try { errBytes = err != null ? marsh.marshal(err) : null; } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal message.", e); + catch (Exception e) { + U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e); - return; + errBytes = marshErrBytes; } DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
