http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java new file mode 100644 index 0000000..ecdfc2c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -0,0 +1,1090 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cache.CachePartialUpdateException; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.cache.CacheOperationContext; +import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; +import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor; +import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor; +import org.apache.ignite.internal.processors.platform.PlatformNativeException; +import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.internal.util.GridConcurrentFactory; +import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.lang.IgniteFuture; +import org.jetbrains.annotations.Nullable; + +import javax.cache.Cache; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; + +/** + * Native cache wrapper implementation. + */ +@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"}) +public class PlatformCache extends PlatformAbstractTarget { + /** */ + public static final int OP_CLEAR = 1; + + /** */ + public static final int OP_CLEAR_ALL = 2; + + /** */ + public static final int OP_CONTAINS_KEY = 3; + + /** */ + public static final int OP_CONTAINS_KEYS = 4; + + /** */ + public static final int OP_GET = 5; + + /** */ + public static final int OP_GET_ALL = 6; + + /** */ + public static final int OP_GET_AND_PUT = 7; + + /** */ + public static final int OP_GET_AND_PUT_IF_ABSENT = 8; + + /** */ + public static final int OP_GET_AND_REMOVE = 9; + + /** */ + public static final int OP_GET_AND_REPLACE = 10; + + /** */ + public static final int OP_GET_NAME = 11; + + /** */ + public static final int OP_INVOKE = 12; + + /** */ + public static final int OP_INVOKE_ALL = 13; + + /** */ + public static final int OP_IS_LOCAL_LOCKED = 14; + + /** */ + public static final int OP_LOAD_CACHE = 15; + + /** */ + public static final int OP_LOC_EVICT = 16; + + /** */ + public static final int OP_LOC_LOAD_CACHE = 17; + + /** */ + public static final int OP_LOC_PROMOTE = 18; + + /** */ + public static final int OP_LOCAL_CLEAR = 20; + + /** */ + public static final int OP_LOCAL_CLEAR_ALL = 21; + + /** */ + public static final int OP_LOCK = 22; + + /** */ + public static final int OP_LOCK_ALL = 23; + + /** */ + public static final int OP_METRICS = 24; + + /** */ + private static final int OP_PEEK = 25; + + /** */ + private static final int OP_PUT = 26; + + /** */ + private static final int OP_PUT_ALL = 27; + + /** */ + public static final int OP_PUT_IF_ABSENT = 28; + + /** */ + public static final int OP_QRY_CONTINUOUS = 29; + + /** */ + public static final int OP_QRY_SCAN = 30; + + /** */ + public static final int OP_QRY_SQL = 31; + + /** */ + public static final int OP_QRY_SQL_FIELDS = 32; + + /** */ + public static final int OP_QRY_TXT = 33; + + /** */ + public static final int OP_REMOVE_ALL = 34; + + /** */ + public static final int OP_REMOVE_BOOL = 35; + + /** */ + public static final int OP_REMOVE_OBJ = 36; + + /** */ + public static final int OP_REPLACE_2 = 37; + + /** */ + public static final int OP_REPLACE_3 = 38; + + /** Underlying JCache. */ + private final IgniteCacheProxy cache; + + /** Whether this cache is created with "keepPortable" flag on the other side. */ + private final boolean keepPortable; + + /** */ + private static final GetAllWriter WRITER_GET_ALL = new GetAllWriter(); + + /** */ + private static final EntryProcessorInvokeWriter WRITER_INVOKE = new EntryProcessorInvokeWriter(); + + /** */ + private static final EntryProcessorInvokeAllWriter WRITER_INVOKE_ALL = new EntryProcessorInvokeAllWriter(); + + /** Map with currently active locks. */ + private final ConcurrentMap<Long, Lock> lockMap = GridConcurrentFactory.newMap(); + + /** Lock ID sequence. */ + private static final AtomicLong LOCK_ID_GEN = new AtomicLong(); + + /** + * Constructor. + * + * @param platformCtx Context. + * @param cache Underlying cache. + * @param keepPortable Keep portable flag. + */ + public PlatformCache(PlatformContext platformCtx, IgniteCache cache, boolean keepPortable) { + super(platformCtx); + + this.cache = (IgniteCacheProxy)cache; + this.keepPortable = keepPortable; + } + + /** + * Gets cache with "skip-store" flag set. + * + * @return Cache with "skip-store" flag set. + */ + public PlatformCache withSkipStore() { + if (cache.delegate().skipStore()) + return this; + + return new PlatformCache(platformCtx, cache.withSkipStore(), keepPortable); + } + + /** + * Gets cache with "keep portable" flag. + * + * @return Cache with "keep portable" flag set. + */ + public PlatformCache withKeepPortable() { + if (keepPortable) + return this; + + return new PlatformCache(platformCtx, cache.withSkipStore(), true); + } + + /** + * Gets cache with provided expiry policy. + * + * @param create Create. + * @param update Update. + * @param access Access. + * @return Cache. + */ + public PlatformCache withExpiryPolicy(final long create, final long update, final long access) { + IgniteCache cache0 = cache.withExpiryPolicy(new InteropExpiryPolicy(create, update, access)); + + return new PlatformCache(platformCtx, cache0, keepPortable); + } + + /** + * Gets cache with asynchronous mode enabled. + * + * @return Cache with asynchronous mode enabled. + */ + public PlatformCache withAsync() { + if (cache.isAsync()) + return this; + + return new PlatformCache(platformCtx, (IgniteCache)cache.withAsync(), keepPortable); + } + + /** + * Gets cache with no-retries mode enabled. + * + * @return Cache with no-retries mode enabled. + */ + public PlatformCache withNoRetries() { + CacheOperationContext opCtx = cache.operationContext(); + + if (opCtx != null && opCtx.noRetries()) + return this; + + return new PlatformCache(platformCtx, cache.withNoRetries(), keepPortable); + } + + /** {@inheritDoc} */ + @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case OP_PUT: + cache.put(reader.readObjectDetached(), reader.readObjectDetached()); + + return TRUE; + + case OP_REMOVE_BOOL: + return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; + + case OP_REMOVE_ALL: + cache.removeAll(PlatformUtils.readSet(reader)); + + return TRUE; + + case OP_PUT_ALL: + cache.putAll(PlatformUtils.readMap(reader)); + + return TRUE; + + case OP_LOC_EVICT: + cache.localEvict(PlatformUtils.readCollection(reader)); + + return TRUE; + + case OP_CONTAINS_KEY: + return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE; + + case OP_CONTAINS_KEYS: + return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE; + + case OP_LOC_PROMOTE: { + cache.localPromote(PlatformUtils.readSet(reader)); + + break; + } + + case OP_REPLACE_3: + return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(), + reader.readObjectDetached()) ? TRUE : FALSE; + + case OP_LOC_LOAD_CACHE: + loadCache0(reader, true); + + break; + + case OP_LOAD_CACHE: + loadCache0(reader, false); + + break; + + case OP_CLEAR: + cache.clear(reader.readObjectDetached()); + + break; + + case OP_CLEAR_ALL: + cache.clearAll(PlatformUtils.readSet(reader)); + + break; + + case OP_LOCAL_CLEAR: + cache.localClear(reader.readObjectDetached()); + + break; + + case OP_LOCAL_CLEAR_ALL: + cache.localClearAll(PlatformUtils.readSet(reader)); + + break; + + case OP_PUT_IF_ABSENT: { + return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; + } + + case OP_REPLACE_2: { + return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; + } + + case OP_REMOVE_OBJ: { + return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE; + } + + case OP_IS_LOCAL_LOCKED: + return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE; + + default: + return super.processInStreamOutLong(type, reader); + } + + return TRUE; + } + + /** + * Loads cache via localLoadCache or loadCache. + */ + private void loadCache0(PortableRawReaderEx reader, boolean loc) throws IgniteCheckedException { + PlatformCacheEntryFilter filter = null; + + Object pred = reader.readObjectDetached(); + + if (pred != null) + filter = platformCtx.createCacheEntryFilter(pred, reader.readLong()); + + Object[] args = reader.readObjectArray(); + + if (loc) + cache.localLoadCache(filter, args); + else + cache.loadCache(filter, args); + } + + /** {@inheritDoc} */ + @Override protected Object processInStreamOutObject(int type, PortableRawReaderEx reader) + throws IgniteCheckedException { + switch (type) { + case OP_QRY_SQL: + return runQuery(reader, readSqlQuery(reader)); + + case OP_QRY_SQL_FIELDS: + return runFieldsQuery(reader, readFieldsQuery(reader)); + + case OP_QRY_TXT: + return runQuery(reader, readTextQuery(reader)); + + case OP_QRY_SCAN: + return runQuery(reader, readScanQuery(reader)); + + case OP_QRY_CONTINUOUS: { + long ptr = reader.readLong(); + boolean loc = reader.readBoolean(); + boolean hasFilter = reader.readBoolean(); + Object filter = reader.readObjectDetached(); + int bufSize = reader.readInt(); + long timeInterval = reader.readLong(); + boolean autoUnsubscribe = reader.readBoolean(); + Query initQry = readInitialQuery(reader); + + PlatformContinuousQuery qry = platformCtx.createContinuousQuery(ptr, hasFilter, filter); + + qry.start(cache, loc, bufSize, timeInterval, autoUnsubscribe, initQry); + + return qry; + } + + default: + return super.processInStreamOutObject(type, reader); + } + } + + /** + * Read arguments for SQL query. + * + * @param reader Reader. + * @return Arguments. + */ + @Nullable private Object[] readQueryArgs(PortableRawReaderEx reader) { + int cnt = reader.readInt(); + + if (cnt > 0) { + Object[] args = new Object[cnt]; + + for (int i = 0; i < cnt; i++) + args[i] = reader.readObjectDetached(); + + return args; + } + else + return null; + } + + /** {@inheritDoc} */ + @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException { + switch (type) { + case OP_GET_NAME: + writer.writeObject(cache.getName()); + + break; + + case OP_METRICS: + CacheMetrics metrics = cache.metrics(); + + writer.writeLong(metrics.getCacheGets()); + writer.writeLong(metrics.getCachePuts()); + writer.writeLong(metrics.getCacheHits()); + writer.writeLong(metrics.getCacheMisses()); + writer.writeLong(metrics.getCacheTxCommits()); + writer.writeLong(metrics.getCacheTxRollbacks()); + writer.writeLong(metrics.getCacheEvictions()); + writer.writeLong(metrics.getCacheRemovals()); + writer.writeFloat(metrics.getAveragePutTime()); + writer.writeFloat(metrics.getAverageGetTime()); + writer.writeFloat(metrics.getAverageRemoveTime()); + writer.writeFloat(metrics.getAverageTxCommitTime()); + writer.writeFloat(metrics.getAverageTxRollbackTime()); + writer.writeString(metrics.name()); + writer.writeLong(metrics.getOverflowSize()); + writer.writeLong(metrics.getOffHeapEntriesCount()); + writer.writeLong(metrics.getOffHeapAllocatedSize()); + writer.writeInt(metrics.getSize()); + writer.writeInt(metrics.getKeySize()); + writer.writeBoolean(metrics.isEmpty()); + writer.writeInt(metrics.getDhtEvictQueueCurrentSize()); + writer.writeInt(metrics.getTxThreadMapSize()); + writer.writeInt(metrics.getTxXidMapSize()); + writer.writeInt(metrics.getTxCommitQueueSize()); + writer.writeInt(metrics.getTxPrepareQueueSize()); + writer.writeInt(metrics.getTxStartVersionCountsSize()); + writer.writeInt(metrics.getTxCommittedVersionsSize()); + writer.writeInt(metrics.getTxRolledbackVersionsSize()); + writer.writeInt(metrics.getTxDhtThreadMapSize()); + writer.writeInt(metrics.getTxDhtXidMapSize()); + writer.writeInt(metrics.getTxDhtCommitQueueSize()); + writer.writeInt(metrics.getTxDhtPrepareQueueSize()); + writer.writeInt(metrics.getTxDhtStartVersionCountsSize()); + writer.writeInt(metrics.getTxDhtCommittedVersionsSize()); + writer.writeInt(metrics.getTxDhtRolledbackVersionsSize()); + writer.writeBoolean(metrics.isWriteBehindEnabled()); + writer.writeInt(metrics.getWriteBehindFlushSize()); + writer.writeInt(metrics.getWriteBehindFlushThreadCount()); + writer.writeLong(metrics.getWriteBehindFlushFrequency()); + writer.writeInt(metrics.getWriteBehindStoreBatchSize()); + writer.writeInt(metrics.getWriteBehindTotalCriticalOverflowCount()); + writer.writeInt(metrics.getWriteBehindCriticalOverflowCount()); + writer.writeInt(metrics.getWriteBehindErrorRetryCount()); + writer.writeInt(metrics.getWriteBehindBufferSize()); + writer.writeString(metrics.getKeyType()); + writer.writeString(metrics.getValueType()); + writer.writeBoolean(metrics.isStoreByValue()); + writer.writeBoolean(metrics.isStatisticsEnabled()); + writer.writeBoolean(metrics.isManagementEnabled()); + writer.writeBoolean(metrics.isReadThrough()); + writer.writeBoolean(metrics.isWriteThrough()); + writer.writeFloat(metrics.getCacheHitPercentage()); + writer.writeFloat(metrics.getCacheMissPercentage()); + + break; + + default: + super.processOutStream(type, writer); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"}) + @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer) + throws IgniteCheckedException { + switch (type) { + case OP_GET: { + writer.writeObjectDetached(cache.get(reader.readObjectDetached())); + + break; + } + + case OP_GET_AND_PUT: { + writer.writeObjectDetached(cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached())); + + break; + } + + case OP_GET_AND_REPLACE: { + writer.writeObjectDetached(cache.getAndReplace(reader.readObjectDetached(), + reader.readObjectDetached())); + + break; + } + + case OP_GET_AND_REMOVE: { + writer.writeObjectDetached(cache.getAndRemove(reader.readObjectDetached())); + + break; + } + + case OP_GET_AND_PUT_IF_ABSENT: { + writer.writeObjectDetached(cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached())); + + break; + } + + case OP_PEEK: { + Object key = reader.readObjectDetached(); + + CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt()); + + writer.writeObjectDetached(cache.localPeek(key, modes)); + + break; + } + + case OP_GET_ALL: { + Set keys = PlatformUtils.readSet(reader); + + Map entries = cache.getAll(keys); + + PlatformUtils.writeNullableMap(writer, entries); + + break; + } + + case OP_INVOKE: { + Object key = reader.readObjectDetached(); + + CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); + + try { + writer.writeObjectDetached(cache.invoke(key, proc)); + } + catch (EntryProcessorException ex) + { + if (ex.getCause() instanceof PlatformNativeException) + writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause()); + else + throw ex; + } + + break; + } + + case OP_INVOKE_ALL: { + Set<Object> keys = PlatformUtils.readSet(reader); + + CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); + + writeInvokeAllResult(writer, cache.invokeAll(keys, proc)); + + break; + } + + case OP_LOCK: + writer.writeLong(registerLock(cache.lock(reader.readObjectDetached()))); + + break; + + case OP_LOCK_ALL: + writer.writeLong(registerLock(cache.lockAll(PlatformUtils.readCollection(reader)))); + + break; + + default: + super.processInStreamOutStream(type, reader, writer); + } + } + + /** {@inheritDoc} */ + @Override public Exception convertException(Exception e) { + if (e instanceof CachePartialUpdateException) + return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e.getCause(), + platformCtx, keepPortable); + + if (e instanceof CachePartialUpdateCheckedException) + return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e, platformCtx, keepPortable); + + if (e.getCause() instanceof EntryProcessorException) + return (EntryProcessorException) e.getCause(); + + return super.convertException(e); + } + + /** + * Writes the result of InvokeAll cache method. + * + * @param writer Writer. + * @param results Results. + */ + private static void writeInvokeAllResult(PortableRawWriterEx writer, Map<Object, EntryProcessorResult> results) { + if (results == null) { + writer.writeInt(-1); + + return; + } + + writer.writeInt(results.size()); + + for (Map.Entry<Object, EntryProcessorResult> entry : results.entrySet()) { + writer.writeObjectDetached(entry.getKey()); + + EntryProcessorResult procRes = entry.getValue(); + + try { + Object res = procRes.get(); + + writer.writeBoolean(false); // No exception + + writer.writeObjectDetached(res); + } + catch (Exception ex) { + writer.writeBoolean(true); // Exception + + writeError(writer, ex); + } + } + } + + /** + * Writes an error to the writer either as a native exception, or as a couple of strings. + * @param writer Writer. + * @param ex Exception. + */ + private static void writeError(PortableRawWriterEx writer, Exception ex) { + if (ex.getCause() instanceof PlatformNativeException) + writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause()); + else { + writer.writeObjectDetached(ex.getClass().getName()); + writer.writeObjectDetached(ex.getMessage()); + } + } + + /** <inheritDoc /> */ + @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { + return cache.future(); + } + + /** <inheritDoc /> */ + @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) { + if (opId == OP_GET_ALL) + return WRITER_GET_ALL; + + if (opId == OP_INVOKE) + return WRITER_INVOKE; + + if (opId == OP_INVOKE_ALL) + return WRITER_INVOKE_ALL; + + return null; + } + + /** + * Clears the contents of the cache, without notifying listeners or + * {@ignitelink javax.cache.integration.CacheWriter}s. + * + * @throws IllegalStateException if the cache is closed. + * @throws javax.cache.CacheException if there is a problem during the clear + */ + public void clear() throws IgniteCheckedException { + cache.clear(); + } + + /** + * Removes all entries. + * + * @throws org.apache.ignite.IgniteCheckedException In case of error. + */ + public void removeAll() throws IgniteCheckedException { + cache.removeAll(); + } + + /** + * Read cache size. + * + * @param peekModes Encoded peek modes. + * @param loc Local mode flag. + * @return Size. + */ + public int size(int peekModes, boolean loc) { + CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(peekModes); + + return loc ? cache.localSize(modes) : cache.size(modes); + } + + /** + * Create cache iterator. + * + * @return Cache iterator. + */ + public PlatformCacheIterator iterator() { + Iterator<Cache.Entry> iter = cache.iterator(); + + return new PlatformCacheIterator(platformCtx, iter); + } + + /** + * Create cache iterator over local entries. + * + * @param peekModes Peke modes. + * @return Cache iterator. + */ + public PlatformCacheIterator localIterator(int peekModes) { + CachePeekMode[] peekModes0 = PlatformUtils.decodeCachePeekModes(peekModes); + + Iterator<Cache.Entry> iter = cache.localEntries(peekModes0).iterator(); + + return new PlatformCacheIterator(platformCtx, iter); + } + + /** + * Enters a lock. + * + * @param id Lock id. + */ + public void enterLock(long id) throws InterruptedException { + lock(id).lockInterruptibly(); + } + + /** + * Exits a lock. + * + * @param id Lock id. + */ + public void exitLock(long id) { + lock(id).unlock(); + } + + /** + * Attempts to enter a lock. + * + * @param id Lock id. + * @param timeout Timeout, in milliseconds. -1 for infinite timeout. + */ + public boolean tryEnterLock(long id, long timeout) throws InterruptedException { + return timeout == -1 + ? lock(id).tryLock() + : lock(id).tryLock(timeout, TimeUnit.MILLISECONDS); + } + + /** + * Rebalances the cache. + * + * @param futId Future id. + */ + public void rebalance(long futId) { + PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() { + @Override public Object apply(IgniteFuture fut) { + return null; + } + }), futId, PlatformFutureUtils.TYP_OBJ, this); + } + + /** + * Unregister lock. + * + * @param id Lock id. + */ + public void closeLock(long id){ + Lock lock = lockMap.remove(id); + + assert lock != null : "Failed to unregister lock: " + id; + } + + /** + * Get lock by id. + * + * @param id Id. + * @return Lock. + */ + private Lock lock(long id) { + Lock lock = lockMap.get(id); + + assert lock != null : "Lock not found for ID: " + id; + + return lock; + } + + /** + * Registers a lock in a map. + * + * @param lock Lock to register. + * @return Registered lock id. + */ + private long registerLock(Lock lock) { + long id = LOCK_ID_GEN.incrementAndGet(); + + lockMap.put(id, lock); + + return id; + } + + /** + * Runs specified query. + */ + private PlatformQueryCursor runQuery(PortableRawReaderEx reader, Query qry) throws IgniteCheckedException { + + try { + QueryCursorEx cursor = (QueryCursorEx) cache.query(qry); + + return new PlatformQueryCursor(platformCtx, cursor, + qry.getPageSize() > 0 ? qry.getPageSize(): Query.DFLT_PAGE_SIZE); + } + catch (Exception err) { + throw PlatformUtils.unwrapQueryException(err); + } + } + + /** + * Runs specified fields query. + */ + private PlatformFieldsQueryCursor runFieldsQuery(PortableRawReaderEx reader, Query qry) + throws IgniteCheckedException { + try { + QueryCursorEx cursor = (QueryCursorEx) cache.query(qry); + + return new PlatformFieldsQueryCursor(platformCtx, cursor, + qry.getPageSize() > 0 ? qry.getPageSize() : Query.DFLT_PAGE_SIZE); + } + catch (Exception err) { + throw PlatformUtils.unwrapQueryException(err); + } + } + + /** + * Reads the query of specified type. + */ + private Query readInitialQuery(PortableRawReaderEx reader) throws IgniteCheckedException { + int typ = reader.readInt(); + + switch (typ) { + case -1: + return null; + + case OP_QRY_SCAN: + return readScanQuery(reader); + + case OP_QRY_SQL: + return readSqlQuery(reader); + + case OP_QRY_TXT: + return readTextQuery(reader); + } + + throw new IgniteCheckedException("Unsupported query type: " + typ); + } + + /** + * Reads sql query. + */ + private Query readSqlQuery(PortableRawReaderEx reader) { + boolean loc = reader.readBoolean(); + String sql = reader.readString(); + String typ = reader.readString(); + final int pageSize = reader.readInt(); + + Object[] args = readQueryArgs(reader); + + return new SqlQuery(typ, sql).setPageSize(pageSize).setArgs(args).setLocal(loc); + } + + /** + * Reads fields query. + */ + private Query readFieldsQuery(PortableRawReaderEx reader) { + boolean loc = reader.readBoolean(); + String sql = reader.readString(); + final int pageSize = reader.readInt(); + + Object[] args = readQueryArgs(reader); + + return new SqlFieldsQuery(sql).setPageSize(pageSize).setArgs(args).setLocal(loc); + } + + /** + * Reads text query. + */ + private Query readTextQuery(PortableRawReaderEx reader) { + boolean loc = reader.readBoolean(); + String txt = reader.readString(); + String typ = reader.readString(); + final int pageSize = reader.readInt(); + + return new TextQuery(typ, txt).setPageSize(pageSize).setLocal(loc); + } + + /** + * Reads scan query. + */ + private Query readScanQuery(PortableRawReaderEx reader) { + boolean loc = reader.readBoolean(); + final int pageSize = reader.readInt(); + + boolean hasPart = reader.readBoolean(); + + Integer part = hasPart ? reader.readInt() : null; + + ScanQuery qry = new ScanQuery().setPageSize(pageSize); + + qry.setPartition(part); + + Object pred = reader.readObjectDetached(); + + if (pred != null) + qry.setFilter(platformCtx.createCacheEntryFilter(pred, reader.readLong())); + + qry.setLocal(loc); + + return qry; + } + + /** + * Writes error with EntryProcessorException cause. + */ + private static class GetAllWriter implements PlatformFutureUtils.Writer { + /** <inheritDoc /> */ + @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { + assert obj instanceof Map; + + PlatformUtils.writeNullableMap(writer, (Map) obj); + } + + /** <inheritDoc /> */ + @Override public boolean canWrite(Object obj, Throwable err) { + return err == null; + } + } + + /** + * Writes error with EntryProcessorException cause. + */ + private static class EntryProcessorInvokeWriter implements PlatformFutureUtils.Writer { + /** <inheritDoc /> */ + @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { + if (err == null) { + writer.writeBoolean(false); // No error. + + writer.writeObjectDetached(obj); + } + else { + writer.writeBoolean(true); // Error. + + writeError(writer, (Exception) err); + } + } + + /** <inheritDoc /> */ + @Override public boolean canWrite(Object obj, Throwable err) { + return true; + } + } + + /** + * Writes results of InvokeAll method. + */ + private static class EntryProcessorInvokeAllWriter implements PlatformFutureUtils.Writer { + /** <inheritDoc /> */ + @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { + writeInvokeAllResult(writer, (Map)obj); + } + + /** <inheritDoc /> */ + @Override public boolean canWrite(Object obj, Throwable err) { + return obj != null && err == null; + } + } + + /** + * Interop expiry policy. + */ + private static class InteropExpiryPolicy implements ExpiryPolicy { + /** Duration: unchanged. */ + private static final long DUR_UNCHANGED = -2; + + /** Duration: eternal. */ + private static final long DUR_ETERNAL = -1; + + /** Duration: zero. */ + private static final long DUR_ZERO = 0; + + /** Expiry for create. */ + private final Duration create; + + /** Expiry for update. */ + private final Duration update; + + /** Expiry for access. */ + private final Duration access; + + /** + * Constructor. + * + * @param create Expiry for create. + * @param update Expiry for update. + * @param access Expiry for access. + */ + public InteropExpiryPolicy(long create, long update, long access) { + this.create = convert(create); + this.update = convert(update); + this.access = convert(access); + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForCreation() { + return create; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForUpdate() { + return update; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForAccess() { + return access; + } + + /** + * Convert encoded duration to actual duration. + * + * @param dur Encoded duration. + * @return Actual duration. + */ + private static Duration convert(long dur) { + if (dur == DUR_UNCHANGED) + return null; + else if (dur == DUR_ETERNAL) + return Duration.ETERNAL; + else if (dur == DUR_ZERO) + return Duration.ZERO; + else { + assert dur > 0; + + return new Duration(TimeUnit.MILLISECONDS, dur); + } + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java new file mode 100644 index 0000000..5f8ec8f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.resources.IgniteInstanceResource; + +/** + * Interop filter. Delegates apply to native platform. + */ +public class PlatformCacheEntryFilterImpl extends PlatformAbstractPredicate implements PlatformCacheEntryFilter { + /** */ + private static final long serialVersionUID = 0L; + + /** + * {@link java.io.Externalizable} support. + */ + public PlatformCacheEntryFilterImpl() { + super(); + } + + /** + * Constructor. + * + * @param pred .Net portable predicate. + * @param ptr Pointer to predicate in the native platform. + * @param ctx Kernal context. + */ + public PlatformCacheEntryFilterImpl(Object pred, long ptr, PlatformContext ctx) { + super(pred, ptr, ctx); + + assert pred != null; + } + + /** {@inheritDoc} */ + @Override public boolean apply(Object k, Object v) { + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(k); + writer.writeObject(v); + + out.synchronize(); + + return ctx.gateway().cacheEntryFilterApply(ptr, mem.pointer()) != 0; + } + } + + /** {@inheritDoc} */ + @Override public void onClose() { + if (ptr == 0) + return; + + assert ctx != null; + + ctx.gateway().cacheEntryFilterDestroy(ptr); + + ptr = 0; + } + + /** + * @param ignite Ignite instance. + */ + @IgniteInstanceResource + public void setIgniteInstance(Ignite ignite) { + ctx = PlatformUtils.platformContext(ignite); + + if (ptr != 0) + return; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(pred); + + out.synchronize(); + + ptr = ctx.gateway().cacheEntryFilterCreate(mem.pointer()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java new file mode 100644 index 0000000..f59a63f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformProcessor; +import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Platform cache entry processor. Delegates processing to native platform. + */ +public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProcessor, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Indicates that entry has not been modified */ + private static final byte ENTRY_STATE_INTACT = 0; + + /** Indicates that entry value has been set */ + private static final byte ENTRY_STATE_VALUE_SET = 1; + + /** Indicates that remove has been called on an entry */ + private static final byte ENTRY_STATE_REMOVED = 2; + + /** Indicates error in processor that is written as portable. */ + private static final byte ENTRY_STATE_ERR_PORTABLE = 3; + + /** Indicates error in processor that is written as string. */ + private static final byte ENTRY_STATE_ERR_STRING = 4; + + /** Native portable processor */ + private Object proc; + + /** Pointer to processor in the native platform. */ + private transient long ptr; + + /** + * {@link java.io.Externalizable} support. + */ + public PlatformCacheEntryProcessorImpl() { + // No-op. + } + + /** + * Constructor. + * + * @param proc Native portable processor + * @param ptr Pointer to processor in the native platform. + */ + public PlatformCacheEntryProcessorImpl(Object proc, long ptr) { + this.proc = proc; + this.ptr = ptr; + } + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry entry, Object... args) + throws EntryProcessorException { + try { + IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class); + + PlatformProcessor interopProc; + + try { + interopProc = PlatformUtils.platformProcessor(ignite); + } + catch (IllegalStateException ex){ + throw new EntryProcessorException(ex); + } + + interopProc.awaitStart(); + + return execute0(interopProc.context(), entry); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * Executes interop entry processor on a given entry, updates entry and returns result. + * + * @param ctx Context. + * @param entry Entry. + * @return Processing result. + * @throws org.apache.ignite.IgniteCheckedException + */ + private Object execute0(PlatformContext ctx, MutableEntry entry) + throws IgniteCheckedException { + try (PlatformMemory outMem = ctx.memory().allocate()) { + PlatformOutputStream out = outMem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writeEntryAndProcessor(entry, writer); + + out.synchronize(); + + try (PlatformMemory inMem = ctx.memory().allocate()) { + PlatformInputStream in = inMem.input(); + + ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer()); + + in.synchronize(); + + PortableRawReaderEx reader = ctx.reader(in); + + return readResultAndUpdateEntry(ctx, entry, reader); + } + } + } + + /** + * Writes mutable entry and entry processor to the stream. + * + * @param entry Entry to process. + * @param writer Writer. + */ + private void writeEntryAndProcessor(MutableEntry entry, PortableRawWriterEx writer) { + writer.writeObject(entry.getKey()); + writer.writeObject(entry.getValue()); + + if (ptr != 0) { + // Execute locally - we have a pointer to native processor. + writer.writeBoolean(true); + writer.writeLong(ptr); + } + else { + // We are on a remote node. Send processor holder back to native. + writer.writeBoolean(false); + writer.writeObject(proc); + } + } + + /** + * Reads processing result from stream, updates mutable entry accordingly, and returns the result. + * + * @param entry Mutable entry to update. + * @param reader Reader. + * @return Entry processing result + * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code. + */ + @SuppressWarnings("unchecked") + private Object readResultAndUpdateEntry(PlatformContext ctx, MutableEntry entry, PortableRawReaderEx reader) { + byte state = reader.readByte(); + + switch (state) { + case ENTRY_STATE_VALUE_SET: + entry.setValue(reader.readObjectDetached()); + + break; + + case ENTRY_STATE_REMOVED: + entry.remove(); + + break; + + case ENTRY_STATE_ERR_PORTABLE: + // Full exception + Object nativeErr = reader.readObjectDetached(); + + assert nativeErr != null; + + throw new EntryProcessorException("Failed to execute native cache entry processor.", + ctx.createNativeException(nativeErr)); + + case ENTRY_STATE_ERR_STRING: + // Native exception was not serializable, we have only message. + String errMsg = reader.readString(); + + assert errMsg != null; + + throw new EntryProcessorException("Failed to execute native cache entry processor: " + errMsg); + + default: + assert state == ENTRY_STATE_INTACT; + } + + return reader.readObjectDetached(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(proc); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + proc = in.readObject(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java new file mode 100644 index 0000000..78ca683 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache; + +import java.util.Iterator; +import javax.cache.Cache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; + +/** + * Interop cache iterator. + */ +public class PlatformCacheIterator extends PlatformAbstractTarget { + /** Operation: next entry. */ + private static final int OP_NEXT = 1; + + /** Iterator. */ + private final Iterator<Cache.Entry> iter; + + /** + * Constructor. + * + * @param platformCtx Context. + * @param iter Iterator. + */ + public PlatformCacheIterator(PlatformContext platformCtx, Iterator<Cache.Entry> iter) { + super(platformCtx); + + this.iter = iter; + } + + /** {@inheritDoc} */ + @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException { + switch (type) { + case OP_NEXT: + if (iter.hasNext()) { + Cache.Entry e = iter.next(); + + assert e != null; + + writer.writeBoolean(true); + + writer.writeObjectDetached(e.getKey()); + writer.writeObjectDetached(e.getValue()); + } + else + writer.writeBoolean(false); + + break; + + default: + super.processOutStream(type, writer); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java new file mode 100644 index 0000000..ef17a06 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache; + +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformExtendedException; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; + +import java.util.Collection; + +/** + * Interop cache partial update exception. + */ +public class PlatformCachePartialUpdateException extends PlatformExtendedException { + /** */ + private static final long serialVersionUID = 0L; + + /** Keep portable flag. */ + private final boolean keepPortable; + + /** + * Constructor. + * + * @param cause Root cause. + * @param ctx Context. + * @param keepPortable Keep portable flag. + */ + public PlatformCachePartialUpdateException(CachePartialUpdateCheckedException cause, PlatformContext ctx, + boolean keepPortable) { + super(cause, ctx); + this.keepPortable = keepPortable; + } + + /** {@inheritDoc} */ + @Override public void writeData(PortableRawWriterEx writer) { + Collection keys = ((CachePartialUpdateCheckedException)getCause()).failedKeys(); + + writer.writeBoolean(keepPortable); + + PlatformUtils.writeNullableCollection(writer, keys); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java new file mode 100644 index 0000000..9dd7416 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Native cache wrapper implementation. + */ +@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"}) +public class PlatformAffinity extends PlatformAbstractTarget { + /** */ + public static final int OP_AFFINITY_KEY = 1; + + /** */ + public static final int OP_ALL_PARTITIONS = 2; + + /** */ + public static final int OP_BACKUP_PARTITIONS = 3; + + /** */ + public static final int OP_IS_BACKUP = 4; + + /** */ + public static final int OP_IS_PRIMARY = 5; + + /** */ + public static final int OP_IS_PRIMARY_OR_BACKUP = 6; + + /** */ + public static final int OP_MAP_KEY_TO_NODE = 7; + + /** */ + public static final int OP_MAP_KEY_TO_PRIMARY_AND_BACKUPS = 8; + + /** */ + public static final int OP_MAP_KEYS_TO_NODES = 9; + + /** */ + public static final int OP_MAP_PARTITION_TO_NODE = 10; + + /** */ + public static final int OP_MAP_PARTITION_TO_PRIMARY_AND_BACKUPS = 11; + + /** */ + public static final int OP_MAP_PARTITIONS_TO_NODES = 12; + + /** */ + public static final int OP_PARTITION = 13; + + /** */ + public static final int OP_PRIMARY_PARTITIONS = 14; + + /** */ + private static final C1<ClusterNode, UUID> TO_NODE_ID = new C1<ClusterNode, UUID>() { + @Nullable @Override public UUID apply(ClusterNode node) { + return node != null ? node.id() : null; + } + }; + + /** Underlying cache affinity. */ + private final Affinity<Object> aff; + + /** Discovery manager */ + private final GridDiscoveryManager discovery; + + /** + * Constructor. + * + * @param platformCtx Context. + * @param igniteCtx Ignite context. + * @param name Cache name. + */ + public PlatformAffinity(PlatformContext platformCtx, GridKernalContext igniteCtx, @Nullable String name) + throws IgniteCheckedException { + super(platformCtx); + + this.aff = igniteCtx.grid().affinity(name); + + if (aff == null) + throw new IgniteCheckedException("Cache with the given name doesn't exist: " + name); + + discovery = igniteCtx.discovery(); + } + + /** {@inheritDoc} */ + @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case OP_PARTITION: + return aff.partition(reader.readObjectDetached()); + + case OP_IS_PRIMARY: { + UUID nodeId = reader.readUuid(); + + Object key = reader.readObjectDetached(); + + ClusterNode node = discovery.node(nodeId); + + if (node == null) + return FALSE; + + return aff.isPrimary(node, key) ? TRUE : FALSE; + } + + case OP_IS_BACKUP: { + UUID nodeId = reader.readUuid(); + + Object key = reader.readObjectDetached(); + + ClusterNode node = discovery.node(nodeId); + + if (node == null) + return FALSE; + + return aff.isBackup(node, key) ? TRUE : FALSE; + } + + case OP_IS_PRIMARY_OR_BACKUP: { + UUID nodeId = reader.readUuid(); + + Object key = reader.readObjectDetached(); + + ClusterNode node = discovery.node(nodeId); + + if (node == null) + return FALSE; + + return aff.isPrimaryOrBackup(node, key) ? TRUE : FALSE; + } + + default: + return super.processInStreamOutLong(type, reader); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"}) + @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer) + throws IgniteCheckedException { + switch (type) { + case OP_PRIMARY_PARTITIONS: { + UUID nodeId = reader.readObject(); + + ClusterNode node = discovery.node(nodeId); + + int[] parts = node != null ? aff.primaryPartitions(node) : U.EMPTY_INTS; + + writer.writeIntArray(parts); + + break; + } + + case OP_BACKUP_PARTITIONS: { + UUID nodeId = reader.readObject(); + + ClusterNode node = discovery.node(nodeId); + + int[] parts = node != null ? aff.backupPartitions(node) : U.EMPTY_INTS; + + writer.writeIntArray(parts); + + break; + } + + case OP_ALL_PARTITIONS: { + UUID nodeId = reader.readObject(); + + ClusterNode node = discovery.node(nodeId); + + int[] parts = node != null ? aff.allPartitions(node) : U.EMPTY_INTS; + + writer.writeIntArray(parts); + + break; + } + + case OP_AFFINITY_KEY: { + Object key = reader.readObjectDetached(); + + writer.writeObject(aff.affinityKey(key)); + + break; + } + + case OP_MAP_KEY_TO_NODE: { + Object key = reader.readObjectDetached(); + + ClusterNode node = aff.mapKeyToNode(key); + + platformCtx.writeNode(writer, node); + + break; + } + + case OP_MAP_PARTITION_TO_NODE: { + int part = reader.readObject(); + + ClusterNode node = aff.mapPartitionToNode(part); + + platformCtx.writeNode(writer, node); + + break; + } + + case OP_MAP_KEY_TO_PRIMARY_AND_BACKUPS: { + Object key = reader.readObjectDetached(); + + platformCtx.writeNodes(writer, aff.mapKeyToPrimaryAndBackups(key)); + + break; + } + + case OP_MAP_PARTITION_TO_PRIMARY_AND_BACKUPS: { + int part = reader.readObject(); + + platformCtx.writeNodes(writer, aff.mapPartitionToPrimaryAndBackups(part)); + + break; + } + + case OP_MAP_KEYS_TO_NODES: { + Collection<Object> keys = reader.readCollection(); + + Map<ClusterNode, Collection<Object>> map = aff.mapKeysToNodes(keys); + + writer.writeInt(map.size()); + + for (Map.Entry<ClusterNode, Collection<Object>> e : map.entrySet()) { + platformCtx.addNode(e.getKey()); + + writer.writeUuid(e.getKey().id()); + writer.writeObject(e.getValue()); + } + + break; + } + + case OP_MAP_PARTITIONS_TO_NODES: { + Collection<Integer> parts = reader.readCollection(); + + Map<Integer, ClusterNode> map = aff.mapPartitionsToNodes(parts); + + writer.writeInt(map.size()); + + for (Map.Entry<Integer, ClusterNode> e : map.entrySet()) { + platformCtx.addNode(e.getValue()); + + writer.writeInt(e.getKey()); + + writer.writeUuid(e.getValue().id()); + } + + break; + } + + default: + super.processInStreamOutStream(type, reader, writer); + } + } + + /** + * @return Gets number of partitions in cache. + */ + public int partitions() { + return aff.partitions(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java new file mode 100644 index 0000000..6c2c873 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.query; + +import java.util.Iterator; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; + +/** + * + */ +public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTarget implements AutoCloseable { + /** Get multiple entries. */ + private static final int OP_GET_ALL = 1; + + /** Get all entries. */ + private static final int OP_GET_BATCH = 2; + + /** Get single entry. */ + private static final int OP_GET_SINGLE = 3; + + /** Underlying cursor. */ + private final QueryCursorEx<T> cursor; + + /** Batch size size. */ + private final int batchSize; + + /** Underlying iterator. */ + private Iterator<T> iter; + + /** + * Constructor. + * + * @param platformCtx Context. + * @param cursor Underlying cursor. + * @param batchSize Batch size. + */ + public PlatformAbstractQueryCursor(PlatformContext platformCtx, QueryCursorEx<T> cursor, int batchSize) { + super(platformCtx); + + this.cursor = cursor; + this.batchSize = batchSize; + } + + /** {@inheritDoc} */ + @Override protected void processOutStream(int type, final PortableRawWriterEx writer) throws IgniteCheckedException { + switch (type) { + case OP_GET_BATCH: { + assert iter != null : "iterator() has not been called"; + + try { + int cntPos = writer.reserveInt(); + + int cnt; + + for (cnt = 0; cnt < batchSize; cnt++) { + if (iter.hasNext()) + write(writer, iter.next()); + else + break; + } + + writer.writeInt(cntPos, cnt); + } + catch (Exception err) { + throw PlatformUtils.unwrapQueryException(err); + } + + break; + } + + case OP_GET_SINGLE: { + assert iter != null : "iterator() has not been called"; + + try { + if (iter.hasNext()) { + write(writer, iter.next()); + + return; + } + } + catch (Exception err) { + throw PlatformUtils.unwrapQueryException(err); + } + + throw new IgniteCheckedException("No more data available."); + } + + case OP_GET_ALL: { + try { + int pos = writer.reserveInt(); + + Consumer<T> consumer = new Consumer<>(this, writer); + + cursor.getAll(consumer); + + writer.writeInt(pos, consumer.cnt); + } + catch (Exception err) { + throw PlatformUtils.unwrapQueryException(err); + } + + break; + } + + default: + super.processOutStream(type, writer); + } + } + + /** + * Get cursor iterator. + */ + public void iterator() { + iter = cursor.iterator(); + } + + /** + * Check whether next iterator entry exists. + * + * @return {@code True} if exists. + */ + @SuppressWarnings("UnusedDeclaration") + public boolean iteratorHasNext() { + assert iter != null : "iterator() has not been called"; + + return iter.hasNext(); + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + cursor.close(); + } + + /** + * Write value to the stream. Extension point to perform conversions on the object before writing it. + * + * @param writer Writer. + * @param val Value. + */ + protected abstract void write(PortableRawWriterEx writer, T val); + + /** + * Query cursor consumer. + */ + private static class Consumer<T> implements QueryCursorEx.Consumer<T> { + /** Current query cursor. */ + private final PlatformAbstractQueryCursor<T> cursor; + + /** Writer. */ + private final PortableRawWriterEx writer; + + /** Count. */ + private int cnt; + + /** + * Constructor. + * + * @param writer Writer. + */ + public Consumer(PlatformAbstractQueryCursor<T> cursor, PortableRawWriterEx writer) { + this.cursor = cursor; + this.writer = writer; + } + + /** {@inheritDoc} */ + @Override public void consume(T val) throws IgniteCheckedException { + cursor.write(writer, val); + + cnt++; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java new file mode 100644 index 0000000..453e233 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.query; + +import java.io.ObjectStreamException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.cache.Cache; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; + +/** + * Interop continuous query handle. + */ +public class PlatformContinuousQueryImpl implements PlatformContinuousQuery { + /** */ + private static final long serialVersionUID = 0L; + + /** Context. */ + protected final PlatformContext platformCtx; + + /** Whether filter exists. */ + private final boolean hasFilter; + + /** Native filter in serialized form. If null, then filter is either not set, or this is local query. */ + protected final Object filter; + + /** Pointer to native counterpart; zero if closed. */ + private long ptr; + + /** Cursor to handle filter close. */ + private QueryCursor cursor; + + /** Lock for concurrency control. */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** Wrapped initial qry cursor. */ + private PlatformQueryCursor initialQryCur; + + /** + * Constructor. + * + * @param platformCtx Context. + * @param ptr Pointer to native counterpart. + * @param hasFilter Whether filter exists. + * @param filter Filter. + */ + public PlatformContinuousQueryImpl(PlatformContext platformCtx, long ptr, boolean hasFilter, Object filter) { + assert ptr != 0L; + + this.platformCtx = platformCtx; + this.ptr = ptr; + this.hasFilter = hasFilter; + this.filter = filter; + } + + /** + * Start query execution. + * + * @param cache Cache. + * @param loc Local flag. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto-unsubscribe flag. + * @param initialQry Initial query. + */ + @SuppressWarnings("unchecked") + public void start(IgniteCacheProxy cache, boolean loc, int bufSize, long timeInterval, boolean autoUnsubscribe, + Query initialQry) throws IgniteCheckedException { + assert !loc || filter == null; + + lock.writeLock().lock(); + + try { + try { + ContinuousQuery qry = new ContinuousQuery(); + + qry.setLocalListener(this); + qry.setRemoteFilter(this); // Filter must be set always for correct resource release. + qry.setPageSize(bufSize); + qry.setTimeInterval(timeInterval); + qry.setAutoUnsubscribe(autoUnsubscribe); + qry.setInitialQuery(initialQry); + + cursor = cache.query(qry.setLocal(loc)); + + if (initialQry != null) + initialQryCur = new PlatformQueryCursor(platformCtx, new QueryCursorEx<Cache.Entry>() { + @Override public Iterator<Cache.Entry> iterator() { + return cursor.iterator(); + } + + @Override public List<Cache.Entry> getAll() { + return cursor.getAll(); + } + + @Override public void close() { + // No-op: do not close whole continuous query when initial query cursor closes. + } + + @Override public void getAll(Consumer<Cache.Entry> clo) throws IgniteCheckedException { + for (Cache.Entry t : this) + clo.consume(t); + } + + @Override public List<GridQueryFieldMetadata> fieldsMeta() { + return null; + } + }, initialQry.getPageSize() > 0 ? initialQry.getPageSize() : Query.DFLT_PAGE_SIZE); + } + catch (Exception e) { + try + { + close0(); + } + catch (Exception ignored) + { + // Ignore + } + + throw PlatformUtils.unwrapQueryException(e); + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void onUpdated(Iterable evts) throws CacheEntryListenerException { + lock.readLock().lock(); + + try { + if (ptr == 0) + throw new CacheEntryListenerException("Failed to notify listener because it has been closed."); + + PlatformUtils.applyContinuousQueryEvents(platformCtx, ptr, evts); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException { + lock.readLock().lock(); + + try { + if (ptr == 0) + throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed."); + + return !hasFilter || PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onQueryUnregister() { + close(); + } + + /** {@inheritDoc} */ + @Override public void close() { + lock.writeLock().lock(); + + try { + close0(); + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"UnusedDeclaration", "unchecked"}) + @Override public PlatformTarget getInitialQueryCursor() { + return initialQryCur; + } + + /** + * Internal close routine. + */ + private void close0() { + if (ptr != 0) { + long ptr0 = ptr; + + ptr = 0; + + if (cursor != null) + cursor.close(); + + platformCtx.gateway().continuousQueryFilterRelease(ptr0); + } + } + + /** + * Replacer for remote filter. + * + * @return Filter to be deployed on remote node. + * @throws ObjectStreamException If failed. + */ + Object writeReplace() throws ObjectStreamException { + return filter == null ? null : platformCtx.createContinuousQueryFilter(filter); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java new file mode 100644 index 0000000..71aa38c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.resources.IgniteInstanceResource; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Continuous query filter deployed on remote nodes. + */ +public class PlatformContinuousQueryRemoteFilter implements PlatformContinuousQueryFilter, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Lock for concurrency control. */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** Native filter in serialized form. */ + private Object filter; + + /** Grid hosting the filter. */ + @IgniteInstanceResource + private transient Ignite grid; + + /** Native platform pointer. */ + private transient volatile long ptr; + + /** Close flag. Once set, none requests to native platform is possible. */ + private transient boolean closed; + + /** + * {@link java.io.Externalizable} support. + */ + public PlatformContinuousQueryRemoteFilter() { + // No-op. + } + + /** + * Constructor. + * + * @param filter Serialized native filter. + */ + public PlatformContinuousQueryRemoteFilter(Object filter) { + assert filter != null; + + this.filter = filter; + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException { + long ptr0 = ptr; + + if (ptr0 == 0) + deploy(); + + lock.readLock().lock(); + + try { + if (closed) + throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed."); + + PlatformContext platformCtx = PlatformUtils.platformContext(grid); + + return PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Deploy filter to native platform. + */ + private void deploy() { + lock.writeLock().lock(); + + try { + // 1. Do not deploy if the filter has been closed concurrently. + if (closed) + throw new CacheEntryListenerException("Failed to deploy the filter because it has been closed."); + + // 2. Deploy. + PlatformContext ctx = PlatformUtils.platformContext(grid); + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(filter); + + out.synchronize(); + + ptr = ctx.gateway().continuousQueryFilterCreate(mem.pointer()); + } + catch (Exception e) { + // 3. Close in case of failure. + close(); + + throw new CacheEntryListenerException("Failed to deploy the filter.", e); + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onQueryUnregister() { + lock.writeLock().lock(); + + try { + close(); + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * Close the filter. + */ + private void close() { + if (!closed) { + try { + if (ptr != 0) { + try { + PlatformUtils.platformContext(grid).gateway().continuousQueryFilterRelease(ptr); + } + finally { + // Nullify the pointer in any case. + ptr = 0; + } + } + } + finally { + closed = true; + } + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(filter); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + filter = in.readObject(); + + assert filter != null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PlatformContinuousQueryRemoteFilter.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java new file mode 100644 index 0000000..44a4f14 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.query; + +import java.util.List; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; + +/** + * Interop cursor for fields query. + */ +public class PlatformFieldsQueryCursor extends PlatformAbstractQueryCursor<List<?>> { + /** + * Constructor. + * + * @param platformCtx Platform context. + * @param cursor Backing cursor. + * @param batchSize Batch size. + */ + public PlatformFieldsQueryCursor(PlatformContext platformCtx, QueryCursorEx<List<?>> cursor, int batchSize) { + super(platformCtx, cursor, batchSize); + } + + /** {@inheritDoc} */ + @Override protected void write(PortableRawWriterEx writer, List vals) { + assert vals != null; + + writer.writeInt(vals.size()); + + for (Object val : vals) + writer.writeObjectDetached(val); + } +} \ No newline at end of file