http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java deleted file mode 100644 index 3207f0a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java +++ /dev/null @@ -1,784 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.CacheEntryEvent; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import javax.cache.configuration.*; -import javax.cache.event.*; -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static javax.cache.event.EventType.*; -import static org.apache.ignite.events.EventType.*; -import static org.apache.ignite.internal.GridTopic.*; - -/** - * Continuous queries manager. - */ -public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> { - /** Ordered topic prefix. */ - private String topicPrefix; - - /** Listeners. */ - private final ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrs = new ConcurrentHashMap8<>(); - - /** Listeners count. */ - private final AtomicInteger lsnrCnt = new AtomicInteger(); - - /** Internal entries listeners. */ - private final ConcurrentMap<UUID, ListenerInfo<K, V>> intLsnrs = new ConcurrentHashMap8<>(); - - /** Internal listeners count. */ - private final AtomicInteger intLsnrCnt = new AtomicInteger(); - - /** Query sequence number for message topic. */ - private final AtomicLong seq = new AtomicLong(); - - /** Continues queries created for cache event listeners. */ - private final ConcurrentMap<CacheEntryListenerConfiguration, CacheContinuousQuery<K, V>> lsnrQrys = - new ConcurrentHashMap8<>(); - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - // Append cache name to the topic. - topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected void onKernalStart0() throws IgniteCheckedException { - Iterable<CacheEntryListenerConfiguration<K, V>> lsnrCfgs = cctx.config().getCacheEntryListenerConfigurations(); - - if (lsnrCfgs != null) { - for (CacheEntryListenerConfiguration<K, V> cfg : lsnrCfgs) - registerCacheEntryListener(cfg, false); - } - } - - /** {@inheritDoc} */ - @Override protected void onKernalStop0(boolean cancel) { - super.onKernalStop0(cancel); - - for (CacheEntryListenerConfiguration lsnrCfg : lsnrQrys.keySet()) { - try { - deregisterCacheEntryListener(lsnrCfg); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to remove cache entry listener: " + e); - } - } - } - - /** - * @param prjPred Projection predicate. - * @return New continuous query. - */ - public CacheContinuousQuery<K, V> createQuery(@Nullable IgnitePredicate<CacheEntry<K, V>> prjPred) { - Object topic = TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()); - - return new GridCacheContinuousQueryAdapter<>(cctx, topic, prjPred); - } - - /** - * @param e Cache entry. - * @param key Key. - * @param newVal New value. - * @param newBytes New value bytes. - * @param oldVal Old value. - * @param oldBytes Old value bytes. - * @param preload {@code True} if entry is updated during preloading. - * @throws IgniteCheckedException In case of error. - */ - public void onEntryUpdate(GridCacheEntryEx<K, V> e, - K key, - @Nullable V newVal, - @Nullable GridCacheValueBytes newBytes, - V oldVal, - @Nullable GridCacheValueBytes oldBytes, - boolean preload) throws IgniteCheckedException { - assert e != null; - assert key != null; - - ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrCol; - - if (e.isInternal()) - lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null; - else - lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null; - - if (F.isEmpty(lsnrCol)) - return; - - oldVal = cctx.unwrapTemporary(oldVal); - - EventType evtType = newVal == null ? REMOVED : - ((oldVal != null || (oldBytes != null && !oldBytes.isNull()) ? UPDATED : CREATED)); - - GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>( - cctx, - e.wrap(false), - key, - newVal, - newBytes, - oldVal, - oldBytes, - evtType); - - e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader()); - - boolean recordEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - - for (ListenerInfo<K, V> lsnr : lsnrCol.values()) { - if (preload && lsnr.entryListener()) - continue; - - lsnr.onEntryUpdate(e0, recordEvt); - } - } - - /** - * @param e Entry. - * @param key Key. - * @param oldVal Old value. - * @param oldBytes Old value bytes. - */ - public void onEntryExpired(GridCacheEntryEx<K, V> e, - K key, - V oldVal, - @Nullable GridCacheValueBytes oldBytes) { - if (e.isInternal()) - return; - - ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrCol = lsnrs; - - if (F.isEmpty(lsnrCol)) - return; - - if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, -1)) { - GridCacheContinuousQueryEntry<K, V> e0 = new GridCacheContinuousQueryEntry<>( - cctx, - e.wrap(false), - key, - null, - null, - oldVal, - oldBytes, - EXPIRED); - - for (ListenerInfo<K, V> lsnr : lsnrCol.values()) { - if (!lsnr.entryListener()) - continue; - - lsnr.onEntryUpdate(e0, false); - } - } - } - - /** - * @param lsnrCfg Listener configuration. - * @param addToCfg If {@code true} adds listener configuration to cache configuration. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg, boolean addToCfg) - throws IgniteCheckedException { - GridCacheContinuousQueryAdapter<K, V> qry = null; - - try { - A.notNull(lsnrCfg, "lsnrCfg"); - - Factory<CacheEntryListener<? super K, ? super V>> factory = lsnrCfg.getCacheEntryListenerFactory(); - - A.notNull(factory, "cacheEntryListenerFactory"); - - CacheEntryListener lsnr = factory.create(); - - A.notNull(lsnr, "lsnr"); - - IgniteCacheProxy<K, V> cache= cctx.kernalContext().cache().jcache(cctx.name()); - - EntryListenerCallback cb = new EntryListenerCallback(cache, lsnr); - - if (!(cb.create() || cb.update() || cb.remove() || cb.expire())) - throw new IllegalArgumentException("Listener must implement one of CacheEntryListener sub-interfaces."); - - qry = (GridCacheContinuousQueryAdapter<K, V>)cctx.cache().queries().createContinuousQuery(); - - CacheContinuousQuery<K, V> old = lsnrQrys.putIfAbsent(lsnrCfg, qry); - - if (old != null) - throw new IllegalArgumentException("Listener is already registered for configuration: " + lsnrCfg); - - qry.autoUnsubscribe(true); - - qry.bufferSize(1); - - qry.localCallback(cb); - - EntryListenerFilter<K, V> fltr = new EntryListenerFilter<>(cb.create(), - cb.update(), - cb.remove(), - cb.expire(), - lsnrCfg.getCacheEntryEventFilterFactory(), - cctx.kernalContext().grid(), - cctx.name()); - - qry.remoteFilter(fltr); - - qry.execute(null, false, true, lsnrCfg.isSynchronous(), lsnrCfg.isOldValueRequired()); - - if (addToCfg) - cctx.config().addCacheEntryListenerConfiguration(lsnrCfg); - } - catch (IgniteCheckedException e) { - lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to execute it. - - throw e; - } - } - - /** - * @param lsnrCfg Listener configuration. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) throws IgniteCheckedException { - A.notNull(lsnrCfg, "lsnrCfg"); - - CacheContinuousQuery<K, V> qry = lsnrQrys.remove(lsnrCfg); - - if (qry != null) { - cctx.config().removeCacheEntryListenerConfiguration(lsnrCfg); - - qry.close(); - } - } - - /** - * @param lsnrId Listener ID. - * @param lsnr Listener. - * @param internal Internal flag. - * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. - * @return Whether listener was actually registered. - */ - @SuppressWarnings("UnusedParameters") - boolean registerListener(UUID lsnrId, - GridCacheContinuousQueryListener<K, V> lsnr, - boolean internal, - boolean entryLsnr) { - ListenerInfo<K, V> info = new ListenerInfo<>(lsnr, entryLsnr); - - boolean added; - - if (internal) { - added = intLsnrs.putIfAbsent(lsnrId, info) == null; - - if (added) - intLsnrCnt.incrementAndGet(); - } - else { - added = lsnrs.putIfAbsent(lsnrId, info) == null; - - if (added) { - lsnrCnt.incrementAndGet(); - - lsnr.onExecution(); - } - } - - return added; - } - - /** - * @param internal Internal flag. - * @param id Listener ID. - */ - void unregisterListener(boolean internal, UUID id) { - ListenerInfo info; - - if (internal) { - if ((info = intLsnrs.remove(id)) != null) { - intLsnrCnt.decrementAndGet(); - - info.lsnr.onUnregister(); - } - } - else { - if ((info = lsnrs.remove(id)) != null) { - lsnrCnt.decrementAndGet(); - - info.lsnr.onUnregister(); - } - } - } - - /** - * Iterates through existing data. - * - * @param internal Internal flag. - * @param id Listener ID. - * @param keepPortable Keep portable flag. - */ - @SuppressWarnings("unchecked") - void iterate(boolean internal, UUID id, boolean keepPortable) { - ListenerInfo<K, V> info = internal ? intLsnrs.get(id) : lsnrs.get(id); - - assert info != null; - - GridCacheProjectionImpl<K, V> oldPrj = null; - - try { - if (keepPortable) { - oldPrj = cctx.projectionPerCall(); - - cctx.projectionPerCall(cctx.cache().<K, V>keepPortable0()); - } - - Set<CacheEntry<K, V>> entries; - - if (cctx.isReplicated()) - entries = internal ? cctx.cache().entrySetx() : - cctx.cache().entrySet(); - else - entries = internal ? cctx.cache().primaryEntrySetx() : - cctx.cache().primaryEntrySet(); - - boolean evt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - - for (CacheEntry<K, V> e : entries) { - GridCacheContinuousQueryEntry<K, V> qryEntry = new GridCacheContinuousQueryEntry<>(cctx, - e, - e.getKey(), - e.getValue(), - null, - null, - null, - CREATED); - - info.onIterate(qryEntry, evt); - } - - info.flushPending(); - } - finally { - if (keepPortable) - cctx.projectionPerCall(oldPrj); - } - } - - /** - * Listener info. - */ - private static class ListenerInfo<K, V> { - /** Listener. */ - private final GridCacheContinuousQueryListener<K, V> lsnr; - - /** Pending entries. */ - private Collection<PendingEntry<K, V>> pending; - - /** */ - private final boolean entryLsnr; - - /** - * @param lsnr Listener. - * @param entryLsnr {@code True} if listener created for {@link CacheEntryListener}. - */ - private ListenerInfo(GridCacheContinuousQueryListener<K, V> lsnr, boolean entryLsnr) { - this.lsnr = lsnr; - this.entryLsnr = entryLsnr; - - if (!entryLsnr) - pending = new LinkedList<>(); - } - - /** - * @param e Entry update callback. - * @param recordEvt Whether to record event. - */ - void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) { - boolean notifyLsnr = true; - - synchronized (this) { - if (pending != null) { - pending.add(new PendingEntry<>(e, recordEvt)); - - notifyLsnr = false; - } - } - - if (notifyLsnr) - lsnr.onEntryUpdate(e, recordEvt); - } - - /** - * @param e Entry iteration callback. - * @param recordEvt Whether to record event. - */ - void onIterate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) { - lsnr.onEntryUpdate(e, recordEvt); - } - - /** - * Flushes pending entries to listener. - */ - void flushPending() { - Collection<PendingEntry<K, V>> pending0; - - synchronized (this) { - pending0 = pending; - - pending = null; - } - - for (PendingEntry<K, V> e : pending0) - lsnr.onEntryUpdate(e.entry, e.recordEvt); - } - - /** - * @return {@code True} if listener created for {@link CacheEntryListener}. - */ - boolean entryListener() { - return entryLsnr; - } - } - - /** - * Pending entry. - */ - private static class PendingEntry<K, V> { - /** Entry. */ - private final GridCacheContinuousQueryEntry<K, V> entry; - - /** Whether to record event. */ - private final boolean recordEvt; - - /** - * @param entry Entry. - * @param recordEvt Whether to record event. - */ - private PendingEntry(GridCacheContinuousQueryEntry<K, V> entry, boolean recordEvt) { - this.entry = entry; - this.recordEvt = recordEvt; - } - } - - /** - * - */ - static class EntryListenerFilter<K1, V1> implements - IgnitePredicate<CacheContinuousQueryEntry<K1, V1>>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private boolean create; - - /** */ - private boolean update; - - /** */ - private boolean rmv; - - /** */ - private boolean expire; - - /** */ - private Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory; - - /** */ - private CacheEntryEventFilter fltr; - - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** */ - private IgniteCache cache; - - /** */ - private String cacheName; - - /** - * - */ - public EntryListenerFilter() { - // No-op. - } - - /** - * @param create {@code True} if listens for create events. - * @param update {@code True} if listens for create events. - * @param rmv {@code True} if listens for remove events. - * @param expire {@code True} if listens for expire events. - * @param fltrFactory Filter factory. - * @param ignite Ignite instance. - * @param cacheName Cache name. - */ - EntryListenerFilter( - boolean create, - boolean update, - boolean rmv, - boolean expire, - Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory, - Ignite ignite, - @Nullable String cacheName) { - this.create = create; - this.update = update; - this.rmv = rmv; - this.expire = expire; - this.fltrFactory = fltrFactory; - this.ignite = ignite; - this.cacheName = cacheName; - - if (fltrFactory != null) - fltr = fltrFactory.create(); - - cache = ignite.jcache(cacheName); - - assert cache != null : cacheName; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public boolean apply(CacheContinuousQueryEntry<K1, V1> entry) { - try { - EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType()); - - switch (evtType) { - case EXPIRED: - if (!expire) - return false; - - break; - - case REMOVED: - if (!rmv) - return false; - - break; - - case CREATED: - if (!create) - return false; - - break; - - case UPDATED: - if (!update) - return false; - - break; - - default: - assert false : evtType; - } - - if (fltr == null) - return true; - - if (cache == null) { - cache = ignite.jcache(cacheName); - - assert cache != null : cacheName; - } - - return fltr.evaluate(new CacheEntryEvent(cache, evtType, entry)); - } - catch (Exception e) { - LT.warn(ignite.log(), e, "Cache entry event filter error: " + e); - - return true; - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeBoolean(create); - - out.writeBoolean(update); - - out.writeBoolean(rmv); - - out.writeBoolean(expire); - - U.writeString(out, cacheName); - - out.writeObject(fltrFactory); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - create = in.readBoolean(); - - update = in.readBoolean(); - - rmv = in.readBoolean(); - - expire = in.readBoolean(); - - cacheName = U.readString(in); - - fltrFactory = (Factory<CacheEntryEventFilter<? super K1, ? super V1>>)in.readObject(); - - if (fltrFactory != null) - fltr = fltrFactory.create(); - } - } - - /** - * - */ - private class EntryListenerCallback implements - IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> { - /** */ - private final IgniteCacheProxy<K, V> cache; - - /** */ - private final CacheEntryCreatedListener createLsnr; - - /** */ - private final CacheEntryUpdatedListener updateLsnr; - - /** */ - private final CacheEntryRemovedListener rmvLsnr; - - /** */ - private final CacheEntryExpiredListener expireLsnr; - - /** - * @param cache Cache to be used as event source. - * @param lsnr Listener. - */ - EntryListenerCallback(IgniteCacheProxy<K, V> cache, CacheEntryListener lsnr) { - this.cache = cache; - - createLsnr = lsnr instanceof CacheEntryCreatedListener ? (CacheEntryCreatedListener)lsnr : null; - updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? (CacheEntryUpdatedListener)lsnr : null; - rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? (CacheEntryRemovedListener)lsnr : null; - expireLsnr = lsnr instanceof CacheEntryExpiredListener ? (CacheEntryExpiredListener)lsnr : null; - } - - /** - * @return {@code True} if listens for create event. - */ - boolean create() { - return createLsnr != null; - } - - /** - * @return {@code True} if listens for update event. - */ - boolean update() { - return updateLsnr != null; - } - - /** - * @return {@code True} if listens for remove event. - */ - boolean remove() { - return rmvLsnr != null; - } - - /** - * @return {@code True} if listens for expire event. - */ - boolean expire() { - return expireLsnr != null; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public boolean apply(UUID uuid, - Collection<CacheContinuousQueryEntry<K, V>> entries) { - for (CacheContinuousQueryEntry entry : entries) { - try { - EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType()); - - switch (evtType) { - case EXPIRED: { - assert expireLsnr != null; - - CacheEntryEvent evt0 = - new CacheEntryEvent(cache, EXPIRED, entry); - - expireLsnr.onExpired(Collections.singleton(evt0)); - - break; - } - - case REMOVED: { - assert rmvLsnr != null; - - CacheEntryEvent evt0 = - new CacheEntryEvent(cache, REMOVED, entry); - - rmvLsnr.onRemoved(Collections.singleton(evt0)); - - break; - } - - case UPDATED: { - assert updateLsnr != null; - - CacheEntryEvent evt0 = - new CacheEntryEvent(cache, UPDATED, entry); - - updateLsnr.onUpdated(Collections.singleton(evt0)); - - break; - } - - case CREATED: { - assert createLsnr != null; - - CacheEntryEvent evt0 = - new CacheEntryEvent(cache, CREATED, entry); - - createLsnr.onCreated(Collections.singleton(evt0)); - - break; - } - - default: - assert false : evtType; - } - } - catch (CacheEntryListenerException e) { - LT.warn(log, e, "Cache entry listener error: " + e); - } - } - - return true; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index a6f76f8..10f5d36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -28,7 +28,6 @@ import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.query.continuous.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; @@ -36,12 +35,15 @@ import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.apache.ignite.services.*; import org.apache.ignite.marshaller.*; +import org.apache.ignite.services.*; import org.apache.ignite.thread.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.*; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.*; import java.util.*; import java.util.concurrent.*; @@ -88,10 +90,10 @@ public class GridServiceProcessor extends GridProcessorAdapter { private GridLocalEventListener topLsnr = new TopologyListener(); /** Deployment listener. */ - private GridCacheContinuousQueryAdapter<Object, Object> cfgQry; + private QueryCursor<Cache.Entry<Object, Object>> cfgQryCur; /** Assignment listener. */ - private GridCacheContinuousQueryAdapter<Object, Object> assignQry; + private QueryCursor<Cache.Entry<Object, Object>> assignQryCur; /** * @param ctx Kernal context. @@ -128,17 +130,19 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (ctx.deploy().enabled()) ctx.cache().context().deploy().ignoreOwnership(true); - cfgQry = (GridCacheContinuousQueryAdapter<Object, Object>)cache.queries().createContinuousQuery(); + IgniteCache<Object, Object> jCache = ctx.cache().utilityJCache(); - cfgQry.localCallback(new DeploymentListener()); + ContinuousQuery<Object, Object> cfgQry = Query.continuous(); - cfgQry.execute(ctx.grid().forLocal(), true, false, false, true); + cfgQry.setLocalListener(new DeploymentListener()); - assignQry = (GridCacheContinuousQueryAdapter<Object, Object>)cache.queries().createContinuousQuery(); + cfgQryCur = jCache.localQuery(cfgQry); - assignQry.localCallback(new AssignmentListener()); + ContinuousQuery<Object, Object> assignQry = Query.continuous(); - assignQry.execute(ctx.grid().forLocal(), true, false, false, true); + assignQry.setLocalListener(new AssignmentListener()); + + assignQryCur = jCache.localQuery(assignQry); } finally { if (ctx.deploy().enabled()) @@ -171,21 +175,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { ctx.event().removeLocalEventListener(topLsnr); - try { - if (cfgQry != null) - cfgQry.close(); - } - catch (IgniteCheckedException e) { - log.error("Failed to unsubscribe service configuration notifications.", e); - } + if (cfgQryCur != null) + cfgQryCur.close(); - try { - if (assignQry != null) - assignQry.close(); - } - catch (IgniteCheckedException e) { - log.error("Failed to unsubscribe service assignment notifications.", e); - } + if (assignQryCur != null) + assignQryCur.close(); Collection<ServiceContextImpl> ctxs = new ArrayList<>(); @@ -916,18 +910,12 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** * Service deployment listener. */ - private class DeploymentListener - implements IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>> { - /** Serial version ID. */ - private static final long serialVersionUID = 0L; - + private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> { /** {@inheritDoc} */ - @Override public boolean apply( - UUID nodeId, - final Collection<CacheContinuousQueryEntry<Object, Object>> deps) { + @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) { depExe.submit(new BusyRunnable() { @Override public void run0() { - for (Entry<Object, Object> e : deps) { + for (CacheEntryEvent<?, ?> e : deps) { if (!(e.getKey() instanceof GridServiceDeploymentKey)) continue; @@ -989,8 +977,6 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } }); - - return true; } /** @@ -1194,18 +1180,12 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** * Assignment listener. */ - private class AssignmentListener - implements IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>> { - /** Serial version ID. */ - private static final long serialVersionUID = 0L; - + private class AssignmentListener implements CacheEntryUpdatedListener<Object, Object> { /** {@inheritDoc} */ - @Override public boolean apply( - UUID nodeId, - final Collection<CacheContinuousQueryEntry<Object, Object>> assignCol) { + @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> assignCol) throws CacheEntryListenerException { depExe.submit(new BusyRunnable() { @Override public void run0() { - for (Entry<Object, Object> e : assignCol) { + for (CacheEntryEvent<?, ?> e : assignCol) { if (!(e.getKey() instanceof GridServiceAssignmentsKey)) continue; @@ -1253,8 +1233,6 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } }); - - return true; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties index e6ece60..97f1c3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties @@ -694,8 +694,8 @@ org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponseEntry org.apache.ignite.internal.processors.cache.query.GridCacheQueryType org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery -org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryHandler -org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryHandler$DeployableObject +org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler +org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$DeployableObject org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcMetadataTask org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcMetadataTask$JdbcDriverMetadataJob org.apache.ignite.internal.processors.cache.query.jdbc.GridCacheQueryJdbcTask http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 7615f44..0a3c3d1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -25,10 +25,9 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.datastructures.*; import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.internal.processors.datastructures.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -42,7 +41,10 @@ import org.apache.ignite.testframework.junits.common.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.*; import javax.cache.configuration.*; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.*; import javax.cache.integration.*; import java.util.*; import java.util.concurrent.*; @@ -54,8 +56,8 @@ import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CachePreloadMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.*; import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.*; /** * Continuous queries tests. @@ -165,7 +167,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo for (int i = 0; i < gridCount(); i++) { GridContinuousProcessor proc = ((IgniteKernal)grid(i)).context().continuous(); - assertEquals(String.valueOf(i), 2, ((Map)U.field(proc, "locInfos")).size()); + assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "locInfos")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStartAck")).size()); @@ -173,7 +175,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStopAck")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "pending")).size()); - GridCacheContinuousQueryManager mgr = + CacheContinuousQueryManager mgr = ((IgniteKernal)grid(i)).context().cache().internalCache().context().continuousQueries(); assertEquals(0, ((Map)U.field(mgr, "lsnrs")).size()); @@ -201,14 +203,14 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo * @throws Exception If failed. */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public void testApi() throws Exception { - final CacheContinuousQuery<Object, Object> q = grid(0).cache(null).queries().createContinuousQuery(); + public void testIllegalArguments() throws Exception { + final ContinuousQuery<Object, Object> q = Query.continuous(); GridTestUtils.assertThrows( log, new Callable<Object>() { @Override public Object call() throws Exception { - q.bufferSize(-1); + q.setBufferSize(-1); return null; } @@ -217,24 +219,20 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo null ); - GridTestUtils.assertThrows( - log, - new Callable<Object>() { + GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - q.bufferSize(0); + q.setBufferSize(0); return null; } - }, - IllegalArgumentException.class, - null + }, IllegalArgumentException.class, null ); GridTestUtils.assertThrows( log, new Callable<Object>() { @Override public Object call() throws Exception { - q.timeInterval(-1); + q.setTimeInterval(-1); return null; } @@ -242,128 +240,24 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo IllegalArgumentException.class, null ); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.execute(); - - return null; - } - }, - IllegalStateException.class, - null - ); - - q.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>>() { - @Override public boolean apply(UUID uuid, Collection<CacheContinuousQueryEntry<Object, Object>> entries) { - return true; - } - }); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.execute(grid(0).forPredicate(F.<ClusterNode>alwaysFalse())); - - return null; - } - }, - ClusterTopologyCheckedException.class, - null - ); - - q.execute(); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>>() { - @Override public boolean apply(UUID uuid, Collection<CacheContinuousQueryEntry<Object, Object>> entries) { - return false; - } - }); - - return null; - } - }, - IllegalStateException.class, - null - ); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.remoteFilter(null); - - return null; - } - }, - IllegalStateException.class, - null - ); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.bufferSize(10); - - return null; - } - }, - IllegalStateException.class, - null - ); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.timeInterval(10); - - return null; - } - }, - IllegalStateException.class, - null - ); - - q.close(); - - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - q.execute(); - - return null; - } - }, - IllegalStateException.class, - null - ); } /** * @throws Exception If failed. */ public void testAllEntries() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, List<Integer>> map = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(5); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { + U.debug(">>>>>>>>>>>>>>> EVT: " + e); + synchronized (map) { List<Integer> vals = map.get(e.getKey()); @@ -378,21 +272,17 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo latch.countDown(); } - - return true; } }); - try { - qry.execute(); - - cache.putx(1, 1); - cache.putx(2, 2); - cache.putx(3, 3); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + cache.put(1, 1); + cache.put(2, 2); + cache.put(3, 3); - cache.removex(2); + cache.remove(2); - cache.putx(1, 10); + cache.put(1, 10); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -418,25 +308,22 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(1, vals.size()); assertEquals(3, (int)vals.get(0)); } - finally { - qry.close(); - } } /** * @throws Exception If failed. */ public void testEntriesByFilter() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, List<Integer>> map = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(4); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { synchronized (map) { List<Integer> vals = map.get(e.getKey()); @@ -451,46 +338,26 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo latch.countDown(); } - - return true; - } - }); - - qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) { - return e.getKey() > 2; } }); - // Second query to wait for notifications about all updates. - CacheContinuousQuery<Integer, Integer> qry0 = cache.queries().createContinuousQuery(); - - final CountDownLatch latch0 = new CountDownLatch(8); - - qry0.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID uuid, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> ignored : entries) - latch0.countDown(); - - return true; + qry.setRemoteFilter(new CacheEntryEventFilter<Integer, Integer>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + return evt.getKey() > 2; } }); - try { - qry.execute(); - qry0.execute(); - - cache.putx(1, 1); - cache.putx(2, 2); - cache.putx(3, 3); - cache.putx(4, 4); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + cache.put(1, 1); + cache.put(2, 2); + cache.put(3, 3); + cache.put(4, 4); - cache.removex(2); - cache.removex(3); + cache.remove(2); + cache.remove(3); - cache.putx(1, 10); - cache.putx(4, 40); + cache.put(1, 10); + cache.put(4, 40); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -509,91 +376,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(2, vals.size()); assertEquals(4, (int)vals.get(0)); assertEquals(40, (int)vals.get(1)); - - assert latch0.await(2, SECONDS); - } - finally { - qry.close(); - qry0.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testProjection() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - // Queries for non-partitioned caches always run locally. - if (cache.configuration().getCacheMode() != PARTITIONED) - return; - - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); - - final Map<Integer, List<Integer>> map = new HashMap<>(); - final CountDownLatch latch = new CountDownLatch(1); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - synchronized (map) { - List<Integer> vals = map.get(e.getKey()); - - if (vals == null) { - vals = new ArrayList<>(); - - map.put(e.getKey(), vals); - } - - vals.add(e.getValue()); - } - - latch.countDown(); - } - - return true; - } - }); - - try { - qry.execute(grid(0).forRemotes()); - - int locKey = -1; - int rmtKey = -1; - - int key = 0; - - while (true) { - ClusterNode n = grid(0).mapKeyToNode(null, key); - - assert n != null; - - if (n.equals(grid(0).localNode())) - locKey = key; - else - rmtKey = key; - - key++; - - if (locKey >= 0 && rmtKey >= 0) - break; - } - - cache.putx(locKey, 1); - cache.putx(rmtKey, 2); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals(1, map.size()); - - List<Integer> vals = map.get(rmtKey); - - assertNotNull(vals); - assertEquals(1, vals.size()); - assertEquals(2, (int)vals.get(0)); - } - finally { - qry.close(); } } @@ -601,20 +383,16 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo * @throws Exception If failed. */ public void testLocalNodeOnly() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - // Queries for non-partitioned caches always run locally. - if (cache.configuration().getCacheMode() != PARTITIONED) - return; - - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, List<Integer>> map = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(1); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { synchronized (map) { List<Integer> vals = map.get(e.getKey()); @@ -629,14 +407,10 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo latch.countDown(); } - - return true; } }); - try { - qry.execute(grid(0).forLocal()); - + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.localQuery(qry)) { int locKey = -1; int rmtKey = -1; @@ -658,8 +432,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo break; } - cache.putx(locKey, 1); - cache.putx(rmtKey, 2); + cache.put(locKey, 1); + cache.put(rmtKey, 2); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -671,103 +445,25 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(1, vals.size()); assertEquals(1, (int)vals.get(0)); } - finally { - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testStopByCallback() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); - - final Map<Integer, List<Integer>> map = new HashMap<>(); - final CountDownLatch latch = new CountDownLatch(1); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - synchronized (map) { - List<Integer> vals = map.get(e.getKey()); - - if (vals == null) { - vals = new ArrayList<>(); - - map.put(e.getKey(), vals); - } - - vals.add(e.getValue()); - } - - latch.countDown(); - } - - return false; - } - }); - - // Second query to wait for notifications about all updates. - CacheContinuousQuery<Integer, Integer> qry0 = cache.queries().createContinuousQuery(); - - final CountDownLatch latch0 = new CountDownLatch(3); - - qry0.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> ignored : entries) - latch0.countDown(); - - return true; - } - }); - - try { - qry.execute(); - qry0.execute(); - - cache.putx(1, 1); - cache.putx(2, 2); - cache.putx(3, 3); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals(1, map.size()); - - List<Integer> list = F.first(map.values()); - - assert list != null; - - assertEquals(1, list.size()); - - assert latch0.await(2, SECONDS); - } - finally { - qry.close(); - qry0.close(); - } } /** * @throws Exception If failed. */ public void testBuffering() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - // Buffering make sense only for remote nodes, so test only for partitioned cache. - if (cache.configuration().getCacheMode() != PARTITIONED) + if (grid(0).cache(null).configuration().getCacheMode() != PARTITIONED) return; - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, List<Integer>> map = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(5); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { synchronized (map) { List<Integer> vals = map.get(e.getKey()); @@ -782,18 +478,14 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo latch.countDown(); } - - return true; } }); - qry.bufferSize(5); + qry.setBufferSize(5); - try { + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { ClusterNode node = F.first(grid(0).forRemotes().nodes()); - qry.execute(grid(0).forNode(node)); - Collection<Integer> keys = new HashSet<>(); int key = 0; @@ -815,12 +507,12 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo Iterator<Integer> it = keys.iterator(); for (int i = 0; i < 4; i++) - cache.putx(it.next(), 0); + cache.put(it.next(), 0); assert !latch.await(2, SECONDS); for (int i = 0; i < 2; i++) - cache.putx(it.next(), 0); + cache.put(it.next(), 0); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -838,29 +530,25 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(0, (int)vals.get(0)); } } - finally { - qry.close(); - } } /** * @throws Exception If failed. */ public void testTimeInterval() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - // Buffering make sense only for remote nodes, so test only for partitioned cache. - if (cache.configuration().getCacheMode() != PARTITIONED) + if (cache.getConfiguration(CacheConfiguration.class).getCacheMode() != PARTITIONED) return; - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, List<Integer>> map = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(5); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { synchronized (map) { List<Integer> vals = map.get(e.getKey()); @@ -875,15 +563,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo latch.countDown(); } - - return true; } }); - qry.bufferSize(10); - qry.timeInterval(3000); + qry.setBufferSize(10); + qry.setTimeInterval(3000); - try { + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { ClusterNode node = F.first(grid(0).forRemotes().nodes()); Collection<Integer> keys = new HashSet<>(); @@ -905,9 +591,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } for (Integer k : keys) - cache.putx(k, 0); - - qry.execute(grid(0).forNode(node)); + cache.put(k, 0); assert !latch.await(2, SECONDS); assert latch.await(1000 + LATCH_TIMEOUT, MILLISECONDS); @@ -926,125 +610,117 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(0, (int)vals.get(0)); } } - finally { - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testIteration() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); - - final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(10); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - map.put(e.getKey(), e.getValue()); - - latch.countDown(); - } - - return true; - } - }); - - try { - for (int i = 0; i < 10; i++) - cache.putx(i, i); - - qry.execute(); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals(10, map.size()); - - for (int i = 0; i < 10; i++) - assertEquals(i, (int)map.get(i)); - } - finally { - qry.close(); - } } - /** - * @throws Exception If failed. - */ - public void testIterationAndUpdates() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); - - final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(12); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - map.put(e.getKey(), e.getValue()); - - latch.countDown(); - } - - return true; - } - }); - - try { - for (int i = 0; i < 10; i++) - cache.putx(i, i); - - qry.execute(); - - cache.putx(10, 10); - cache.putx(11, 11); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : latch.getCount(); - - assertEquals(12, map.size()); - - for (int i = 0; i < 12; i++) - assertEquals(i, (int)map.get(i)); - } - finally { - qry.close(); - } - } +// /** +// * @throws Exception If failed. +// */ +// public void testIteration() throws Exception { +// GridCache<Integer, Integer> cache = grid(0).cache(null); +// +// CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); +// +// final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); +// final CountDownLatch latch = new CountDownLatch(10); +// +// qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { +// @Override public boolean apply(UUID nodeId, +// Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { +// for (Map.Entry<Integer, Integer> e : entries) { +// map.put(e.getKey(), e.getValue()); +// +// latch.countDown(); +// } +// +// return true; +// } +// }); +// +// try { +// for (int i = 0; i < 10; i++) +// cache.putx(i, i); +// +// qry.execute(); +// +// assert latch.await(LATCH_TIMEOUT, MILLISECONDS); +// +// assertEquals(10, map.size()); +// +// for (int i = 0; i < 10; i++) +// assertEquals(i, (int)map.get(i)); +// } +// finally { +// qry.close(); +// } +// } +// +// /** +// * @throws Exception If failed. +// */ +// public void testIterationAndUpdates() throws Exception { +// GridCache<Integer, Integer> cache = grid(0).cache(null); +// +// CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); +// +// final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); +// final CountDownLatch latch = new CountDownLatch(12); +// +// qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { +// @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { +// for (Map.Entry<Integer, Integer> e : entries) { +// map.put(e.getKey(), e.getValue()); +// +// latch.countDown(); +// } +// +// return true; +// } +// }); +// +// try { +// for (int i = 0; i < 10; i++) +// cache.putx(i, i); +// +// qry.execute(); +// +// cache.putx(10, 10); +// cache.putx(11, 11); +// +// assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : latch.getCount(); +// +// assertEquals(12, map.size()); +// +// for (int i = 0; i < 12; i++) +// assertEquals(i, (int)map.get(i)); +// } +// finally { +// qry.close(); +// } +// } /** * @throws Exception If failed. */ public void testLoadCache() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); final CountDownLatch latch = new CountDownLatch(10); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { map.put(e.getKey(), e.getValue()); latch.countDown(); } - - return true; } }); - try { - qry.execute(); - - for (int i = 0; i < gridCount(); i++) - grid(i).cache(null).loadCache(null, 0); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + cache.loadCache(null, 0); assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + latch.getCount(); @@ -1053,170 +729,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo for (int i = 0; i < 10; i++) assertEquals(i, (int)map.get(i)); } - finally { - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTypedProjection() throws Exception { - GridCache<Object, Object> cache = grid(0).cache(null); - - CacheContinuousQuery<Integer, Integer> qry = - cache.projection(Integer.class, Integer.class).queries().createContinuousQuery(); - - final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(2); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - map.put(e.getKey(), e.getValue()); - - latch.countDown(); - } - - return true; - } - }); - - qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) { - return true; - } - }); - - try { - qry.execute(); - - cache.putx(1, 1); - cache.putx("a", "a"); - cache.putx(2, 2); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals(2, map.size()); - - assertEquals(1, (int)map.get(1)); - assertEquals(2, (int)map.get(2)); - } - finally { - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testEntryFilterProjection() throws Exception { - CacheProjection<Integer, Integer> cache = grid(0).cache(null); - - CacheContinuousQuery<Integer, Integer> qry = cache.projection( - new P1<CacheEntry<Integer, Integer>>() { - @Override public boolean apply(CacheEntry<Integer, Integer> e) { - Integer i = e.peek(); - - return i != null && i > 10; - } - }).queries().createContinuousQuery(); - - final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(2); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (CacheContinuousQueryEntry<Integer, Integer> e : entries) { - info("Query entry: " + e); - - map.put(e.getKey(), e.getValue()); - - latch.countDown(); - } - - return true; - } - }); - - qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) { - return true; - } - }); - - try { - qry.execute(); - - cache.putx(1, 1); - cache.putx(11, 11); - cache.putx(2, 2); - cache.putx(22, 22); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals("Invalid number of entries notified: " + map, 2, map.size()); - - assertEquals(11, (int)map.get(11)); - assertEquals(22, (int)map.get(22)); - } - finally { - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testKeyValueFilterProjection() throws Exception { - CacheProjection<Integer, Integer> cache = grid(0).cache(null); - - CacheContinuousQuery<Integer, Integer> qry = cache.projection( - new P2<Integer, Integer>() { - @Override public boolean apply(Integer key, Integer val) { - return val > 10; - } - }).queries().createContinuousQuery(); - - final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(2); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - map.put(e.getKey(), e.getValue()); - - latch.countDown(); - } - - return true; - } - }); - - qry.remoteFilter(new P1<CacheContinuousQueryEntry<Integer, Integer>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) { - return true; - } - }); - - try { - qry.execute(); - - cache.putx(1, 1); - cache.putx(11, 11); - cache.putx(2, 2); - cache.putx(22, 22); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - - assertEquals(2, map.size()); - - assertEquals(11, (int)map.get(11)); - assertEquals(22, (int)map.get(22)); - } - finally { - qry.close(); - } } /** @@ -1226,33 +738,28 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo if (atomicityMode() == ATOMIC) return; - GridCache<Object, Object> cache = grid(0).cache(null); + IgniteCache<Object, Object> cache = grid(0).jcache(null); - CacheContinuousQuery<Object, Object> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Object, Object> qry = Query.continuous(); final Map<Object, Object> map = new ConcurrentHashMap8<>(); final CountDownLatch latch = new CountDownLatch(2); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>>() { - @Override public boolean apply(UUID nodeId, - Collection<CacheContinuousQueryEntry<Object, Object>> entries) { - for (Map.Entry<Object, Object> e : entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> e : evts) { map.put(e.getKey(), e.getValue()); latch.countDown(); } - - return true; } }); - try { - qry.execute(); - - cache.putx(new GridCacheInternalKeyImpl("test"), 1); + try (QueryCursor<Cache.Entry<Object, Object>> ignored = cache.query(qry)) { + cache.put(new GridCacheInternalKeyImpl("test"), 1); - cache.putx(1, 1); - cache.putx(2, 2); + cache.put(1, 1); + cache.put(2, 2); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -1261,92 +768,41 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(1, (int)map.get(1)); assertEquals(2, (int)map.get(2)); } - finally { - qry.close(); - } - } - - /** - * @throws Exception If filter. - */ - public void testUpdateInFilter() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - cache.putx(1, 1); - - CacheProjection<Integer, Integer> prj = cache.projection(new P1<CacheEntry<Integer, Integer>>() { - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - @Override public boolean apply(final CacheEntry<Integer, Integer> e) { - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - e.set(1000); - - return null; - } - }, - CacheFlagException.class, - null - ); - - return true; - } - }); - - CacheContinuousQuery<Integer, Integer> qry = prj.queries().createContinuousQuery(); - - final CountDownLatch latch = new CountDownLatch(1); - - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - latch.countDown(); - - return true; - } - }); - - try { - qry.execute(); - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS); - } - finally { - qry.close(); - } } /** * @throws Exception If failed. */ public void testNodeJoin() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); - final Collection<Map.Entry<Integer, Integer>> all = new ConcurrentLinkedDeque8<>(); + final Collection<CacheEntryEvent<? extends Integer, ? extends Integer>> all = new ConcurrentLinkedDeque8<>(); final CountDownLatch latch = new CountDownLatch(2); - qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - assertEquals(1, entries.size()); + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + int size = 0; - all.addAll(entries); + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + all.add(evt); - latch.countDown(); + size++; + } - return true; + assertEquals(1, size); + + latch.countDown(); } }); - qry.execute(); - - cache.putx(1, 1); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + cache.put(1, 1); - try { startGrid("anotherGrid"); - cache.putx(2, 2); + cache.put(2, 2); assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : all; @@ -1354,8 +810,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } finally { stopGrid("anotherGrid"); - - qry.close(); } } @@ -1363,9 +817,9 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo * @throws Exception If failed. */ public void testCallbackForPreload() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - if (cache.configuration().getCacheMode() == LOCAL) + if (cache.getConfiguration(CacheConfiguration.class).getCacheMode() == LOCAL) return; Map<Integer, Integer> map = new HashMap<>(); @@ -1377,37 +831,28 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo cache.putAll(map); - Ignite ignite = startGrid("anotherGrid"); - - try { - cache = ignite.cache(null); - - CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery(); - - final CountDownLatch latch = new CountDownLatch(1); - final Collection<Integer> keys = new GridConcurrentHashSet<>(); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); - qry.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID nodeId, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - for (Map.Entry<Integer, Integer> e : entries) { - keys.add(e.getKey()); + final CountDownLatch latch = new CountDownLatch(1); + final Collection<Integer> keys = new GridConcurrentHashSet<>(); - if (keys.size() >= keysCnt) - latch.countDown(); - } + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + keys.add(evt.getKey()); - return true; + if (keys.size() >= keysCnt) + latch.countDown(); } - }); + } + }); - qry.execute(); + Ignite ignite = startGrid("anotherGrid"); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = ignite.<Integer, Integer>jcache(null).localQuery(qry)) { assert latch.await(LATCH_TIMEOUT, MILLISECONDS); assertEquals(keysCnt, keys.size()); - - qry.close(); } finally { stopGrid("anotherGrid"); @@ -1475,26 +920,25 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo grid(i).events().localListen(execLsnr, EVT_CACHE_QUERY_EXECUTED); } - GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - try (CacheContinuousQuery<Integer, Integer> qry = cache.queries().createContinuousQuery()) { - qry.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() { - @Override public boolean apply(UUID uuid, - Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) { - return true; - } - }); + ContinuousQuery<Integer, Integer> qry = Query.continuous(); - qry.remoteFilter(new IgnitePredicate<CacheContinuousQueryEntry<Integer, Integer>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, Integer> e) { - return e.getValue() >= 50; - } - }); + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + // No-op. + } + }); - qry.execute(); + qry.setRemoteFilter(new CacheEntryEventFilter<Integer, Integer>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + return evt.getValue() >= 50; + } + }); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { for (int i = 0; i < 100; i++) - cache.putx(i, i); + cache.put(i, i); assert latch.await(LATCH_TIMEOUT, MILLISECONDS); assert execLatch.await(LATCH_TIMEOUT, MILLISECONDS); @@ -1515,8 +959,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo */ private static class TestStore extends CacheStoreAdapter<Object, Object> { /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, - Object... args) { + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) { for (int i = 0; i < 10; i++) clo.apply(i, i); }