Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 95b51335f -> 7fd645373


ignite-1.5 Fixed hang on metadata update inside put in atomic cache when 
topology read lock is held.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7fd64537
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7fd64537
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7fd64537

Branch: refs/heads/ignite-1537
Commit: 7fd645373b9caa51dbfd0fdd82d29c6bfa12a62f
Parents: 95b5133
Author: sboikov <[email protected]>
Authored: Tue Dec 22 10:24:42 2015 +0300
Committer: sboikov <[email protected]>
Committed: Tue Dec 22 10:24:42 2015 +0300

----------------------------------------------------------------------
 .../internal/processors/cache/GridCacheAdapter.java     | 12 ++++++++++--
 .../processors/datastreamer/DataStreamProcessor.java    | 12 +++++++++---
 2 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7fd64537/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9bb3f55..1d097b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2112,8 +2112,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
             @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
                 throws IgniteCheckedException {
-                IgniteInternalFuture<GridCacheReturn> fut =
-                    tx.invokeAsync(ctx, waitTopFut, key, (EntryProcessor<K, V, Object>)entryProcessor, args);
+                assert !waitTopFut || tx.implicit();
+
+                if (!waitTopFut)
+                    tx.topologyVersion(ctx.shared().exchange().readyAffinityVersion());
+
+                IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx,
+                    waitTopFut,
+                    key,
+                    (EntryProcessor<K, V, Object>)entryProcessor,
+                    args);
 
                 Map<K, EntryProcessorResult<T>> resMap = fut.get().value();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fd64537/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index a2aab77..32f2ff5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -62,6 +62,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
     /** Marshaller. */
     private final Marshaller marsh;
 
+    /** */
+    private byte[] marshErrBytes;
+
     /**
      * @param ctx Kernal context.
      */
@@ -86,6 +89,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
+        marshErrBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error, " +
+            "see log for details."));
+
         flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
             @Override protected void body() throws InterruptedException {
                 while (!isCancelled()) {
@@ -324,10 +330,10 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         try {
             errBytes = err != null ? marsh.marshal(err) : null;
         }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to marshal message.", e);
+        catch (Exception e) {
+            U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);
 
-            return;
+            errBytes = marshErrBytes;
         }
 
         DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);

Reply via email to