Repository: ignite Updated Branches: refs/heads/master 1fee1fd18 -> 3aa251d81
IGNITE-9843 Fixed invalid metadata handling on client reconnect - Fixes #5027. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3aa251d8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3aa251d8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3aa251d8 Branch: refs/heads/master Commit: 3aa251d818ebfd7b1561296cec10597c2bb1cccd Parents: 1fee1fd Author: Ilya Kasnacheev <[email protected]> Authored: Fri Nov 9 20:53:10 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Nov 9 20:53:10 2018 +0300 ---------------------------------------------------------------------- .../binary/CacheObjectBinaryProcessorImpl.java | 43 +++++++++++--------- .../processors/query/h2/IgniteH2Indexing.java | 12 +++++- 2 files changed, 35 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3aa251d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 346e8a7..3250636 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap; import javax.cache.CacheException; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; @@ -46,9 +47,9 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.UnregisteredBinaryTypeException; import org.apache.ignite.internal.binary.BinaryContext; @@ -66,7 +67,6 @@ import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl; import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; @@ -103,7 +103,6 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAIT_SCHEMA_UPDATE; import static org.apache.ignite.IgniteSystemProperties.getBoolean; -import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.BINARY_PROC; /** @@ -115,6 +114,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm private volatile boolean discoveryStarted; /** */ + private volatile IgniteFuture<?> reconnectFut; + + /** */ private BinaryContext binaryCtx; /** */ @@ -143,16 +145,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm @GridToStringExclude private IgniteBinary binaries; - /** Listener removes all registered binary schemas and user type descriptors after the local client reconnected. */ - private final GridLocalEventListener clientDisconLsnr = new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - binaryContext().unregisterUserTypeDescriptors(); - binaryContext().unregisterBinarySchemas(); - - metadataLocCache.clear(); - } - }; - /** Locally cached metadata. This local cache is managed by exchanging discovery custom events. */ private final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache = new ConcurrentHashMap<>(); @@ -174,9 +166,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { if (marsh instanceof BinaryMarshaller) { - if (ctx.clientNode()) - ctx.event().addLocalEventListener(clientDisconLsnr, EVT_CLIENT_NODE_DISCONNECTED); - if (!ctx.clientNode()) metadataFileStore = new BinaryMetadataFileStore(metadataLocCache, ctx, log, binaryMetadataFileStoreDir); @@ -283,17 +272,28 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm /** {@inheritDoc} */ @Override public void stop(boolean cancel) { - if (ctx.clientNode()) - ctx.event().removeLocalEventListener(clientDisconLsnr); - if (transport != null) transport.stop(); } /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + this.reconnectFut = reconnectFut; + if (transport != null) transport.onDisconnected(); + + binaryContext().unregisterUserTypeDescriptors(); + binaryContext().unregisterBinarySchemas(); + + metadataLocCache.clear(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + this.reconnectFut = null; + + return super.onReconnected(clusterRestarted); } /** {@inheritDoc} */ @@ -622,6 +622,11 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm holder = metadataLocCache.get(typeId); + IgniteFuture<?> reconnectFut0 = reconnectFut; + + if (holder == null && reconnectFut0 != null) + throw new IgniteClientDisconnectedException(reconnectFut0, "Client node disconnected."); + if (log.isDebugEnabled()) log.debug("Finished waiting for client metadata update" + " [typeId=" + typeId http://git-wip-us.apache.org/repos/asf/ignite/blob/3aa251d8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 2337047..7d93ecf 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1884,7 +1884,17 @@ public class IgniteH2Indexing implements GridQueryIndexing { } @Override public Cache.Entry<K, V> next() { - List<?> l = iter0.next(); + List<?> l; + + try { + l = iter0.next(); + } + catch (CacheException e) { + throw e; + } + catch (Exception e) { + throw new CacheException(e); + } return new CacheEntryImpl<>((K)l.get(0), (V)l.get(1)); }
