Repository: curator Updated Branches: refs/heads/CURATOR-397 f2370b771 -> 1110ab3bb
better exception handling for serialization issues Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fe0a88b3 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fe0a88b3 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fe0a88b3 Branch: refs/heads/CURATOR-397 Commit: fe0a88b3b7b9fccf9da022bc0a13e440b1f8435c Parents: f2370b7 Author: randgalt <[email protected]> Authored: Fri May 5 18:38:46 2017 -0400 Committer: randgalt <[email protected]> Committed: Fri May 5 18:38:46 2017 -0400 ---------------------------------------------------------------------- .../modeled/cached/ModeledCacheListener.java | 12 ++++++++ .../async/modeled/details/ModeledCacheImpl.java | 23 ++++++++++++--- .../modeled/details/ModeledFrameworkImpl.java | 30 ++++++++++++++++---- 3 files changed, 56 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java index 544de78..4f1ac70 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java @@ -20,6 +20,7 @@ package org.apache.curator.x.async.modeled.cached; import org.apache.curator.x.async.modeled.ZPath; import org.apache.zookeeper.data.Stat; +import org.slf4j.LoggerFactory; @FunctionalInterface public interface ModeledCacheListener<T> @@ -66,6 +67,17 @@ public interface ModeledCacheListener<T> } /** + * Called when there is an exception processing a message from the internal cache. This is most + * likely due to a de-serialization problem. + * + * @param e the exception + */ + default void handleException(Exception e) + { + LoggerFactory.getLogger(getClass()).error("Could not process cache message", e); + } + + /** * Returns a version of this listener that only begins calling * {@link #accept(org.apache.curator.x.async.modeled.cached.ModeledCacheListener.Type, org.apache.curator.x.async.modeled.ZPath, org.apache.zookeeper.data.Stat, Object)} * once {@link #initialized()} has been called. i.e. changes that occur as the cache is initializing are not sent http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java index 091a727..353c28a 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -18,13 +18,13 @@ */ package org.apache.curator.x.async.modeled.details; -import com.google.common.util.concurrent.MoreExecutors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.utils.ThreadUtils; import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.modeled.ModelSerializer; import org.apache.curator.x.async.modeled.ModelSpec; @@ -37,9 +37,7 @@ import java.util.AbstractMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Collectors; class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> @@ -119,7 +117,24 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> } @Override - public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception + public void childEvent(CuratorFramework client, TreeCacheEvent event) + { + try + { + internalChildEvent(event); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + + listenerContainer.forEach(l -> { + l.handleException(e); + return null; + }); + } + } + + private void internalChildEvent(TreeCacheEvent event) throws Exception { switch ( event.getType() ) { http://git-wip-us.apache.org/repos/asf/curator/blob/fe0a88b3/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index b666822..66f10de 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -131,8 +131,19 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> @Override public AsyncStage<String> set(T item, Stat storingStatIn) { - byte[] bytes = modelSpec.serializer().serialize(item); - return dslClient.create().withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl()).forPath(modelSpec.path().fullPath(), bytes); + try + { + byte[] bytes = modelSpec.serializer().serialize(item); + return dslClient.create() + .withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl()) + .forPath(modelSpec.path().fullPath(), bytes); + } + catch ( Exception e ) + { + ModelStage<String> exceptionStage = new ModelStage<>(); + exceptionStage.completeExceptionally(e); + return exceptionStage; + } } private List<ACL> fixAclList(List<ACL> aclList) @@ -189,9 +200,18 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> @Override public AsyncStage<Stat> update(T item, int version) { - byte[] bytes = modelSpec.serializer().serialize(item); - AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? dslClient.setData().compressedWithVersion(version) : dslClient.setData(); - return next.forPath(modelSpec.path().fullPath(), bytes); + try + { + byte[] bytes = modelSpec.serializer().serialize(item); + AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? dslClient.setData().compressedWithVersion(version) : dslClient.setData(); + return next.forPath(modelSpec.path().fullPath(), bytes); + } + catch ( Exception e ) + { + ModelStage<Stat> exceptionStage = new ModelStage<>(); + exceptionStage.completeExceptionally(e); + return exceptionStage; + } } @Override
