IGNITE-1317: Moved platform cache to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/207b6820 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/207b6820 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/207b6820 Branch: refs/heads/ignite-1093 Commit: 207b6820778e0d65c8181d8094903cc1dd82a863 Parents: 26f0ee0 Author: vozerov-gridgain <[email protected]> Authored: Fri Aug 28 14:36:29 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Aug 28 14:36:29 2015 +0300 ---------------------------------------------------------------------- .../processors/platform/PlatformContext.java | 22 +- .../cache/PlatformCacheEntryFilter.java | 29 + .../platform/cache/PlatformCache.java | 1056 ++++++++++++++++++ .../cache/PlatformCacheEntryFilterImpl.java | 105 ++ .../cache/PlatformCacheEntryProcessor.java | 212 ++++ 5 files changed, 1423 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/207b6820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java index 5275e0d..cbcc91b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java @@ -18,12 +18,14 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import 
org.apache.ignite.internal.portable.*; import org.apache.ignite.internal.processors.cache.query.continuous.*; +import org.apache.ignite.internal.processors.platform.cache.*; import org.apache.ignite.internal.processors.platform.cache.query.*; import org.apache.ignite.internal.processors.platform.callback.*; import org.apache.ignite.internal.processors.platform.compute.*; @@ -223,10 +225,28 @@ public interface PlatformContext { /** * Create closure job. * - * @param task Task. + * @param task Native task. * @param ptr Pointer. * @param job Native job. * @return Closure job. */ public PlatformJob createClosureJob(Object task, long ptr, Object job); + + /** + * Create cache entry processor. + * + * @param proc Native processor. + * @param ptr Pointer. + * @return Entry processor. + */ + public CacheEntryProcessor createCacheEntryProcessor(Object proc, long ptr); + + /** + * Create cache entry filter. + * + * @param filter Native filter. + * @param ptr Pointer. + * @return Entry filter. + */ + public PlatformCacheEntryFilter createCacheEntryFilter(Object filter, long ptr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/207b6820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java new file mode 100644 index 0000000..ac7cba4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache; + +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.query.*; + +/** + * Platform cache entry filter interface. + */ +public interface PlatformCacheEntryFilter<K, V> extends GridLoadCacheCloseablePredicate<K, V>, + CacheQueryCloseableScanBiPredicate<K, V> { + // No-op. +} http://git-wip-us.apache.org/repos/asf/ignite/blob/207b6820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java new file mode 100644 index 0000000..dff9d67 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -0,0 +1,1056 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.cache.query.*; +import org.apache.ignite.internal.processors.platform.compute.*; +import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.expiry.*; +import javax.cache.processor.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +/** + * Native cache wrapper implementation. 
+ */ +@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"}) +public class PlatformCache extends PlatformAbstractTarget { + /** */ + public static final int OP_CLEAR = 1; + + /** */ + public static final int OP_CLEAR_ALL = 2; + + /** */ + public static final int OP_CONTAINS_KEY = 3; + + /** */ + public static final int OP_CONTAINS_KEYS = 4; + + /** */ + public static final int OP_GET = 5; + + /** */ + public static final int OP_GET_ALL = 6; + + /** */ + public static final int OP_GET_AND_PUT = 7; + + /** */ + public static final int OP_GET_AND_PUT_IF_ABSENT = 8; + + /** */ + public static final int OP_GET_AND_REMOVE = 9; + + /** */ + public static final int OP_GET_AND_REPLACE = 10; + + /** */ + public static final int OP_GET_NAME = 11; + + /** */ + public static final int OP_INVOKE = 12; + + /** */ + public static final int OP_INVOKE_ALL = 13; + + /** */ + public static final int OP_IS_LOCAL_LOCKED = 14; + + /** */ + public static final int OP_LOAD_CACHE = 15; + + /** */ + public static final int OP_LOC_EVICT = 16; + + /** */ + public static final int OP_LOC_LOAD_CACHE = 17; + + /** */ + public static final int OP_LOC_PROMOTE = 18; + + /** */ + public static final int OP_LOCAL_CLEAR = 20; + + /** */ + public static final int OP_LOCAL_CLEAR_ALL = 21; + + /** */ + public static final int OP_LOCK = 22; + + /** */ + public static final int OP_LOCK_ALL = 23; + + /** */ + public static final int OP_METRICS = 24; + + /** Operation code: peek (public like every other operation code in this table). */ + public static final int OP_PEEK = 25; + + /** Operation code: put (public like every other operation code in this table). */ + public static final int OP_PUT = 26; + + /** Operation code: put all (public like every other operation code in this table). */ + public static final int OP_PUT_ALL = 27; + + /** */ + public static final int OP_PUT_IF_ABSENT = 28; + + /** */ + public static final int OP_QRY_CONTINUOUS = 29; + + /** */ + public static final int OP_QRY_SCAN = 30; + + /** */ + public static final int OP_QRY_SQL = 31; + + /** */ + public static final int OP_QRY_SQL_FIELDS = 32; + + /** */ + public static final int OP_QRY_TXT = 33; + + /** */ + public static final 
int OP_REMOVE_ALL = 34; + + /** */ + public static final int OP_REMOVE_BOOL = 35; + + /** */ + public static final int OP_REMOVE_OBJ = 36; + + /** */ + public static final int OP_REPLACE_2 = 37; + + /** */ + public static final int OP_REPLACE_3 = 38; + + /** Underlying JCache. */ + private final IgniteCacheProxy cache; + + /** Whether this cache is created with "keepPortable" flag on the other side. */ + private final boolean keepPortable; + + /** */ + private static final GetAllWriter WRITER_GET_ALL = new GetAllWriter(); + + /** */ + private static final EntryProcessorExceptionWriter WRITER_PROC_ERR = new EntryProcessorExceptionWriter(); + + /** */ + private static final EntryProcessorResultsWriter WRITER_INVOKE_ALL = new EntryProcessorResultsWriter(); + + /** Map with currently active locks. */ + private final ConcurrentMap<Long, Lock> lockMap = GridConcurrentFactory.newMap(); + + /** Lock ID sequence. */ + private static final AtomicLong LOCK_ID_GEN = new AtomicLong(); + + /** + * Constructor. + * + * @param platformCtx Context. + * @param cache Underlying cache. + * @param keepPortable Keep portable flag. + */ + public PlatformCache(PlatformContext platformCtx, IgniteCache cache, boolean keepPortable) { + super(platformCtx); + + this.cache = (IgniteCacheProxy)cache; + this.keepPortable = keepPortable; + } + + /** + * Gets cache with "skip-store" flag set. + * + * @return Cache with "skip-store" flag set. + */ + public PlatformCache withSkipStore() { + if (cache.delegate().skipStore()) + return this; + + return new PlatformCache(platformCtx, cache.withSkipStore(), keepPortable); + } + + /** + * Gets cache with "keep portable" flag. + * <p> + * The wrapper is built from the keep-portable projection of the underlying cache. Skip-store mode is + * deliberately not requested here: asking for portable mode must not switch the cache store off. + * + * @return Cache with "keep portable" flag set. + */ + public PlatformCache withKeepPortable() { + if (keepPortable) + return this; + + return new PlatformCache(platformCtx, cache.keepPortable(), true); + } + + /** + * Gets cache with provided expiry policy. + * + * @param create Create. + * @param update Update. 
+ * @param access Access. + * @return Cache. + */ + public PlatformCache withExpiryPolicy(final long create, final long update, final long access) { + IgniteCache cache0 = cache.withExpiryPolicy(new InteropExpiryPolicy(create, update, access)); + + return new PlatformCache(platformCtx, cache0, keepPortable); + } + + /** + * Gets cache with asynchronous mode enabled. + * + * @return Cache with asynchronous mode enabled. + */ + public PlatformCache withAsync() { + if (cache.isAsync()) + return this; + + return new PlatformCache(platformCtx, (IgniteCache)cache.withAsync(), keepPortable); + } + + /** + * Gets cache with no-retries mode enabled. + * + * @return Cache with no-retries mode enabled. + */ + public PlatformCache withNoRetries() { + CacheOperationContext opCtx = cache.operationContext(); + + if (opCtx != null && opCtx.noRetries()) + return this; + + return new PlatformCache(platformCtx, cache.withNoRetries(), keepPortable); + } + + /** {@inheritDoc} */ + @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case OP_PUT: + cache.put(reader.readObjectDetached(), reader.readObjectDetached()); + + return TRUE; + + case OP_REMOVE_BOOL: + return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; + + case OP_REMOVE_ALL: + cache.removeAll(PlatformUtils.readSet(reader)); + + return TRUE; + + case OP_PUT_ALL: + cache.putAll(PlatformUtils.readMap(reader)); + + return TRUE; + + case OP_LOC_EVICT: + cache.localEvict(PlatformUtils.readCollection(reader)); + + return TRUE; + + case OP_CONTAINS_KEY: + return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE; + + case OP_CONTAINS_KEYS: + return cache.containsKeys(PlatformUtils.readSet(reader)) ? 
TRUE : FALSE; + + case OP_LOC_PROMOTE: { + cache.localPromote(PlatformUtils.readSet(reader)); + + break; + } + + case OP_REPLACE_3: + return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(), + reader.readObjectDetached()) ? TRUE : FALSE; + + case OP_LOC_LOAD_CACHE: + loadCache0(reader, true); + + break; + + case OP_LOAD_CACHE: + loadCache0(reader, false); + + break; + + case OP_CLEAR: + cache.clear(reader.readObjectDetached()); + + break; + + case OP_CLEAR_ALL: + cache.clearAll(PlatformUtils.readSet(reader)); + + break; + + case OP_LOCAL_CLEAR: + cache.localClear(reader.readObjectDetached()); + + break; + + case OP_LOCAL_CLEAR_ALL: + cache.localClearAll(PlatformUtils.readSet(reader)); + + break; + + case OP_PUT_IF_ABSENT: { + return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; + } + + case OP_REPLACE_2: { + return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; + } + + case OP_REMOVE_OBJ: { + return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE; + } + + case OP_IS_LOCAL_LOCKED: + return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE; + + default: + throw new IgniteCheckedException("Unsupported operation type: " + type); + } + + return TRUE; + } + + /** + * Loads cache via localLoadCache or loadCache. 
+ */ + private void loadCache0(PortableRawReaderEx reader, boolean loc) throws IgniteCheckedException { + PlatformCacheEntryFilter filter = null; + + Object pred = reader.readObjectDetached(); + + if (pred != null) + filter = platformCtx.createCacheEntryFilter(pred, reader.readLong()); + + Object[] args = reader.readObjectArray(); + + if (loc) + cache.localLoadCache(filter, args); + else + cache.loadCache(filter, args); + } + + /** {@inheritDoc} */ + @Override protected Object processInOpObject(int type, PortableRawReaderEx reader) + throws IgniteCheckedException { + switch (type) { + case OP_QRY_SQL: + return runQuery(reader, readSqlQuery(reader)); + + case OP_QRY_SQL_FIELDS: + return runFieldsQuery(reader, readFieldsQuery(reader)); + + case OP_QRY_TXT: + return runQuery(reader, readTextQuery(reader)); + + case OP_QRY_SCAN: + return runQuery(reader, readScanQuery(reader)); + + case OP_QRY_CONTINUOUS: { + long ptr = reader.readLong(); + boolean loc = reader.readBoolean(); + boolean hasFilter = reader.readBoolean(); + Object filter = reader.readObjectDetached(); + int bufSize = reader.readInt(); + long timeInterval = reader.readLong(); + boolean autoUnsubscribe = reader.readBoolean(); + Query initQry = readInitialQuery(reader); + + PlatformContinuousQuery qry = platformCtx.createContinuousQuery(ptr, hasFilter, filter); + + qry.start(cache, loc, bufSize, timeInterval, autoUnsubscribe, initQry); + + return qry; + } + + default: + return throwUnsupported(type); + } + } + + /** + * Read arguments for SQL query. + * + * @param reader Reader. + * @return Arguments. 
+ */ + @Nullable private Object[] readQueryArgs(PortableRawReaderEx reader) { + int cnt = reader.readInt(); + + if (cnt > 0) { + Object[] args = new Object[cnt]; + + for (int i = 0; i < cnt; i++) + args[i] = reader.readObjectDetached(); + + return args; + } + else + return null; + } + + /** {@inheritDoc} */ + @Override protected void processOutOp(int type, PortableRawWriterEx w) throws IgniteCheckedException { + switch (type) { + case OP_GET_NAME: + w.writeObject(cache.getName()); + + break; + + case OP_METRICS: + CacheMetrics metrics = cache.metrics(); + + w.writeLong(metrics.getCacheGets()); + w.writeLong(metrics.getCachePuts()); + w.writeLong(metrics.getCacheHits()); + w.writeLong(metrics.getCacheMisses()); + w.writeLong(metrics.getCacheTxCommits()); + w.writeLong(metrics.getCacheTxRollbacks()); + w.writeLong(metrics.getCacheEvictions()); + w.writeLong(metrics.getCacheRemovals()); + w.writeFloat(metrics.getAveragePutTime()); + w.writeFloat(metrics.getAverageGetTime()); + w.writeFloat(metrics.getAverageRemoveTime()); + w.writeFloat(metrics.getAverageTxCommitTime()); + w.writeFloat(metrics.getAverageTxRollbackTime()); + w.writeString(metrics.name()); + w.writeLong(metrics.getOverflowSize()); + w.writeLong(metrics.getOffHeapEntriesCount()); + w.writeLong(metrics.getOffHeapAllocatedSize()); + w.writeInt(metrics.getSize()); + w.writeInt(metrics.getKeySize()); + w.writeBoolean(metrics.isEmpty()); + w.writeInt(metrics.getDhtEvictQueueCurrentSize()); + w.writeInt(metrics.getTxThreadMapSize()); + w.writeInt(metrics.getTxXidMapSize()); + w.writeInt(metrics.getTxCommitQueueSize()); + w.writeInt(metrics.getTxPrepareQueueSize()); + w.writeInt(metrics.getTxStartVersionCountsSize()); + w.writeInt(metrics.getTxCommittedVersionsSize()); + w.writeInt(metrics.getTxRolledbackVersionsSize()); + w.writeInt(metrics.getTxDhtThreadMapSize()); + w.writeInt(metrics.getTxDhtXidMapSize()); + w.writeInt(metrics.getTxDhtCommitQueueSize()); + w.writeInt(metrics.getTxDhtPrepareQueueSize()); + 
w.writeInt(metrics.getTxDhtStartVersionCountsSize()); + w.writeInt(metrics.getTxDhtCommittedVersionsSize()); + w.writeInt(metrics.getTxDhtRolledbackVersionsSize()); + w.writeBoolean(metrics.isWriteBehindEnabled()); + w.writeInt(metrics.getWriteBehindFlushSize()); + w.writeInt(metrics.getWriteBehindFlushThreadCount()); + w.writeLong(metrics.getWriteBehindFlushFrequency()); + w.writeInt(metrics.getWriteBehindStoreBatchSize()); + w.writeInt(metrics.getWriteBehindTotalCriticalOverflowCount()); + w.writeInt(metrics.getWriteBehindCriticalOverflowCount()); + w.writeInt(metrics.getWriteBehindErrorRetryCount()); + w.writeInt(metrics.getWriteBehindBufferSize()); + w.writeString(metrics.getKeyType()); + w.writeString(metrics.getValueType()); + w.writeBoolean(metrics.isStoreByValue()); + w.writeBoolean(metrics.isStatisticsEnabled()); + w.writeBoolean(metrics.isManagementEnabled()); + w.writeBoolean(metrics.isReadThrough()); + w.writeBoolean(metrics.isWriteThrough()); + w.writeFloat(metrics.getCacheHitPercentage()); + w.writeFloat(metrics.getCacheMissPercentage()); + + break; + + default: + throwUnsupported(type); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"}) + @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer, + Object arg) throws IgniteCheckedException { + switch (type) { + case OP_GET: { + writer.writeObjectDetached(cache.get(reader.readObjectDetached())); + + break; + } + + case OP_GET_AND_PUT: { + writer.writeObjectDetached(cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached())); + + break; + } + + case OP_GET_AND_REPLACE: { + writer.writeObjectDetached(cache.getAndReplace(reader.readObjectDetached(), + reader.readObjectDetached())); + + break; + } + + case OP_GET_AND_REMOVE: { + writer.writeObjectDetached(cache.getAndRemove(reader.readObjectDetached())); + + break; + } + + case OP_GET_AND_PUT_IF_ABSENT: { + 
writer.writeObjectDetached(cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached())); + + break; + } + + case OP_PEEK: { + Object key = reader.readObjectDetached(); + + CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt()); + + writer.writeObjectDetached(cache.localPeek(key, modes)); + + break; + } + + case OP_GET_ALL: { + Set keys = PlatformUtils.readSet(reader); + + Map entries = cache.getAll(keys); + + PlatformUtils.writeNullableMap(writer, entries); + + break; + } + + case OP_INVOKE: { + Object key = reader.readObjectDetached(); + + CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); + + try { + writer.writeObjectDetached(cache.invoke(key, proc)); + } + catch (EntryProcessorException ex) + { + if (ex.getCause() instanceof PlatformNativeException) + writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause()); + else + throw ex; + } + + break; + } + + case OP_INVOKE_ALL: { + Set<Object> keys = PlatformUtils.readSet(reader); + + CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); + + writeInvokeAllResult(writer, cache.invokeAll(keys, proc)); + + break; + } + + case OP_LOCK: + writer.writeLong(registerLock(cache.lock(reader.readObjectDetached()))); + + break; + + case OP_LOCK_ALL: + writer.writeLong(registerLock(cache.lockAll(PlatformUtils.readCollection(reader)))); + + break; + + default: + /* Reject unknown operation codes instead of silently writing nothing, as the other process* methods do. */ + throwUnsupported(type); + } + } + + /** {@inheritDoc} */ + @Override protected Exception convertException(Exception e) { + if (e instanceof CachePartialUpdateException) + return new PlatformCachePartialUpdateException((CachePartialUpdateException)e, platformCtx, keepPortable); + + return super.convertException(e); + } + + /** + * Writes the result of InvokeAll cache method. + * + * @param writer Writer. + * @param results Results. 
+ */ + private static void writeInvokeAllResult(PortableRawWriterEx writer, Map<Object, EntryProcessorResult> results) { + if (results == null) { + writer.writeInt(-1); + + return; + } + + writer.writeInt(results.size()); + + for (Map.Entry<Object, EntryProcessorResult> entry : results.entrySet()) { + writer.writeObjectDetached(entry.getKey()); + + EntryProcessorResult procRes = entry.getValue(); + + try { + Object res = procRes.get(); + + writer.writeBoolean(false); // No exception + + writer.writeObjectDetached(res); + } + catch (Exception ex) { + writer.writeBoolean(true); // Exception + + writeError(writer, ex); + } + } + } + + /** + * Writes an error to the writer either as a native exception, or as a couple of strings. + * @param writer Writer. + * @param ex Exception. + */ + private static void writeError(PortableRawWriterEx writer, Exception ex) { + if (ex.getCause() instanceof PlatformNativeException) + writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause()); + else { + writer.writeObjectDetached(ex.getClass().getName()); + writer.writeObjectDetached(ex.getMessage()); + } + } + + /** <inheritDoc /> */ + @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { + return cache.future(); + } + + /** <inheritDoc /> */ + @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) { + if (opId == OP_GET_ALL) + return WRITER_GET_ALL; + + if (opId == OP_INVOKE) + return WRITER_PROC_ERR; + + if (opId == OP_INVOKE_ALL) + return WRITER_INVOKE_ALL; + + return null; + } + + /** + * Clears the contents of the cache, without notifying listeners or + * {@link javax.cache.integration.CacheWriter}s. + * + * @throws IllegalStateException if the cache is closed. + * @throws javax.cache.CacheException if there is a problem during the clear + */ + public void clear() throws IgniteCheckedException { + cache.clear(); + } + + /** + * Removes all entries. 
+ * + * @throws org.apache.ignite.IgniteCheckedException In case of error. + */ + public void removeAll() throws IgniteCheckedException { + cache.removeAll(); + } + + /** + * Read cache size. + * + * @param peekModes Encoded peek modes. + * @param loc Local mode flag. + * @return Size. + */ + public int size(int peekModes, boolean loc) { + CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(peekModes); + + return loc ? cache.localSize(modes) : cache.size(modes); + } + + /** + * Create cache iterator. + * + * @return Cache iterator. + */ + public PlatformCacheIterator iterator() { + Iterator<Cache.Entry> iter = cache.iterator(); + + return new PlatformCacheIterator(platformCtx, iter); + } + + /** + * Create cache iterator over local entries. + * + * @param peekModes Encoded peek modes. + * @return Cache iterator. + */ + public PlatformCacheIterator localIterator(int peekModes) { + CachePeekMode[] peekModes0 = PlatformUtils.decodeCachePeekModes(peekModes); + + Iterator<Cache.Entry> iter = cache.localEntries(peekModes0).iterator(); + + return new PlatformCacheIterator(platformCtx, iter); + } + + /** + * Enters a lock. + * + * @param id Lock id. + * @throws InterruptedException If the thread is interrupted while acquiring the lock. + */ + public void enterLock(long id) throws InterruptedException { + lock(id).lockInterruptibly(); + } + + /** + * Exits a lock. + * + * @param id Lock id. + */ + public void exitLock(long id) { + lock(id).unlock(); + } + + /** + * Attempts to enter a lock. + * + * @param id Lock id. + * @param timeout Timeout, in milliseconds. {@code -1} selects the untimed {@code tryLock()} call, i.e. a single + * attempt with no timed wait (NOTE(review): this was previously described as an infinite timeout; confirm the + * intended contract with the native caller). + * @return {@code true} if the lock was acquired. + * @throws InterruptedException If the thread is interrupted during the timed wait. + */ + public boolean tryEnterLock(long id, long timeout) throws InterruptedException { + return timeout == -1 + ? lock(id).tryLock() + : lock(id).tryLock(timeout, TimeUnit.MILLISECONDS); + } + + /** + * Rebalances the cache. + * + * @param futId Future id. 
+ */ + public void rebalance(long futId) { + PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() { + @Override + public Object apply(IgniteFuture fut) { + return null; + } + }), futId, PlatformFutureUtils.TYP_OBJ); + } + + /** + * Unregister lock. + * + * @param id Lock id. + */ + public void closeLock(long id){ + Lock lock = lockMap.remove(id); + + assert lock != null : "Failed to unregister lock: " + id; + } + + /** + * Get lock by id. + * + * @param id Id. + * @return Lock. + */ + private Lock lock(long id) { + Lock lock = lockMap.get(id); + + assert lock != null : "Lock not found for ID: " + id; + + return lock; + } + + /** + * Registers a lock in a map. + * + * @param lock Lock to register. + * @return Registered lock id. + */ + private long registerLock(Lock lock) { + long id = LOCK_ID_GEN.incrementAndGet(); + + lockMap.put(id, lock); + + return id; + } + + /** + * Runs specified query. + */ + private PlatformQueryCursor runQuery(PortableRawReaderEx reader, Query qry) + throws IgniteCheckedException { + + try { + QueryCursorEx cursor = (QueryCursorEx) cache.query(qry); + + return new PlatformQueryCursor(platformCtx, cursor, + qry.getPageSize() > 0 ? qry.getPageSize(): Query.DFLT_PAGE_SIZE); + } + catch (Exception err) { + throw PlatformUtils.unwrapQueryException(err); + } + } + + /** + * Runs specified fields query. + */ + private PlatformFieldsQueryCursor runFieldsQuery(PortableRawReaderEx reader, Query qry) + throws IgniteCheckedException { + try { + QueryCursorEx cursor = (QueryCursorEx) cache.query(qry); + + return new PlatformFieldsQueryCursor(platformCtx, cursor, + qry.getPageSize() > 0 ? qry.getPageSize() : Query.DFLT_PAGE_SIZE); + } + catch (Exception err) { + throw PlatformUtils.unwrapQueryException(err); + } + } + + /** + * Reads the query of specified type. 
+ */ + private Query readInitialQuery(PortableRawReaderEx reader) + throws IgniteCheckedException { + int typ = reader.readInt(); + + switch (typ) { + case -1: + return null; + + case OP_QRY_SCAN: + return readScanQuery(reader); + + case OP_QRY_SQL: + return readSqlQuery(reader); + + case OP_QRY_TXT: + return readTextQuery(reader); + } + + throw new IgniteCheckedException("Unsupported query type: " + typ); + } + + /** + * Reads sql query. + */ + private Query readSqlQuery(PortableRawReaderEx reader) { + boolean loc = reader.readBoolean(); + String sql = reader.readString(); + String typ = reader.readString(); + final int pageSize = reader.readInt(); + + Object[] args = readQueryArgs(reader); + + return new SqlQuery(typ, sql).setPageSize(pageSize).setArgs(args).setLocal(loc); + } + + /** + * Reads fields query. + */ + private Query readFieldsQuery(PortableRawReaderEx reader) { + boolean loc = reader.readBoolean(); + String sql = reader.readString(); + final int pageSize = reader.readInt(); + + Object[] args = readQueryArgs(reader); + + return new SqlFieldsQuery(sql).setPageSize(pageSize).setArgs(args).setLocal(loc); + } + + /** + * Reads text query. + */ + private Query readTextQuery(PortableRawReaderEx reader) { + boolean loc = reader.readBoolean(); + String txt = reader.readString(); + String typ = reader.readString(); + final int pageSize = reader.readInt(); + + return new TextQuery(typ, txt).setPageSize(pageSize).setLocal(loc); + } + + /** + * Reads scan query. + */ + private Query readScanQuery(PortableRawReaderEx reader) { + boolean loc = reader.readBoolean(); + final int pageSize = reader.readInt(); + + boolean hasPart = reader.readBoolean(); + + Integer part = hasPart ? 
reader.readInt() : null; + + ScanQuery qry = new ScanQuery().setPageSize(pageSize); + + qry.setPartition(part); + + Object pred = reader.readObjectDetached(); + + if (pred != null) + qry.setFilter(platformCtx.createCacheEntryFilter(pred, reader.readLong())); + + qry.setLocal(loc); + + return qry; + } + + /** + * Writes the result map of a getAll operation; errors are not handled by this writer (see {@code canWrite}). + */ + private static class GetAllWriter implements PlatformFutureUtils.Writer { + /** <inheritDoc /> */ + @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { + assert obj instanceof Map; + + PlatformUtils.writeNullableMap(writer, (Map) obj); + } + + /** <inheritDoc /> */ + @Override public boolean canWrite(Object obj, Throwable err) { + return err == null; + } + } + + /** + * Writes error with EntryProcessorException cause. + */ + private static class EntryProcessorExceptionWriter implements PlatformFutureUtils.Writer { + /** <inheritDoc /> */ + @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { + EntryProcessorException entryEx = (EntryProcessorException) err; + + writeError(writer, entryEx); + } + + /** <inheritDoc /> */ + @Override public boolean canWrite(Object obj, Throwable err) { + return err instanceof EntryProcessorException; + } + } + + /** + * Writes results of InvokeAll method. + */ + private static class EntryProcessorResultsWriter implements PlatformFutureUtils.Writer { + /** <inheritDoc /> */ + @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { + writeInvokeAllResult(writer, (Map)obj); + } + + /** <inheritDoc /> */ + @Override public boolean canWrite(Object obj, Throwable err) { + return obj != null && err == null; + } + } + + /** + * Interop expiry policy. + */ + private static class InteropExpiryPolicy implements ExpiryPolicy { + /** Duration: unchanged. */ + private static final long DUR_UNCHANGED = -2; + + /** Duration: eternal. 
*/ + private static final long DUR_ETERNAL = -1; + + /** Duration: zero. */ + private static final long DUR_ZERO = 0; + + /** Expiry for create. */ + private final Duration create; + + /** Expiry for update. */ + private final Duration update; + + /** Expiry for access. */ + private final Duration access; + + /** + * Constructor. + * + * @param create Expiry for create. + * @param update Expiry for update. + * @param access Expiry for access. + */ + public InteropExpiryPolicy(long create, long update, long access) { + this.create = convert(create); + this.update = convert(update); + this.access = convert(access); + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForCreation() { + return create; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForUpdate() { + return update; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForAccess() { + return access; + } + + /** + * Convert encoded duration to actual duration. + * + * @param dur Encoded duration. + * @return Actual duration. 
+ */ + private static Duration convert(long dur) { + if (dur == DUR_UNCHANGED) + return null; + else if (dur == DUR_ETERNAL) + return Duration.ETERNAL; + else if (dur == DUR_ZERO) + return Duration.ZERO; + else { + assert dur > 0; + + return new Duration(TimeUnit.MILLISECONDS, dur); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/207b6820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java new file mode 100644 index 0000000..fee2995 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.cache; + +import org.apache.ignite.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.memory.*; +import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.resources.*; + +/** + * Interop filter. Delegates apply to native platform. + * <p> + * Keeps a pointer to the native filter: the pointer is either passed to the constructor or obtained in + * {@link #setIgniteInstance(Ignite)}, and it is released in {@link #onClose()}. + */ +public class PlatformCacheEntryFilterImpl<K, V> extends PlatformAbstractPredicate + implements PlatformCacheEntryFilter<K, V> { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** + * {@link java.io.Externalizable} support. + */ + public PlatformCacheEntryFilterImpl() { + super(); + } + + /** + * Constructor. + * + * @param pred .Net portable predicate, asserted to be non-null. + * @param ptr Pointer to predicate in the native platform. + * @param ctx Platform context. + */ + public PlatformCacheEntryFilterImpl(Object pred, long ptr, PlatformContext ctx) { + super(pred, ptr, ctx); + + assert pred != null; + } + + /** + * {@inheritDoc} + * <p> + * Writes the key and the value to platform memory and hands the native filter pointer together with the + * memory pointer to the gateway; the entry is accepted when the gateway returns a non-zero value. + */ + @Override public boolean apply(K k, V v) { + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(k); + writer.writeObject(v); + + out.synchronize(); + + return ctx.gateway().cacheEntryFilterApply(ptr, mem.pointer()) != 0; + } + } + + /** + * {@inheritDoc} + * <p> + * Destroys the native filter through the gateway and zeroes the pointer; does nothing when the pointer is + * already zero. + */ + @Override public void onClose() { + if (ptr == 0) + return; + + assert ctx != null; + + ctx.gateway().cacheEntryFilterDestroy(ptr); + + ptr = 0; + } + + /** + * Resolves the platform context from the injected Ignite instance and, when no native pointer is set yet, + * writes the predicate to platform memory and asks the gateway to create the native filter. + * + * @param ignite Ignite instance. 
+ */ + @IgniteInstanceResource + public void setIgniteInstance(Ignite ignite) { + ctx = PlatformUtils.platformContext(ignite); + + if (ptr != 0) + return; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(pred); + + out.synchronize(); + + ptr = ctx.gateway().cacheEntryFilterCreate(mem.pointer()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/207b6820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java new file mode 100644 index 0000000..ab9ad7c --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.memory.*; +import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import javax.cache.processor.*; +import java.io.*; + +/** + * Interop cache entry processor. Delegates processing to native platform. + * <p> + * Only the portable processor object is written by {@link #writeExternal}; the native pointer is not, so a + * deserialized instance has a zero pointer and sends the processor object to the native side instead. + */ +public class PlatformCacheEntryProcessor<K, V, T> implements CacheEntryProcessor<K, V, T>, Externalizable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Indicates that entry has not been modified. */ + private static final byte ENTRY_STATE_INTACT = 0; + + /** Indicates that entry value has been set. */ + private static final byte ENTRY_STATE_VALUE_SET = 1; + + /** Indicates that remove has been called on an entry. */ + private static final byte ENTRY_STATE_REMOVED = 2; + + /** Indicates error in processor that is written as portable. */ + private static final byte ENTRY_STATE_ERR_PORTABLE = 3; + + /** Indicates error in processor that is written as string. */ + private static final byte ENTRY_STATE_ERR_STRING = 4; + + /** Native portable processor. */ + private Object proc; + + /** Pointer to processor in the native platform; zero when there is no local native processor. */ + private transient long ptr; + + /** + * {@link java.io.Externalizable} support. + */ + public PlatformCacheEntryProcessor() { + // No-op. + } + + /** + * Constructor. + * + * @param proc Native portable processor. + * @param ptr Pointer to processor in the native platform. + */ + public PlatformCacheEntryProcessor(Object proc, long ptr) { + this.proc = proc; + this.ptr = ptr; + } + + /** + * {@inheritDoc} + * <p> + * Obtains the platform processor for the Ignite instance unwrapped from the entry, waits for it to start + * and runs the native processor. The {@code arguments} are not used by this implementation. + */ + @Override public T process(MutableEntry<K, V> entry, Object... 
arguments) throws EntryProcessorException { + try { + IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class); + + PlatformProcessor interopProc; + + try { + interopProc = PlatformUtils.platformProcessor(ignite); + } + catch (IllegalStateException ex){ + throw new EntryProcessorException(ex); + } + + interopProc.awaitStart(); + + return execute0(interopProc.context(), entry); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * Executes interop entry processor on a given entry, updates entry and returns result. + * <p> + * The request is written to one platform memory chunk; a second chunk receives the response of the + * gateway's {@code cacheInvoke} call. + * + * @param ctx Context. + * @param entry Entry. + * @return Processing result. + * @throws org.apache.ignite.IgniteCheckedException If execution fails (the throwing call is not visible here). + */ + private T execute0(PlatformContext ctx, MutableEntry<K, V> entry) + throws IgniteCheckedException { + try (PlatformMemory outMem = ctx.memory().allocate()) { + PlatformOutputStream out = outMem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writeEntryAndProcessor(entry, writer); + + out.synchronize(); + + try (PlatformMemory inMem = ctx.memory().allocate()) { + PlatformInputStream in = inMem.input(); + + ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer()); + + in.synchronize(); + + PortableRawReaderEx reader = ctx.reader(in); + + return readResultAndUpdateEntry(ctx, entry, reader); + } + } + } + + /** + * Writes mutable entry and entry processor to the stream. + * <p> + * Layout: key, value, a boolean flag, then either the native pointer (flag {@code true}, pointer is + * non-zero) or the processor object itself (flag {@code false}). + * + * @param entry Entry to process. + * @param writer Writer. + */ + private void writeEntryAndProcessor(MutableEntry<K, V> entry, PortableRawWriterEx writer) { + writer.writeObject(entry.getKey()); + writer.writeObject(entry.getValue()); + + if (ptr != 0) { + // Execute locally - we have a pointer to native processor. + writer.writeBoolean(true); + writer.writeLong(ptr); + } + else { + // We are on a remote node. Send processor holder back to native. 
+ writer.writeBoolean(false); + writer.writeObject(proc); + } + } + + /** + * Reads processing result from stream, updates mutable entry accordingly, and returns the result. + * <p> + * The first byte is the entry state: for the two error states an exception is thrown, otherwise the result + * object is read after the optional new value. + * + * @param ctx Context, used to create the native exception. + * @param entry Mutable entry to update. + * @param reader Reader. + * @return Entry processing result. + * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code. + */ + @SuppressWarnings("unchecked") + private T readResultAndUpdateEntry(PlatformContext ctx, MutableEntry<K, V> entry, PortableRawReaderEx reader) { + byte state = reader.readByte(); + + switch (state) { + case ENTRY_STATE_VALUE_SET: + entry.setValue((V)reader.readObject()); + + break; + + case ENTRY_STATE_REMOVED: + entry.remove(); + + break; + + case ENTRY_STATE_ERR_PORTABLE: + // Full exception + Object nativeErr = reader.readObjectDetached(); + + assert nativeErr != null; + + throw new EntryProcessorException("Failed to execute native cache entry processor.", + ctx.createNativeException(nativeErr)); + + case ENTRY_STATE_ERR_STRING: + // Native exception was not serializable, we have only message. + String errMsg = reader.readString(); + + assert errMsg != null; + + throw new EntryProcessorException("Failed to execute native cache entry processor: " + errMsg); + + default: + assert state == ENTRY_STATE_INTACT; + } + + return (T)reader.readObject(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(proc); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + proc = in.readObject(); + } +}
