Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 f2370b771 -> 1110ab3bb


better exception handling for serialization issues


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fe0a88b3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fe0a88b3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fe0a88b3

Branch: refs/heads/CURATOR-397
Commit: fe0a88b3b7b9fccf9da022bc0a13e440b1f8435c
Parents: f2370b7
Author: randgalt <[email protected]>
Authored: Fri May 5 18:38:46 2017 -0400
Committer: randgalt <[email protected]>
Committed: Fri May 5 18:38:46 2017 -0400

----------------------------------------------------------------------
 .../modeled/cached/ModeledCacheListener.java    | 12 ++++++++
 .../async/modeled/details/ModeledCacheImpl.java | 23 ++++++++++++---
 .../modeled/details/ModeledFrameworkImpl.java   | 30 ++++++++++++++++----
 3 files changed, 56 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
index 544de78..4f1ac70 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.async.modeled.cached;
 
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.LoggerFactory;
 
 @FunctionalInterface
 public interface ModeledCacheListener<T>
@@ -66,6 +67,17 @@ public interface ModeledCacheListener<T>
     }
 
     /**
+     * Called when there is an exception processing a message from the 
internal cache. This is most
+     * likely due to a de-serialization problem.
+     *
+     * @param e the exception
+     */
+    default void handleException(Exception e)
+    {
+        LoggerFactory.getLogger(getClass()).error("Could not process cache 
message", e);
+    }
+
+    /**
      * Returns a version of this listener that only begins calling
      * {@link 
#accept(org.apache.curator.x.async.modeled.cached.ModeledCacheListener.Type, 
org.apache.curator.x.async.modeled.ZPath, org.apache.zookeeper.data.Stat, 
Object)}
      * once {@link #initialized()} has been called. i.e. changes that occur as 
the cache is initializing are not sent

http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 091a727..353c28a 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -18,13 +18,13 @@
  */
 package org.apache.curator.x.async.modeled.details;
 
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
@@ -37,9 +37,7 @@ import java.util.AbstractMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
@@ -119,7 +117,24 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
     }
 
     @Override
-    public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception
+    public void childEvent(CuratorFramework client, TreeCacheEvent event)
+    {
+        try
+        {
+            internalChildEvent(event);
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+
+            listenerContainer.forEach(l -> {
+                l.handleException(e);
+                return null;
+            });
+        }
+    }
+
+    private void internalChildEvent(TreeCacheEvent event) throws Exception
     {
         switch ( event.getType() )
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index b666822..66f10de 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -131,8 +131,19 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T>
     @Override
     public AsyncStage<String> set(T item, Stat storingStatIn)
     {
-        byte[] bytes = modelSpec.serializer().serialize(item);
-        return dslClient.create().withOptions(modelSpec.createOptions(), 
modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, 
modelSpec.ttl()).forPath(modelSpec.path().fullPath(), bytes);
+        try
+        {
+            byte[] bytes = modelSpec.serializer().serialize(item);
+            return dslClient.create()
+                .withOptions(modelSpec.createOptions(), 
modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, 
modelSpec.ttl())
+                .forPath(modelSpec.path().fullPath(), bytes);
+        }
+        catch ( Exception e )
+        {
+            ModelStage<String> exceptionStage = new ModelStage<>();
+            exceptionStage.completeExceptionally(e);
+            return exceptionStage;
+        }
     }
 
     private List<ACL> fixAclList(List<ACL> aclList)
@@ -189,9 +200,18 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T>
     @Override
     public AsyncStage<Stat> update(T item, int version)
     {
-        byte[] bytes = modelSpec.serializer().serialize(item);
-        AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? 
dslClient.setData().compressedWithVersion(version) : dslClient.setData();
-        return next.forPath(modelSpec.path().fullPath(), bytes);
+        try
+        {
+            byte[] bytes = modelSpec.serializer().serialize(item);
+            AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? 
dslClient.setData().compressedWithVersion(version) : dslClient.setData();
+            return next.forPath(modelSpec.path().fullPath(), bytes);
+        }
+        catch ( Exception e )
+        {
+            ModelStage<Stat> exceptionStage = new ModelStage<>();
+            exceptionStage.completeExceptionally(e);
+            return exceptionStage;
+        }
     }
 
     @Override

Reply via email to