http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java new file mode 100644 index 0000000..69e9523 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +/** + * Continuous query listener. + */ +interface CacheContinuousQueryListener<K, V> { + /** + * Query execution callback. + */ + public void onExecution(); + + /** + * Entry update callback. + * + * @param e Entry. + * @param recordIgniteEvt Whether to record event. + */ + public void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt); + + /** + * Listener unregistered callback. + */ + public void onUnregister(); +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java new file mode 100644 index 0000000..bda52b9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -0,0 +1,619 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import javax.cache.configuration.*; +import javax.cache.event.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static javax.cache.event.EventType.*; +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.GridTopic.*; + +/** + * Continuous queries manager. + */ +public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> { + /** Ordered topic prefix. */ + private String topicPrefix; + + /** Listeners. */ + private final ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrs = new ConcurrentHashMap8<>(); + + /** Listeners count. */ + private final AtomicInteger lsnrCnt = new AtomicInteger(); + + /** Internal entries listeners. */ + private final ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> intLsnrs = new ConcurrentHashMap8<>(); + + /** Internal listeners count. */ + private final AtomicInteger intLsnrCnt = new AtomicInteger(); + + /** Query sequence number for message topic. */ + private final AtomicLong seq = new AtomicLong(); + + /** Continues queries created for cache event listeners. */ + private final ConcurrentMap<CacheEntryListenerConfiguration, CacheContinuousQuery<K, V>> lsnrQrys = + new ConcurrentHashMap8<>(); + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + // Append cache name to the topic. + topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected void onKernalStart0() throws IgniteCheckedException { + Iterable<CacheEntryListenerConfiguration<K, V>> lsnrCfgs = cctx.config().getCacheEntryListenerConfigurations(); + + if (lsnrCfgs != null) { + for (CacheEntryListenerConfiguration<K, V> cfg : lsnrCfgs) + registerCacheEntryListener(cfg, false); + } + } + + /** {@inheritDoc} */ + @Override protected void onKernalStop0(boolean cancel) { + super.onKernalStop0(cancel); + + for (CacheEntryListenerConfiguration lsnrCfg : lsnrQrys.keySet()) { + try { + deregisterCacheEntryListener(lsnrCfg); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to remove cache entry listener: " + e); + } + } + } + + /** + * @return New topic. + */ + public Object topic() { + return TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()); + } + + /** + * @param e Cache entry. + * @param key Key. + * @param newVal New value. + * @param newBytes New value bytes. + * @param oldVal Old value. + * @param oldBytes Old value bytes. + * @param preload {@code True} if entry is updated during preloading. + * @throws IgniteCheckedException In case of error. + */ + public void onEntryUpdate(GridCacheEntryEx<K, V> e, + K key, + @Nullable V newVal, + @Nullable GridCacheValueBytes newBytes, + V oldVal, + @Nullable GridCacheValueBytes oldBytes, + boolean preload) throws IgniteCheckedException { + assert e != null; + assert key != null; + + ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrCol; + + if (e.isInternal()) + lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null; + else + lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null; + + if (F.isEmpty(lsnrCol)) + return; + + oldVal = cctx.unwrapTemporary(oldVal); + + EventType evtType = newVal == null ? REMOVED : + ((oldVal != null || (oldBytes != null && !oldBytes.isNull()) ? UPDATED : CREATED)); + + CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key, newVal, newBytes, oldVal, oldBytes); + + e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader()); + + CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>( + cctx.kernalContext().grid().jcache(cctx.name()), evtType, e0); + + boolean primary = e.wrap(false).primary(); + boolean recordIgniteEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + + for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) { +// if (preload && lsnr.entryListener()) +// continue; + + lsnr.onEntryUpdate(evt, primary, recordIgniteEvt); + } + } + + /** + * @param e Entry. + * @param key Key. + * @param oldVal Old value. + * @param oldBytes Old value bytes. + */ + public void onEntryExpired(GridCacheEntryEx<K, V> e, + K key, + V oldVal, + @Nullable GridCacheValueBytes oldBytes) { + if (e.isInternal()) + return; + + ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrCol = lsnrs; + + if (F.isEmpty(lsnrCol)) + return; + + if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, -1)) { + CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key, null, null, oldVal, oldBytes); + + CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>( + cctx.kernalContext().grid().jcache(cctx.name()), EXPIRED, e0); + + for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) { +// if (!lsnr.entryListener()) +// continue; + + lsnr.onEntryUpdate(evt, true, false); + } + } + } + + /** + * @param lsnrCfg Listener configuration. + * @param addToCfg If {@code true} adds listener configuration to cache configuration. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg, boolean addToCfg) + throws IgniteCheckedException { +// GridCacheContinuousQueryAdapter<K, V> qry = null; +// +// try { +// A.notNull(lsnrCfg, "lsnrCfg"); +// +// Factory<CacheEntryListener<? super K, ? super V>> factory = lsnrCfg.getCacheEntryListenerFactory(); +// +// A.notNull(factory, "cacheEntryListenerFactory"); +// +// CacheEntryListener lsnr = factory.create(); +// +// A.notNull(lsnr, "lsnr"); +// +// IgniteCacheProxy<K, V> cache= cctx.kernalContext().cache().jcache(cctx.name()); +// +// EntryListenerCallback cb = new EntryListenerCallback(cache, lsnr); +// +// if (!(cb.create() || cb.update() || cb.remove() || cb.expire())) +// throw new IllegalArgumentException("Listener must implement one of CacheEntryListener sub-interfaces."); +// +// qry = (GridCacheContinuousQueryAdapter<K, V>)cctx.cache().queries().createContinuousQuery(); +// +// CacheContinuousQuery<K, V> old = lsnrQrys.putIfAbsent(lsnrCfg, qry); +// +// if (old != null) +// throw new IllegalArgumentException("Listener is already registered for configuration: " + lsnrCfg); +// +// qry.autoUnsubscribe(true); +// +// qry.bufferSize(1); +// +//// qry.localCallback(cb); +// +// EntryListenerFilter<K, V> fltr = new EntryListenerFilter<>(cb.create(), +// cb.update(), +// cb.remove(), +// cb.expire(), +// lsnrCfg.getCacheEntryEventFilterFactory(), +// cctx.kernalContext().grid(), +// cctx.name()); +// +//// qry.remoteFilter(fltr); +// +// qry.execute(null, false, true, lsnrCfg.isSynchronous(), lsnrCfg.isOldValueRequired()); +// +// if (addToCfg) +// cctx.config().addCacheEntryListenerConfiguration(lsnrCfg); +// } +// catch (IgniteCheckedException e) { +// lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to execute it. +// +// throw e; +// } + } + + /** + * @param lsnrCfg Listener configuration. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) throws IgniteCheckedException { + A.notNull(lsnrCfg, "lsnrCfg"); + + CacheContinuousQuery<K, V> qry = lsnrQrys.remove(lsnrCfg); + + if (qry != null) { + cctx.config().removeCacheEntryListenerConfiguration(lsnrCfg); + + qry.close(); + } + } + + /** + * @param lsnrId Listener ID. + * @param lsnr Listener. + * @param internal Internal flag. + * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. + * @return Whether listener was actually registered. + */ + boolean registerListener(UUID lsnrId, + CacheContinuousQueryListener<K, V> lsnr, + boolean internal, + boolean entryLsnr) { + boolean added; + + if (internal) { + added = intLsnrs.putIfAbsent(lsnrId, lsnr) == null; + + if (added) + intLsnrCnt.incrementAndGet(); + } + else { + added = lsnrs.putIfAbsent(lsnrId, lsnr) == null; + + if (added) { + lsnrCnt.incrementAndGet(); + + lsnr.onExecution(); + } + } + + return added; + } + + /** + * @param internal Internal flag. + * @param id Listener ID. + */ + void unregisterListener(boolean internal, UUID id) { + CacheContinuousQueryListener<K, V> lsnr; + + if (internal) { + if ((lsnr = intLsnrs.remove(id)) != null) { + intLsnrCnt.decrementAndGet(); + + lsnr.onUnregister(); + } + } + else { + if ((lsnr = lsnrs.remove(id)) != null) { + lsnrCnt.decrementAndGet(); + + lsnr.onUnregister(); + } + } + } + + /** + * + */ + static class EntryListenerFilter<K1, V1> implements + IgnitePredicate<CacheContinuousQueryEntry<K1, V1>>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private boolean create; + + /** */ + private boolean update; + + /** */ + private boolean rmv; + + /** */ + private boolean expire; + + /** */ + private Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory; + + /** */ + private CacheEntryEventFilter fltr; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private IgniteCache cache; + + /** */ + private String cacheName; + + /** + * + */ + public EntryListenerFilter() { + // No-op. + } + + /** + * @param create {@code True} if listens for create events. + * @param update {@code True} if listens for create events. + * @param rmv {@code True} if listens for remove events. + * @param expire {@code True} if listens for expire events. + * @param fltrFactory Filter factory. + * @param ignite Ignite instance. + * @param cacheName Cache name. + */ + EntryListenerFilter( + boolean create, + boolean update, + boolean rmv, + boolean expire, + Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory, + Ignite ignite, + @Nullable String cacheName) { + this.create = create; + this.update = update; + this.rmv = rmv; + this.expire = expire; + this.fltrFactory = fltrFactory; + this.ignite = ignite; + this.cacheName = cacheName; + + if (fltrFactory != null) + fltr = fltrFactory.create(); + + cache = ignite.jcache(cacheName); + + assert cache != null : cacheName; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean apply(CacheContinuousQueryEntry<K1, V1> entry) { + return false; + +// try { +// EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType()); +// +// switch (evtType) { +// case EXPIRED: +// if (!expire) +// return false; +// +// break; +// +// case REMOVED: +// if (!rmv) +// return false; +// +// break; +// +// case CREATED: +// if (!create) +// return false; +// +// break; +// +// case UPDATED: +// if (!update) +// return false; +// +// break; +// +// default: +// assert false : evtType; +// } +// +// if (fltr == null) +// return true; +// +// if (cache == null) { +// cache = ignite.jcache(cacheName); +// +// assert cache != null : cacheName; +// } +// +// return fltr.evaluate(new CacheEntryEvent(cache, evtType, entry)); +// } +// catch (Exception e) { +// LT.warn(ignite.log(), e, "Cache entry event filter error: " + e); +// +// return true; +// } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeBoolean(create); + + out.writeBoolean(update); + + out.writeBoolean(rmv); + + out.writeBoolean(expire); + + U.writeString(out, cacheName); + + out.writeObject(fltrFactory); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + create = in.readBoolean(); + + update = in.readBoolean(); + + rmv = in.readBoolean(); + + expire = in.readBoolean(); + + cacheName = U.readString(in); + + fltrFactory = (Factory<CacheEntryEventFilter<? super K1, ? super V1>>)in.readObject(); + + if (fltrFactory != null) + fltr = fltrFactory.create(); + } + } + + /** + * + */ + private class EntryListenerCallback implements + IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> { + /** */ + private final IgniteCacheProxy<K, V> cache; + + /** */ + private final CacheEntryCreatedListener createLsnr; + + /** */ + private final CacheEntryUpdatedListener updateLsnr; + + /** */ + private final CacheEntryRemovedListener rmvLsnr; + + /** */ + private final CacheEntryExpiredListener expireLsnr; + + /** + * @param cache Cache to be used as event source. + * @param lsnr Listener. + */ + EntryListenerCallback(IgniteCacheProxy<K, V> cache, CacheEntryListener lsnr) { + this.cache = cache; + + createLsnr = lsnr instanceof CacheEntryCreatedListener ? (CacheEntryCreatedListener)lsnr : null; + updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? (CacheEntryUpdatedListener)lsnr : null; + rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? (CacheEntryRemovedListener)lsnr : null; + expireLsnr = lsnr instanceof CacheEntryExpiredListener ? (CacheEntryExpiredListener)lsnr : null; + } + + /** + * @return {@code True} if listens for create event. + */ + boolean create() { + return createLsnr != null; + } + + /** + * @return {@code True} if listens for update event. + */ + boolean update() { + return updateLsnr != null; + } + + /** + * @return {@code True} if listens for remove event. + */ + boolean remove() { + return rmvLsnr != null; + } + + /** + * @return {@code True} if listens for expire event. + */ + boolean expire() { + return expireLsnr != null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean apply(UUID uuid, + Collection<CacheContinuousQueryEntry<K, V>> entries) { +// for (CacheContinuousQueryEntry entry : entries) { +// try { +// EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType()); +// +// switch (evtType) { +// case EXPIRED: { +// assert expireLsnr != null; +// +// CacheEntryEvent evt0 = +// new CacheEntryEvent(cache, EXPIRED, entry); +// +// expireLsnr.onExpired(Collections.singleton(evt0)); +// +// break; +// } +// +// case REMOVED: { +// assert rmvLsnr != null; +// +// CacheEntryEvent evt0 = +// new CacheEntryEvent(cache, REMOVED, entry); +// +// rmvLsnr.onRemoved(Collections.singleton(evt0)); +// +// break; +// } +// +// case UPDATED: { +// assert updateLsnr != null; +// +// CacheEntryEvent evt0 = +// new CacheEntryEvent(cache, UPDATED, entry); +// +// updateLsnr.onUpdated(Collections.singleton(evt0)); +// +// break; +// } +// +// case CREATED: { +// assert createLsnr != null; +// +// CacheEntryEvent evt0 = +// new CacheEntryEvent(cache, CREATED, entry); +// +// createLsnr.onCreated(Collections.singleton(evt0)); +// +// break; +// } +// +// default: +// assert false : evtType; +// } +// } +// catch (CacheEntryListenerException e) { +// LT.warn(log, e, "Cache entry listener error: " + e); +// } +// } + + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java deleted file mode 100644 index acd96ed..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.cluster.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.continuous.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.security.*; -import org.jetbrains.annotations.*; - -import javax.cache.event.*; -import java.util.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.cache.CacheDistributionMode.*; - -/** - * Continuous query implementation. - */ -public class GridCacheContinuousQueryAdapter<K, V> implements CacheContinuousQuery<K, V> { - /** Guard. */ - private final GridBusyLock guard = new GridBusyLock(); - - /** Close lock. */ - private final Lock closeLock = new ReentrantLock(); - - /** Cache context. */ - private final GridCacheContext<K, V> ctx; - - /** Topic for ordered messages. */ - private final Object topic; - - /** Projection predicate */ - private final IgnitePredicate<CacheEntry<K, V>> prjPred; - - /** Keep portable flag. */ - private final boolean keepPortable; - - /** Logger. */ - private final IgniteLogger log; - - /** Local callback. */ - private volatile IgniteBiPredicate<UUID, Collection<Map.Entry<K, V>>> cb; - - /** Local callback. */ - private volatile IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> locCb; - - /** Filter. */ - private volatile IgniteBiPredicate<K, V> filter; - - /** Remote filter. */ - private volatile IgnitePredicate<CacheContinuousQueryEntry<K, V>> rmtFilter; - - /** Buffer size. */ - private volatile int bufSize = DFLT_BUF_SIZE; - - /** Time interval. */ - @SuppressWarnings("RedundantFieldInitialization") - private volatile long timeInterval = DFLT_TIME_INTERVAL; - - /** Automatic unsubscribe flag. */ - private volatile boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE; - - /** Continuous routine ID. */ - private UUID routineId; - - /** - * @param ctx Cache context. - * @param topic Topic for ordered messages. - * @param prjPred Projection predicate. - */ - GridCacheContinuousQueryAdapter(GridCacheContext<K, V> ctx, Object topic, - @Nullable IgnitePredicate<CacheEntry<K, V>> prjPred) { - assert ctx != null; - assert topic != null; - - this.ctx = ctx; - this.topic = topic; - this.prjPred = prjPred; - - keepPortable = ctx.keepPortable(); - - log = ctx.logger(getClass()); - } - - /** {@inheritDoc} */ - @Override public void localCallback(IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> locCb) { - if (!guard.enterBusy()) - throw new IllegalStateException("Continuous query can't be changed after it was executed."); - - try { - this.locCb = locCb; - } - finally { - guard.leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> localCallback() { - return locCb; - } - - /** {@inheritDoc} */ - @Override public void remoteFilter(@Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> rmtFilter) { - if (!guard.enterBusy()) - throw new IllegalStateException("Continuous query can't be changed after it was executed."); - - try { - this.rmtFilter = rmtFilter; - } - finally { - guard.leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public IgnitePredicate<CacheContinuousQueryEntry<K, V>> remoteFilter() { - return rmtFilter; - } - - /** {@inheritDoc} */ - @Override public void bufferSize(int bufSize) { - A.ensure(bufSize > 0, "bufSize > 0"); - - if (!guard.enterBusy()) - throw new IllegalStateException("Continuous query can't be changed after it was executed."); - - try { - this.bufSize = bufSize; - } - finally { - guard.leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public int bufferSize() { - return bufSize; - } - - /** {@inheritDoc} */ - @Override public void timeInterval(long timeInterval) { - A.ensure(timeInterval >= 0, "timeInterval >= 0"); - - if (!guard.enterBusy()) - throw new IllegalStateException("Continuous query can't be changed after it was executed."); - - try { - this.timeInterval = timeInterval; - } - finally { - guard.leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public long timeInterval() { - return timeInterval; - } - - /** {@inheritDoc} */ - @Override public void autoUnsubscribe(boolean autoUnsubscribe) { - this.autoUnsubscribe = autoUnsubscribe; - } - - /** {@inheritDoc} */ - @Override public boolean isAutoUnsubscribe() { - return autoUnsubscribe; - } - - /** {@inheritDoc} */ - @Override public void execute() throws IgniteCheckedException { - execute(null, false, false, false, true); - } - - /** {@inheritDoc} */ - @Override public void execute(@Nullable ClusterGroup prj) throws IgniteCheckedException { - execute(prj, false, false, false, true); - } - - /** - * Starts continuous query execution. - * - * @param prj Grid projection. - * @param internal If {@code true} then query notified about internal entries updates. - * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. - * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. - * @param oldVal {@code True} if old value is required. - * @throws IgniteCheckedException If failed. - */ - public void execute(@Nullable ClusterGroup prj, - boolean internal, - boolean entryLsnr, - boolean sync, - boolean oldVal) throws IgniteCheckedException { - if (locCb == null) - throw new IllegalStateException("Mandatory local callback is not set for the query: " + this); - - ctx.checkSecurity(GridSecurityPermission.CACHE_READ); - - if (prj == null) - prj = ctx.grid(); - - prj = prj.forCacheNodes(ctx.name()); - - if (prj.nodes().isEmpty()) - throw new ClusterTopologyCheckedException("Failed to continuous execute query (projection is empty): " + - this); - - boolean skipPrimaryCheck = false; - - Collection<ClusterNode> nodes = prj.nodes(); - - if (nodes.isEmpty()) - throw new ClusterTopologyCheckedException("Failed to execute continuous query (empty projection is " + - "provided): " + this); - - switch (ctx.config().getCacheMode()) { - case LOCAL: - if (!nodes.contains(ctx.localNode())) - throw new ClusterTopologyCheckedException("Continuous query for LOCAL cache can be executed " + - "only locally (provided projection contains remote nodes only): " + this); - else if (nodes.size() > 1) - U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " + - "ignored): " + this); - - prj = prj.forNode(ctx.localNode()); - - break; - - case REPLICATED: - if (nodes.size() == 1 && F.first(nodes).equals(ctx.localNode())) { - CacheDistributionMode distributionMode = ctx.config().getDistributionMode(); - - if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED) - skipPrimaryCheck = true; - } - - break; - } - - closeLock.lock(); - - try { - if (routineId != null) - throw new IllegalStateException("Continuous query can't be executed twice."); - - guard.block(); - - int taskNameHash = - ctx.kernalContext().security().enabled() ? ctx.kernalContext().job().currentTaskNameHash() : 0; - - GridContinuousHandler hnd = new GridCacheContinuousQueryHandler<>(ctx.name(), - topic, - locCb, - rmtFilter, - prjPred, - internal, - entryLsnr, - sync, - oldVal, - skipPrimaryCheck, - taskNameHash, - keepPortable); - - routineId = ctx.kernalContext().continuous().startRoutine(hnd, - bufSize, - timeInterval, - autoUnsubscribe, - prj.predicate()).get(); - } - finally { - closeLock.unlock(); - } - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - closeLock.lock(); - - try { - if (routineId != null) - ctx.kernalContext().continuous().stopRoutine(routineId).get(); - } - finally { - closeLock.unlock(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheContinuousQueryAdapter.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java deleted file mode 100644 index 6ea0ebf..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryEntry.java +++ /dev/null @@ -1,766 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.*; -import org.jetbrains.annotations.*; - -import javax.cache.event.*; -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.internal.processors.cache.CacheFlag.*; -import static org.apache.ignite.internal.processors.cache.GridCacheValueBytes.*; - -/** - * Entry implementation. - */ -@SuppressWarnings("TypeParameterHidesVisibleType") -public class GridCacheContinuousQueryEntry<K, V> implements CacheEntry<K, V>, GridCacheDeployable, Externalizable, - CacheContinuousQueryEntry<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Event type enum values. */ - private static final EventType[] EVT_TYPE_VALS = EventType.values(); - - /** Cache context. */ - @SuppressWarnings("TransientFieldNotInitialized") - @GridToStringExclude - private final transient GridCacheContext<K, V> ctx; - - /** Cache entry. */ - @SuppressWarnings("TransientFieldNotInitialized") - @GridToStringExclude - private final transient CacheEntry<K, V> impl; - - /** Key. */ - @GridToStringInclude - private K key; - - /** New value. */ - @GridToStringInclude - private V newVal; - - /** Old value. */ - @GridToStringInclude - private V oldVal; - - /** Serialized key. */ - private byte[] keyBytes; - - /** Serialized value. */ - @GridToStringExclude - private GridCacheValueBytes newValBytes; - - /** Serialized value. */ - @GridToStringExclude - private GridCacheValueBytes oldValBytes; - - /** Cache name. */ - private String cacheName; - - /** Deployment info. */ - @GridToStringExclude - private GridDeploymentInfo depInfo; - - /** */ - private EventType evtType; - - /** - * Required by {@link Externalizable}. - */ - public GridCacheContinuousQueryEntry() { - ctx = null; - impl = null; - } - - /** - * @param ctx Cache context. - * @param impl Cache entry. - * @param key Key. - * @param newVal Value. - * @param newValBytes Value bytes. - * @param oldVal Old value. - * @param oldValBytes Old value bytes. - * @param evtType Event type. - */ - GridCacheContinuousQueryEntry(GridCacheContext<K, V> ctx, - CacheEntry<K, V> impl, - K key, - @Nullable V newVal, - @Nullable GridCacheValueBytes newValBytes, - @Nullable V oldVal, - @Nullable GridCacheValueBytes oldValBytes, - EventType evtType) { - assert ctx != null; - assert impl != null; - assert key != null; - assert evtType != null; - - this.ctx = ctx; - this.impl = impl; - this.key = key; - this.newVal = newVal; - this.newValBytes = newValBytes; - this.oldVal = oldVal; - this.oldValBytes = oldValBytes; - this.evtType = evtType; - } - - /** - * @return Cache entry. - */ - CacheEntry<K, V> entry() { - return impl; - } - - /** - * @return Cache context. - */ - GridCacheContext<K, V> context() { - return ctx; - } - - /** - * @return New value bytes. - */ - GridCacheValueBytes newValueBytes() { - return newValBytes; - } - - /** - * @return {@code True} if old value is set. - */ - boolean hasOldValue() { - return oldVal != null || (oldValBytes != null && !oldValBytes.isNull()); - } - - /** - * @return {@code True} if entry expired. - */ - public EventType eventType() { - return evtType; - } - - /** - * Unmarshals value from bytes if needed. - * - * @param marsh Marshaller. - * @param ldr Class loader. - * @throws IgniteCheckedException In case of error. - */ - void initValue(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - assert marsh != null; - - if (newVal == null && newValBytes != null && !newValBytes.isNull()) - newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr); - } - - /** - * @return Cache name. - */ - String cacheName() { - return cacheName; - } - - /** - * @param cacheName New cache name. - */ - void cacheName(String cacheName) { - this.cacheName = cacheName; - } - - /** - * @param marsh Marshaller. - * @throws IgniteCheckedException In case of error. - */ - void p2pMarshal(Marshaller marsh) throws IgniteCheckedException { - assert marsh != null; - - assert key != null; - - keyBytes = marsh.marshal(key); - - if (newValBytes == null || newValBytes.isNull()) - newValBytes = newVal != null ? - newVal instanceof byte[] ? plain(newVal) : marshaled(marsh.marshal(newVal)) : null; - - if (oldValBytes == null || oldValBytes.isNull()) - oldValBytes = oldVal != null ? - oldVal instanceof byte[] ? plain(oldVal) : marshaled(marsh.marshal(oldVal)) : null; - } - - /** - * @param marsh Marshaller. - * @param ldr Class loader. - * @throws IgniteCheckedException In case of error. - */ - void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - assert marsh != null; - - assert key == null : "Key should be null: " + key; - assert newVal == null : "New value should be null: " + newVal; - assert oldVal == null : "Old value should be null: " + oldVal; - assert keyBytes != null; - - key = marsh.unmarshal(keyBytes, ldr); - - if (newValBytes != null && !newValBytes.isNull()) - newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr); - - if (oldValBytes != null && !oldValBytes.isNull()) - oldVal = oldValBytes.isPlain() ? (V)oldValBytes.get() : marsh.<V>unmarshal(oldValBytes.get(), ldr); - } - - /** {@inheritDoc} */ - @Override public CacheProjection<K, V> projection() { - return impl.projection(); - } - - /** {@inheritDoc} */ - @Override public K getKey() { - return key; - } - - /** {@inheritDoc} */ - @Override public V getValue() { - return newVal; - } - - /** {@inheritDoc} */ - @Override public V getOldValue() { - return oldVal; - } - - /** {@inheritDoc} */ - @Override public V setValue(V val) { - ctx.denyOnFlag(READ); - - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public V peek() { - assert impl != null; - - return newVal; - } - - /** {@inheritDoc} */ - @Nullable @Override public V peek(@Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException { - assert impl != null; - - return impl.peek(modes); - } - - /** {@inheritDoc} */ - @Nullable @Override public V get() - throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(LOCAL); - - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> getAsync() { - assert impl != null; - - ctx.denyOnFlag(LOCAL); - - return new GridFinishedFuture<>(ctx.kernalContext()); - } - - /** {@inheritDoc} */ - @Nullable @Override public V reload() throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> reloadAsync() { - assert impl != null; - - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext()); - } - - /** {@inheritDoc} */ - @Override public boolean isLocked() { - assert impl != null; - - return impl.isLocked(); - } - - /** {@inheritDoc} */ - @Override public boolean isLockedByThread() { - assert impl != null; - - return impl.isLockedByThread(); - } - - /** {@inheritDoc} */ - @Override public Object version() { - assert impl != null; - - return impl.version(); - } - - /** {@inheritDoc} */ - @Override public long expirationTime() { - assert impl != null; - - return impl.expirationTime(); - } - - /** {@inheritDoc} */ - @Override public long timeToLive() { - assert impl != null; - - return impl.timeToLive(); - } - - /** {@inheritDoc} */ - @Override public void timeToLive(long ttl) { - assert impl != null; - - impl.timeToLive(ttl); - } - - /** {@inheritDoc} */ - @Override public boolean primary() { - assert impl != null; - - return impl.primary(); - } - - /** {@inheritDoc} */ - @Override public boolean backup() { - assert impl != null; - - return impl.backup(); - } - - /** {@inheritDoc} */ - @Override public int partition() { - assert impl != null; - - return impl.partition(); - } - - /** {@inheritDoc} */ - @Nullable @Override public V set(V val, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) - throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> setAsync(V val, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { - assert impl != null; - - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext()); - } - - /** {@inheritDoc} */ - @Nullable @Override public V setIfAbsent(V val) throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> setIfAbsentAsync(V val) { - assert impl != null; - - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext()); - } - - /** {@inheritDoc} */ - @Override public boolean setx(V val, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) - throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return false; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> setxAsync(V val, - @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { - assert impl != null; - - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext(), false); - } - - /** {@inheritDoc} */ - @Override public boolean setxIfAbsent(@Nullable V val) throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return false; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> setxIfAbsentAsync(V val) { - assert impl != null; - - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext(), false); - } - - /** {@inheritDoc} */ - @Nullable @Override public V replace(V val) throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> replaceAsync(V val) { - assert impl != null; - - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext()); - } - - /** {@inheritDoc} */ - @Override public boolean replacex(V val) throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return false; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> replacexAsync(V val) { - assert impl != null; - - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext(), false); - } - - /** {@inheritDoc} */ - @Override public boolean replace(V oldVal, V newVal) throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return false; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> replaceAsync(V oldVal, V newVal) { - assert impl != null; - - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext(), false); - } - - /** {@inheritDoc} */ - @Nullable @Override public V remove(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) - throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> removeAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { - assert impl != null; - - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext()); - } - - /** {@inheritDoc} */ - @Override public boolean removex(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return false; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> removexAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { - assert impl != null; - - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext(), false); - } - - /** {@inheritDoc} */ - @Override public boolean remove(V val) throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return false; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> removeAsync(V val) { - assert impl != null; - - ctx.denyOnFlag(READ); - - return new GridFinishedFuture<>(ctx.kernalContext(), false); - } - - /** {@inheritDoc} */ - @Override public boolean evict() { - assert impl != null; - - ctx.denyOnFlag(READ); - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean clear() { - assert impl != null; - - ctx.denyOnFlag(READ); - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean compact() - throws IgniteCheckedException { - assert impl != null; - - ctx.denyOnFlag(READ); - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean lock(long timeout, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) - throws IgniteCheckedException { - assert impl != null; - - return impl.lock(timeout, filter); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> lockAsync(long timeout, - @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { - assert impl != null; - - return impl.lockAsync(timeout, filter); - } - - /** {@inheritDoc} */ - @Override public void unlock(IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException { - assert impl != null; - - impl.unlock(filter); - } - - /** {@inheritDoc} */ - @Override public boolean isCached() { - assert impl != null; - - return impl.isCached(); - } - - /** {@inheritDoc} */ - @Override public int memorySize() throws IgniteCheckedException { - assert impl != null; - - return impl.memorySize(); - } - - /** {@inheritDoc} */ - @Nullable @Override public <V> V addMeta(String name, V val) { - assert impl != null; - - return impl.addMeta(name, val); - } - - /** {@inheritDoc} */ - @Nullable @Override public <V> V putMetaIfAbsent(String name, V val) { - assert impl != null; - - return impl.putMetaIfAbsent(name, val); - } - - /** {@inheritDoc} */ - @Nullable @Override public <V> V putMetaIfAbsent(String name, Callable<V> c) { - assert impl != null; - - return impl.putMetaIfAbsent(name, c); - } - - /** {@inheritDoc} */ - @Override public <V> V meta(String name) { - assert impl != null; - - return impl.meta(name); - } - - /** {@inheritDoc} */ - @Override public <V> V removeMeta(String name) { - assert impl != null; - - return impl.removeMeta(name); - } - - /** {@inheritDoc} */ - @Override public <V> boolean removeMeta(String name, V val) { - assert impl != null; - - return impl.removeMeta(name, val); - } - - /** {@inheritDoc} */ - @Override public <V> boolean replaceMeta(String name, V curVal, V newVal) { - assert impl != null; - - return impl.replaceMeta(name, curVal, newVal); - } - - /** {@inheritDoc} */ - @Override public void prepare(GridDeploymentInfo depInfo) { - this.depInfo = depInfo; - } - - /** {@inheritDoc} */ - @Override public GridDeploymentInfo deployInfo() { - return depInfo; - } - - /** {@inheritDoc} */ - @Override public <T> T unwrap(Class<T> clazz) { - if(clazz.isAssignableFrom(getClass())) - return clazz.cast(this); - - throw new IllegalArgumentException(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - boolean b = keyBytes != null; - - out.writeBoolean(b); - - if (b) { - U.writeByteArray(out, keyBytes); - - if (newValBytes != null && !newValBytes.isNull()) { - out.writeBoolean(true); - out.writeBoolean(newValBytes.isPlain()); - U.writeByteArray(out, newValBytes.get()); - } - else - out.writeBoolean(false); - - if (oldValBytes != null && !oldValBytes.isNull()) { - out.writeBoolean(true); - out.writeBoolean(oldValBytes.isPlain()); - U.writeByteArray(out, oldValBytes.get()); - } - else - out.writeBoolean(false); - - U.writeString(out, cacheName); - out.writeObject(depInfo); - } - else { - out.writeObject(key); - out.writeObject(newVal); - out.writeObject(oldVal); - } - - out.writeByte((byte)evtType.ordinal()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - boolean b = in.readBoolean(); - - if (b) { - keyBytes = U.readByteArray(in); - - if (in.readBoolean()) - newValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in)); - - if (in.readBoolean()) - oldValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in)); - - cacheName = U.readString(in); - depInfo = (GridDeploymentInfo)in.readObject(); - } - else { - key = (K)in.readObject(); - newVal = (V)in.readObject(); - oldVal = (V)in.readObject(); - } - - evtType = EVT_TYPE_VALS[in.readByte()]; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheContinuousQueryEntry.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java deleted file mode 100644 index 7b0615d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryFilterEx.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - - -import org.apache.ignite.cache.query.*; -import org.apache.ignite.lang.*; - -/** - * Extended continuous query filter. - */ -public interface GridCacheContinuousQueryFilterEx<K, V> extends - IgnitePredicate<CacheContinuousQueryEntry<K, V>> { - /** - * Callback for query unregister event. - */ - public void onQueryUnregister(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java deleted file mode 100644 index 7f067aa..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java +++ /dev/null @@ -1,571 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.query.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.continuous.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import javax.cache.event.*; -import java.io.*; -import java.util.*; - -import static org.apache.ignite.events.EventType.*; - -/** - * Continuous query handler. - */ -class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler { - /** */ - private static final long serialVersionUID = 0L; - - /** Cache name. */ - private String cacheName; - - /** Topic for ordered messages. */ - private Object topic; - - /** Local callback. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> cb; - - /** Filter. */ - private IgnitePredicate<CacheContinuousQueryEntry<K, V>> filter; - - /** Projection predicate */ - private IgnitePredicate<CacheEntry<K, V>> prjPred; - - /** Deployable object for filter. */ - private DeployableObject filterDep; - - /** Deployable object for Projection predicate. */ - private DeployableObject prjPredDep; - - /** Internal flag. */ - private boolean internal; - - /** Entry listener flag. */ - private boolean entryLsnr; - - /** Synchronous listener flag. */ - private boolean sync; - - /** {@code True} if old value is required. */ - private boolean oldVal; - - /** Task name hash code. */ - private int taskHash; - - /** Keep portable flag. */ - private boolean keepPortable; - - /** Whether to skip primary check for REPLICATED cache. */ - private transient boolean skipPrimaryCheck; - - /** - * Required by {@link Externalizable}. - */ - public GridCacheContinuousQueryHandler() { - // No-op. - } - - /** - * Constructor. - * - * @param cacheName Cache name. - * @param topic Topic for ordered messages. - * @param cb Local callback. - * @param filter Filter. - * @param prjPred Projection predicate. - * @param internal If {@code true} then query is notified about internal entries updates. - * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. - * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. - * @param oldVal {@code True} if old value is required. - * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. - * @param taskHash Task name hash code. - */ - GridCacheContinuousQueryHandler(@Nullable String cacheName, - Object topic, - IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> cb, - @Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> filter, - @Nullable IgnitePredicate<CacheEntry<K, V>> prjPred, - boolean internal, - boolean entryLsnr, - boolean sync, - boolean oldVal, - boolean skipPrimaryCheck, - int taskHash, - boolean keepPortable) { - assert topic != null; - assert cb != null; - assert !sync || entryLsnr; - - this.cacheName = cacheName; - this.topic = topic; - this.cb = cb; - this.filter = filter; - this.prjPred = prjPred; - this.internal = internal; - this.entryLsnr = entryLsnr; - this.sync = sync; - this.oldVal = oldVal; - this.taskHash = taskHash; - this.keepPortable = keepPortable; - this.skipPrimaryCheck = skipPrimaryCheck; - } - - /** {@inheritDoc} */ - @Override public boolean isForEvents() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isForMessaging() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isForQuery() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) - throws IgniteCheckedException { - assert nodeId != null; - assert routineId != null; - assert ctx != null; - - if (cb != null) - ctx.resource().injectGeneric(cb); - - if (filter != null) - ctx.resource().injectGeneric(filter); - - final boolean loc = nodeId.equals(ctx.localNodeId()); - - GridCacheContinuousQueryListener<K, V> lsnr = new GridCacheContinuousQueryListener<K, V>() { - @Override public void onExecution() { - if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { - ctx.event().record(new CacheQueryExecutedEvent<>( - ctx.discovery().localNode(), - "Continuous query executed.", - EVT_CACHE_QUERY_EXECUTED, - CacheQueryType.CONTINUOUS, - cacheName, - null, - null, - null, - filter, - null, - nodeId, - taskName() - )); - } - } - - @Override public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) { - GridCacheContext<K, V> cctx = cacheContext(ctx); - - if (cctx.isReplicated() && !skipPrimaryCheck && !e.primary()) - return; - - boolean notify; - - CacheFlag[] f = cctx.forceLocalRead(); - - try { - notify = (prjPred == null || checkProjection(e)) && - (filter == null || filter.apply(e)); - } - finally { - cctx.forceFlags(f); - } - - if (notify) { - if (!oldVal && e.hasOldValue()) { - e = new GridCacheContinuousQueryEntry<>(e.context(), - e.entry(), - e.getKey(), - e.getValue(), - e.newValueBytes(), - null, - null, - e.eventType()); - } - - if (loc) { - if (!cb.apply(nodeId, - F.<CacheContinuousQueryEntry<K, V>>asList(e))) - ctx.continuous().stopRoutine(routineId); - } - else { - try { - ClusterNode node = ctx.discovery().node(nodeId); - - if (ctx.config().isPeerClassLoadingEnabled() && node != null && - U.hasCache(node, cacheName)) { - e.p2pMarshal(ctx.config().getMarshaller()); - - e.cacheName(cacheName); - - GridCacheDeploymentManager depMgr = - ctx.cache().internalCache(cacheName).context().deploy(); - - depMgr.prepare(e); - } - - ctx.continuous().addNotification(nodeId, routineId, e, topic, sync); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); - } - } - - if (!entryLsnr && recordEvt) { - ctx.event().record(new CacheQueryReadEvent<>( - ctx.discovery().localNode(), - "Continuous query executed.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.CONTINUOUS, - cacheName, - null, - null, - null, - filter, - null, - nodeId, - taskName(), - e.getKey(), - e.getValue(), - e.getOldValue(), - null - )); - } - } - } - - /** {@inheritDoc} */ - @Override public void onUnregister() { - if (filter != null && filter instanceof GridCacheContinuousQueryFilterEx) - ((GridCacheContinuousQueryFilterEx)filter).onQueryUnregister(); - } - - private boolean checkProjection(GridCacheContinuousQueryEntry<K, V> e) { - GridCacheProjectionImpl.FullFilter<K, V> filter = (GridCacheProjectionImpl.FullFilter<K, V>)prjPred; - - GridCacheProjectionImpl.KeyValueFilter<K, V> kvFilter = filter.keyValueFilter(); - IgnitePredicate<? super CacheEntry<K, V>> entryFilter = filter.entryFilter(); - - boolean ret = true; - - if (kvFilter != null) { - V v = e.getValue() == null ? e.getOldValue() : e.getValue(); - - ret = v != null && kvFilter.apply(e.getKey(), v); - } - - if (entryFilter != null) - ret = ret && entryFilter.apply(e); - - return ret; - } - - @Nullable private String taskName() { - return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; - } - }; - - return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr); - } - - /** {@inheritDoc} */ - @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { - if (!entryLsnr) - manager(ctx).iterate(internal, routineId, keepPortable); - } - - /** {@inheritDoc} */ - @Override public void unregister(UUID routineId, GridKernalContext ctx) { - assert routineId != null; - assert ctx != null; - - manager(ctx).unregisterListener(internal, routineId); - } - - /** - * @param ctx Kernal context. - * @return Continuous query manager. - */ - private GridCacheContinuousQueryManager<K, V> manager(GridKernalContext ctx) { - return cacheContext(ctx).continuousQueries(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) { - assert nodeId != null; - assert routineId != null; - assert objs != null; - assert ctx != null; - - Collection<CacheContinuousQueryEntry<K, V>> entries = - (Collection<CacheContinuousQueryEntry<K, V>>)objs; - - if (ctx.config().isPeerClassLoadingEnabled()) { - for (Map.Entry<K, V> e : entries) { - assert e instanceof GridCacheContinuousQueryEntry; - - GridCacheContinuousQueryEntry<K, V> qe = (GridCacheContinuousQueryEntry<K, V>)e; - - GridCacheAdapter cache = ctx.cache().internalCache(qe.cacheName()); - - ClassLoader ldr = null; - - if (cache != null) { - GridCacheDeploymentManager depMgr = cache.context().deploy(); - - GridDeploymentInfo depInfo = qe.deployInfo(); - - if (depInfo != null) { - depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), - depInfo.participants(), depInfo.localDeploymentOwner()); - } - - ldr = depMgr.globalLoader(); - } - else { - U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " + - "when peer class loading is enabled: " + qe.cacheName() + ". Will try to unmarshal " + - "with default class loader."); - } - - try { - qe.p2pUnmarshal(ctx.config().getMarshaller(), ldr); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); - } - } - } - - if (!cb.apply(nodeId, entries)) - ctx.continuous().stopRoutine(routineId); - } - - /** {@inheritDoc} */ - @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { - assert ctx != null; - assert ctx.config().isPeerClassLoadingEnabled(); - - if (filter != null && !U.isGrid(filter.getClass())) - filterDep = new DeployableObject(filter, ctx); - - if (prjPred != null && !U.isGrid(prjPred.getClass())) - prjPredDep = new DeployableObject(prjPred, ctx); - } - - /** {@inheritDoc} */ - @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { - assert nodeId != null; - assert ctx != null; - assert ctx.config().isPeerClassLoadingEnabled(); - - if (filterDep != null) - filter = filterDep.unmarshal(nodeId, ctx); - - if (prjPredDep != null) - prjPred = prjPredDep.unmarshal(nodeId, ctx); - } - - /** {@inheritDoc} */ - @Nullable @Override public Object orderedTopic() { - return topic; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeObject(topic); - - boolean b = filterDep != null; - - out.writeBoolean(b); - - if (b) - out.writeObject(filterDep); - else - out.writeObject(filter); - - b = prjPredDep != null; - - out.writeBoolean(b); - - if (b) - out.writeObject(prjPredDep); - else - out.writeObject(prjPred); - - out.writeBoolean(internal); - - out.writeBoolean(entryLsnr); - - out.writeBoolean(sync); - - out.writeBoolean(oldVal); - - out.writeInt(taskHash); - - out.writeBoolean(keepPortable); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - topic = in.readObject(); - - boolean b = in.readBoolean(); - - if (b) - filterDep = (DeployableObject)in.readObject(); - else - filter = (IgnitePredicate<CacheContinuousQueryEntry<K,V>>)in.readObject(); - - b = in.readBoolean(); - - if (b) - prjPredDep = (DeployableObject)in.readObject(); - else - prjPred = (IgnitePredicate<CacheEntry<K, V>>)in.readObject(); - - internal = in.readBoolean(); - - entryLsnr = in.readBoolean(); - - sync = in.readBoolean(); - - oldVal = in.readBoolean(); - - taskHash = in.readInt(); - - keepPortable = in.readBoolean(); - } - - /** - * @param ctx Kernal context. - * @return Cache context. - */ - private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) { - assert ctx != null; - - return ctx.cache().<K, V>internalCache(cacheName).context(); - } - - /** - * Deployable object. - */ - private static class DeployableObject implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Serialized object. */ - private byte[] bytes; - - /** Deployment class name. */ - private String clsName; - - /** Deployment info. */ - private GridDeploymentInfo depInfo; - - /** - * Required by {@link Externalizable}. - */ - public DeployableObject() { - // No-op. - } - - /** - * @param obj Object. - * @param ctx Kernal context. - * @throws IgniteCheckedException In case of error. - */ - private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException { - assert obj != null; - assert ctx != null; - - Class cls = U.detectClass(obj); - - clsName = cls.getName(); - - GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); - - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj); - - depInfo = new GridDeploymentInfoBean(dep); - - bytes = ctx.config().getMarshaller().marshal(obj); - } - - /** - * @param nodeId Node ID. - * @param ctx Kernal context. - * @return Deserialized object. - * @throws IgniteCheckedException In case of error. - */ - <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { - assert ctx != null; - - GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, - depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); - - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); - - return ctx.config().getMarshaller().unmarshal(bytes, dep.classLoader()); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeByteArray(out, bytes); - U.writeString(out, clsName); - out.writeObject(depInfo); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - bytes = U.readByteArray(in); - clsName = U.readString(in); - depInfo = (GridDeploymentInfo)in.readObject(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java deleted file mode 100644 index 7bea056..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryListener.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -/** - * Continuous query listener. - */ -interface GridCacheContinuousQueryListener<K, V> { - /** - * Query execution callback. - */ - public void onExecution(); - - /** - * Entry update callback. - * - * @param e Entry. - * @param recordEvt Whether to record event. - */ - public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt); - - /** - * Listener unregistered callback. - */ - public void onUnregister(); -}